1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package mobvista.dmp.common
import mobvista.dmp.util.DateUtil
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 将天数据合并到安装列表全量数据中
*/
class InstallListLogic extends CommonInstallListOrc {
/**
* 解析天处理结果数据
*
* @param spark
* @param date
* @return
*/
def processDailyData(business: String, date: String, spark: SparkSession): DataFrame = {
val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
var sql = ""
business match {
case "3s" =>
spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
sql = MobvistaConstant.tracking_3s_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "adn_install" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.adn_install_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "adn_request_other" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.adn_request_other_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "adn_request_sdk" =>
spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
sql = MobvistaConstant.adn_reuqest_sdk_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "adn_request_unmatch" =>
spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
sql = MobvistaConstant.adn_reuqest_sdk_unmatch_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "adn_sdk" =>
spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
sql = MobvistaConstant.adn_sdk_sql.replace("@dt", date).replace("@version", "0")
.replace("@update_date", dateTime)
case "adn_sdk_v2" =>
spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
sql = MobvistaConstant.adn_sdk_sql.replace("@dt", date).replace("@version", "1")
.replace("@update_date", dateTime)
case "ali" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.ali_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "allpb" =>
spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplitV2 _)
sql = MobvistaConstant.allpb_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "bytedance" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.bytedance_sql.replace("@dt", dateTime)
.replace("@update_date", dateTime)
case "clever" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.clever_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "dsp_req" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
sql = MobvistaConstant.dsp_req_sql.replace("@dt", dateTime)
.replace("@update_date", dateTime)
case "dsp_req_unmatch" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
sql = MobvistaConstant.dsp_req_unmatch_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "facebook" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.facebook_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "ga" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.ga_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "joypacios" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.joypacios_sql.replace("@dt", dateTime)
.replace("@update_date", dateTime)
case "mp" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.mp_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "iqiyi_api" =>
spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
sql = MobvistaConstant.iqiyi_sql.replace("@dt", date)
.replace("@update_date", dateTime)
case "other" => // 因上游停止传数,故不再进行查询,造非法数据以便过滤
sql =
"""
|SELECT * FROM (SELECT '' device_id, '' device_type, '' platform, '' country, '' ext_data, '' install_list) t WHERE check_device(device_id)
|""".stripMargin
}
spark.sql(sql)
}
}
object InstallListLogic {
def main(args: Array[String]): Unit = {
new InstallListLogic().run(args)
}
}