package mobvista.dmp.datasource.facebook
import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
 * FaceBookDaily: unions today's raw Facebook device records with yesterday's
 * still-unmatched history, resolves MD5-hashed device ids against the weekly
 * md5 match table, then writes matched rows to `output` and the remaining
 * unmatched rows to `unmatched`.
 *
 * @author kehan
 * @date 2019/06/26
 */
class FaceBookDaily extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesterday", true, "[must] yesterday")
    options.addOption("last_sunday", true, "[must] last_sunday")
    options.addOption("unmatched", true, "[must] unmatched")
    options
  }
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val input = commandLine.getOptionValue("input") // parsed as a required option but not referenced below; the job reads from Hive tables
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val yesterday = commandLine.getOptionValue("yesterday")
    val last_sunday = commandLine.getOptionValue("last_sunday")
    val unmatched = commandLine.getOptionValue("unmatched")

    val spark = SparkSession.builder()
      .appName("FaceBookDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
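    // Both outputs are rewritten from scratch on every run; delete any
    // leftovers from a previous (possibly failed) run first.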
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output), true)
    fs.delete(new Path(unmatched), true)
    try {
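      // Step 1: union today's raw Facebook records with yesterday's
      // still-unmatched history, normalizing os -> device_type (gaid/idfa)
      // and the genders array to a single 'male'/'female'/null label.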
      val sql1 =
        s"""
           |select t.device_id_md5,t.device_type,t.platform,t.package_name,t.country,t.gender from
           |(select device_id as device_id_md5,
           |case when os='android' then 'gaid' when os='ios' then 'idfa' else '' end as device_type,
           |os as platform,
           |package_name,
           |user_country as country,
           |case when genders[0]=1 and genders[1] is null then 'male'
           |when genders[0]=2 and genders[1] is null then 'female'
           |else null end as gender
           |from dwh.etl_fb_org_daily where dt='${today}'
           |union all
           |select device_id_md5,device_type,platform,package_name,country,gender
           |from dwh.etl_fb_unmatched_history where dt='${yesterday}') t
        """.stripMargin
      spark.sql(sql1)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
        .createOrReplaceTempView("etl_fb_all_daily")
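      // Distinct md5 ids for the day (the group by doubles as de-duplication).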
spark.sql("select device_id_md5 from etl_fb_all_daily group by device_id_md5")
.createOrReplaceTempView("fb_device_md5")
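      // Step 2: resolve each md5 hash to its raw device id via the weekly
      // dwh.device_id_md5_match snapshot; the MAPJOIN hint asks Spark to
      // broadcast the daily md5 set instead of shuffling the match table.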
      val sql2 =
        s"""
           |select /*+ MAPJOIN(a) */ b.device_id,a.device_id_md5
           |from fb_device_md5 a join
           |(select * from dwh.device_id_md5_match where dt='${last_sunday}') b
           |on a.device_id_md5=b.device_id_md5
        """.stripMargin
      spark.sql(sql2).createOrReplaceTempView("etl_fb_md5")
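      // Step 3: left join so records whose md5 has no known raw device id
      // survive with device_id = null; they are split off as unmatched below.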
      val sql3 =
        """
          |select b.device_id,a.device_id_md5,a.device_type,a.platform,a.package_name,a.country,a.gender
          |from etl_fb_all_daily a left join etl_fb_md5 b on a.device_id_md5=b.device_id_md5
        """.stripMargin
      spark.sql(sql3)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
        .createOrReplaceTempView("etl_fb_md5_daily")
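      // Step 4: collapse matched rows to one record per device_id, merging
      // all package names into a single '#'-separated list.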
      val sql4 =
        """
          |select device_id,
          |max(device_id_md5) device_id_md5,
          |max(device_type) device_type,
          |max(platform) platform,
          |concat_ws('#',collect_set(package_name)) package_name,
          |max(country) country,
          |max(gender) gender
          |from etl_fb_md5_daily where device_id is not null group by device_id
        """.stripMargin
      spark.sql(sql4).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)
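      // Unmatched output: rows with no resolved device_id, presumably loaded
      // into dwh.etl_fb_unmatched_history so the next run can retry them
      // (sql1 reads that table at dt = yesterday).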
spark.sql("select device_id_md5,device_type,platform,package_name,country,gender from etl_fb_md5_daily where device_id is null")
.write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(unmatched)
    } finally {
      spark.stop()
    }
    0
  }
}
object FaceBookDaily {
  def main(args: Array[String]): Unit = {
    new FaceBookDaily().run(args)
  }
}
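// A sketch of a typical invocation, assuming commons-cli single-dash option
// syntax and hypothetical jar name, S3 paths, and date formats (adjust to the
// cluster's actual conventions):
//
//   spark-submit --class mobvista.dmp.datasource.facebook.FaceBookDaily dmp.jar \
//     -input s3://mob-emr-test/path/to/input \
//     -output s3://mob-emr-test/path/to/output \
//     -unmatched s3://mob-emr-test/path/to/unmatched \
//     -coalesce 100 \
//     -today 2019-06-26 -yesterday 2019-06-25 -last_sunday 2019-06-23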