1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package mobvista.dmp.datasource.retargeting
import java.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql._
/**
 * SQL templates, a device-id predicate and MySQL JDBC helpers used by the
 * retargeting data-source jobs.
 *
 * The SQL templates contain literal placeholder tokens such as `'@date'`,
 * `'@update_date'`, `'@activeDate'`, `'@last_req_day'` and `'@region'`. Nothing in
 * this object substitutes them; callers are expected to replace them before
 * running the query.
 *
 * Several templates reference unqualified relations (`dm_active`, `dm_region`) and
 * functions (`checkDevice`, `getInstallList`, `getInterestList`). These are
 * presumably temp views / UDFs registered by the calling job — confirm with caller.
 */
object Constant {

  // ---------------------------------------------------------------------------
  // MySQL connection settings (private)
  // ---------------------------------------------------------------------------

  /** JDBC driver class passed to Spark through the `driver` connection property. */
  private val MysqlDriver = "com.mysql.jdbc.Driver"

  /** Host and port of the MySQL instance used by [[jdbcConnection]] and [[writeMySQL]]. */
  private val ProdMysqlHost = "adn-mysql-internal.mobvista.com:3306"

  /** Returns the value of environment variable `name`, or `fallback` when it is unset or empty. */
  private def envOrElse(name: String, fallback: String): String =
    sys.env.get(name).filter(_.nonEmpty).getOrElse(fallback)

  // NOTE(review): the fallback literals below are credentials that were hard-coded in
  // this file. They are kept only so existing jobs keep working unchanged; they should
  // be rotated and supplied exclusively through DMP_MYSQL_USER / DMP_MYSQL_PASSWORD
  // (or a secret store), after which the fallbacks can be deleted.
  private def prodUser: String = envOrElse("DMP_MYSQL_USER", "mob_mfordmp")

  private def prodPassword: String = envOrElse("DMP_MYSQL_PASSWORD", "3xqbE7MNX8WHf2H")

  /** Builds the JDBC URL of `database` on the MySQL host used by the non-test helpers. */
  private def prodUrl(database: String): String = s"jdbc:mysql://$ProdMysqlHost/$database"

  /**
   * Builds the JDBC connection properties shared by every MySQL helper.
   *
   * @param user     MySQL user name
   * @param password MySQL password
   * @param extra    additional connection properties, applied in order after the
   *                 driver and credentials
   */
  private def mysqlProperties(user: String, password: String, extra: (String, String)*): Properties = {
    val props = new Properties()
    props.setProperty("driver", MysqlDriver)
    props.setProperty("user", user)
    props.setProperty("password", password)
    extra.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }

  // ---------------------------------------------------------------------------
  // Tag mapping queries
  // ---------------------------------------------------------------------------

  /** Maps the upper-cased `tag_type-first_tag-second_tag` code to `new_second_id`, for rows with a non-empty `tag_id`. */
  val old2new_sql: String =
    """
      |SELECT UPPER(CONCAT(tag_type, '-', first_tag, '-', second_tag)) tag_code, new_second_id FROM
      | dwh.dm_old2new_tag WHERE tag_id != '' AND tag_id IS NOT NULL
      |""".stripMargin

  /** Selects every (`new_first_id`, `new_second_id`) pair from `dwh.dm_old2new_tag`. */
  val second2first_sql: String =
    """
      |SELECT new_first_id, new_second_id FROM dwh.dm_old2new_tag
      |""".stripMargin

  /** Maps `new_second_id` (aliased `tag_code`) to `tag_id`, for rows with a non-empty `tag_id`. */
  val id_old2new_sql: String =
    """
      |SELECT new_second_id tag_code, tag_id FROM
      | dwh.dm_old2new_tag WHERE tag_id != '' AND tag_id IS NOT NULL
      |""".stripMargin

  // ---------------------------------------------------------------------------
  // Device profile queries
  // ---------------------------------------------------------------------------

