1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package mobvista.dmp.datasource.appsflyer
import java.net.URI
import java.util.Properties
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * Daily AppsFlyer audience aggregation job.
 *
 * Joins the raw audience table `dwh.etl_appsflyer_audience_org` (partition `dt` = today,
 * device ids filtered to UUID format) against the MySQL dimension table
 * `mob_adn.appsflyer_audience`, then writes two ORC outputs:
 *   - `outputtotal`: per (device, advertiser) rows with package names / platform
 *   - `dmpuserinfo`: per device rows shaped like the DMP user-info schema
 * Both output paths are deleted up front so reruns are idempotent.
 */
class AppsFlyerTotal extends CommonSparkJob {

  /** Declares the required command-line options; all are mandatory (checked in run). */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("dmpuserinfo", true, "[must] dmpuserinfo")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("update_date", true, "[must] update_date")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    // UUID-shaped device ids only (GAID/IDFA format); used verbatim in the SQL rlike filter.
    val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
    val outputtotal = commandLine.getOptionValue("outputtotal")
    val dmpuserinfo = commandLine.getOptionValue("dmpuserinfo")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val update_date = commandLine.getOptionValue("update_date")

    val spark = SparkSession.builder()
      .appName("AppsFlyerTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Delete both targets before writing so a rerun of the same day overwrites cleanly.
    // One FileSystem handle is enough: both paths live under the same s3 bucket.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputtotal), true)
    fs.delete(new Path(dmpuserinfo), true)

    try {
      // FIXME(security): database credentials are hard-coded in source; move them to a
      // secured configuration mechanism (e.g. job properties / secrets manager).
      val properties = new Properties()
      properties.put("user", "adnro")
      properties.put("password", "YcM123glh")
      val url = "jdbc:mysql://adn-data-foronlinetest.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/mob_adn"

      // Small dimension table pulled whole from MySQL; the SQL below hints a map-side join.
      // (The redundant .toDF(...) rename to the same column names was dropped.)
      spark.sqlContext.read.jdbc(url, "appsflyer_audience", properties)
        .select("id", "platform", "dmp_package")
        .createOrReplaceTempView("appsflyer_audience")

      // One row per (device, advertiser); device_type is derived from the audience platform.
      val sqlAfOrgDaily =
        s"""
           |select t1.devid device_id,max(case when t2.platform='android' then 'gaid'
           | when t2.platform='ios' then 'idfa' end ) device_type,
           | max(t2.platform) platform,
           | max(t2.dmp_package) package_names,
           | '' category,
           | t1.advertiser_id advertiser,
           | '${update_date}' update_date
           |from
           |( select /*+ mapjoin(t2)*/ devid, container_id,advertiser_id from dwh.etl_appsflyer_audience_org where dt ='${today}' and devid rlike '${didPtn}' )
           |t1 join appsflyer_audience t2 on (t1.container_id = t2.id)
           |group by t1.devid,t1.advertiser_id
        """.stripMargin

      spark.sql(sqlAfOrgDaily)
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputtotal)

      // One row per device, shaped like the DMP user-info schema (country/age/gender/tags
      // deliberately blank or UNKNOWN; both day columns set to update_date).
      val sqlUserInfo =
        s"""
           |select t1.devid device_id,max(case when t2.platform='android' then 'gaid'
           | when t2.platform='ios' then 'idfa' end ) device_type,
           | max(t2.platform) platform,
           |'UNKNOWN' country,
           |'' age,
           |'' gender,
           |'' tags,
           |'${update_date}' first_req_day,
           |'${update_date}' last_req_day
           |from
           |( select /*+ mapjoin(t2)*/ devid, container_id from dwh.etl_appsflyer_audience_org where dt ='${today}' and devid rlike '${didPtn}' )
           |t1 join appsflyer_audience t2 on (t1.container_id = t2.id)
           |group by t1.devid
        """.stripMargin

      spark.sql(sqlUserInfo)
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(dmpuserinfo)
    } finally {
      // Always release the session, even if a query fails.
      spark.stop()
    }
    0
  }
}
object AppsFlyerTotal {
  /**
   * Entry point. Propagates the job's status code as the process exit code —
   * previously the Int returned by run() (-1 on bad options) was discarded,
   * so failed argument validation still exited with code 0 and schedulers
   * treated the job as successful.
   */
  def main(args: Array[String]): Unit = {
    System.exit(new AppsFlyerTotal().run(args))
  }
}