  /**
   * One row per upper-cased device id from `dwh.ods_dmp_user_info_all`
   * (filtered by `'@date'` / `'@update_date'`), aggregated with MAX / COLLECT_SET and
   * left-joined to `dm_active` for `frequency`. Null `age` / `gender` become 0.
   */
  val ods_dmp_user_info_all_sql: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest, dm_active.frequency, dm_device.update_date
      | FROM
      | (SELECT UPPER(dev_id) device_id, MAX(platform) platform, MAX(country) country, MAX(age) age, MAX(gender) gender, getInstallList(CONCAT_WS(',',COLLECT_SET(install))) install, getInterestList(CONCAT_WS('#',COLLECT_SET(interest))) interest, MAX(update_date) update_date
      | FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date' AND checkDevice(dev_id)
      | GROUP BY UPPER(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active
      | ON dm_device.device_id = dm_active.device_id
      |""".stripMargin

  /**
   * Device profile from `dwh.ods_dmp_user_info_all_v2` keyed by lower-cased device id,
   * with country 'GB' rewritten to 'UK', left-joined to `dm_active`, the weekly and
   * monthly partitions of `dwh.dm_active_tag` (at `'@activeDate'`) and the set of
   * regions from `dev.dm_device_region` (empty array when the device has none).
   */
  val ods_dmp_user_info_all_sql_distinct: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, dm_device.model, dm_device.osversion os_version, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest,
      | dm_device.behavior, dm_active.frequency, active_week.tags tag_week, active_month.tags tag_month, COALESCE(dm_region.region,ARRAY()) region, dm_device.update_date, dm_device.publish_date
      | FROM
      | (SELECT LOWER(dev_id) device_id, platform, model, osversion, CASE WHEN country = 'GB' THEN 'UK' ELSE country END AS country, age, gender, getInstallList(install) install, getInterestList(interest) interest, behavior, update_date, publish_date
      | FROM dwh.ods_dmp_user_info_all_v2 WHERE dt = '@date' AND checkDevice(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active ON dm_device.device_id = dm_active.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'week' GROUP BY lower(device_id)) active_week ON dm_device.device_id = active_week.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'month' GROUP BY lower(device_id)) active_month ON dm_device.device_id = active_month.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, COLLECT_SET(region) region FROM dev.dm_device_region WHERE dt = '@date' AND region IN ('cn','virginia','tokyo') GROUP BY lower(device_id)) dm_region ON dm_device.device_id = dm_region.device_id
      |""".stripMargin

  /**
   * Variant of the device profile query that reads `dwh.ods_dmp_user_info_all`
   * (filtered by `'@update_date'`) and attaches a single `region` per device: the MAX
   * region from `dev.dm_device_region` rows with `last_req_day >= '@last_req_day'`.
   */
  val ods_dmp_user_info_all_sql_distinct_region: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, device_region.region, dm_device.model, dm_device.osversion os_version, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest,
      | dm_device.behavior, dm_active.frequency, active_week.tags tag_week, active_month.tags tag_month, dm_device.update_date
      | FROM
      | (SELECT LOWER(dev_id) device_id, platform, model, osversion, CASE WHEN country = 'GB' THEN 'UK' ELSE country END AS country, age, gender, getInstallList(install) install, getInterestList(interest) interest, behavior, update_date
      | FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date' AND checkDevice(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active ON dm_device.device_id = dm_active.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'week' GROUP BY lower(device_id)) active_week ON dm_device.device_id = active_week.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'month' GROUP BY lower(device_id)) active_month ON dm_device.device_id = active_month.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(region) region FROM dev.dm_device_region WHERE dt = '@date' AND last_req_day >= '@last_req_day' GROUP BY lower(device_id)) device_region ON dm_device.device_id = device_region.device_id
      |""".stripMargin

  /** Selects lower-cased device id and `tags` (aliased `frequency`) from `dwh.dmp_device_tag_statistics` for `'@date'`. */
  val statistics_sql: String =
    """
      |SELECT LOWER(device_id) device_id,tags frequency
      | FROM dwh.dmp_device_tag_statistics
      | WHERE dt = '@date'
      |""".stripMargin

  /**
   * Tells whether a device id is usable.
   *
   * @param device_id raw device id, may be null
   * @return true when `device_id` is non-null and contains at least one
   *         non-whitespace character
   */
  def checkDevice(device_id: String): Boolean =
    StringUtils.isNotBlank(device_id)

  /** Selects `package_name` from `dwh.dm_install_list_v2` for `'@date'`, limited to rows updated since `'@update_date'`. */
  val package_sql: String =
    """
      |SELECT package_name FROM dwh.dm_install_list_v2 WHERE dt = '@date' AND update_date >= '@update_date'
      |""".stripMargin

  // import com.datastax.spark.connector._
  // val deviceTagColumn = SomeColumns("device_id", "age", "gender", "install_apps", "interest", "frequency" overwrite)

  // ---------------------------------------------------------------------------
  // MySQL helpers
  // ---------------------------------------------------------------------------

  /**
   * Reads a MySQL table into a DataFrame through Spark's JDBC source.
   *
   * Connects to the production (online) MySQL host with utf8 character encoding and
   * zero dates converted to null.
   *
   * @param spark    active Spark session
   * @param database MySQL database (schema) name, appended to the JDBC URL
   * @param table    table name to read
   * @return the table contents as a DataFrame
   */
  def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val properties = mysqlProperties(
      prodUser,
      prodPassword,
      "characterEncoding" -> "utf8",
      "zeroDateTimeBehavior" -> "convertToNull"
    )
    spark.read.jdbc(url = prodUrl(database), table = table, properties = properties)
  }

  /**
   * Writes a Dataset to a MySQL table on the production host through Spark's JDBC sink.
   *
   * @param df       rows to write
   * @param database MySQL database (schema) name, appended to the JDBC URL
   * @param table    destination table name
   * @param saveMode Spark save mode applied to the write
   */
  def writeMySQL(df: Dataset[Row], database: String, table: String, saveMode: SaveMode): Unit = {
    val properties = mysqlProperties(
      prodUser,
      prodPassword,
      "characterEncoding" -> "utf8",
      "batchsize" -> "5000",
      // NOTE(review): innodb_lock_wait_timeout looks like a MySQL session variable
      // rather than a JDBC connection property — confirm it actually takes effect here.
      "innodb_lock_wait_timeout" -> "600"
    )
    df.write.mode(saveMode).jdbc(prodUrl(database), table, properties)
  }

  /**
   * Writes a Dataset to a MySQL table on `localhost:3306`; intended for local testing.
   *
   * The original code set `user` / `password` twice, so only the second pair was ever
   * sent; that effective pair is the one kept here.
   *
   * @param df       rows to write
   * @param database MySQL database (schema) name, appended to the JDBC URL
   * @param table    destination table name
   * @param saveMode Spark save mode applied to the write
   */
  def writeMySQLTest(df: Dataset[Row], database: String, table: String, saveMode: SaveMode): Unit = {
    val properties = mysqlProperties("apptag_rw", "7gyLEVtkER3u8c9")
    val url = s"jdbc:mysql://localhost:3306/$database"
    df.write.mode(saveMode).jdbc(url, table, properties)
  }

  // ---------------------------------------------------------------------------
  // Region-scoped queries
  // ---------------------------------------------------------------------------

  /**
   * Selects user features from `dwh.dm_user_info` for devices that have at least one
   * populated attribute (country, age, gender, interest or install) and that appear in
   * `dm_region` with `region = '@region'`.
   *
   * `'@region'` is presumably one of 'cn', 'virginia', 'tokyo' — confirm with caller.
   */
  val user_feature_sql: String =
    """
      |SELECT u.*
      | FROM
      | (SELECT LOWER(device_id) device_id, age, gender, interest, concat_ws(',',install) install_apps, frequency FROM dwh.dm_user_info a WHERE dt = '@date'
      | AND update_date >= '@update_date' AND ((country != '' AND country IS NOT NULL) OR (age != '' AND age IS NOT NULL) OR (gender != '' AND gender IS NOT NULL) OR
      | (size(interest) != 0 AND interest IS NOT NULL) OR (size(install) != 0 AND install IS NOT NULL))
      | ) u
      | LEFT SEMI JOIN
      | (SELECT device_id FROM dm_region WHERE region = '@region') r
      | ON u.device_id = r.device_id
      |""".stripMargin

  /** Distinct (lower-cased device id, region) pairs from `dev.dm_device_region`, limited to regions 'cn', 'virginia' and 'tokyo' and to devices seen since `'@last_req_day'`. */
  val region_sql: String =
    """
      |SELECT LOWER(device_id) device_id, region FROM dev.dm_device_region WHERE dt = '@date' AND last_req_day >= '@last_req_day' AND region IN ('cn','virginia','tokyo')
      | GROUP BY LOWER(device_id), region
      |""".stripMargin
}