1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package mobvista.dmp.datasource.facebook
import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.age_gender.{Constant, Logic}
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
 * Merges the daily Facebook device feed into the cumulative total table.
 *
 * Reads dwh.etl_facebook_daily (partition `today`) and dwh.etl_facebook_total
 * (partition `yesterday`), full-joins them on (device_id, device_type, platform),
 * unions the package-name lists, and writes:
 *   - the new cumulative snapshot to `outputtotal` (ORC, zlib)
 *   - a device/gender projection to `outputgender` (ORC, zlib)
 */
class FaceBookTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("outputgender", true, "[must] outputgender")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesterday", true, "[must] yesterday")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val outputtotal = commandLine.getOptionValue("outputtotal")
    val outputgender = commandLine.getOptionValue("outputgender")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val yesterday = commandLine.getOptionValue("yesterday")

    val spark = SparkSession.builder()
      .appName("FaceBookTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear both output prefixes up front so SaveMode.Overwrite starts clean.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputtotal), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputgender), true)

    try {
      spark.udf.register("check_deviceId", Logic.check_deviceId _)
      // spark.udf.register("check_gender", Logic.check_gender _)

      // Full-join today's daily feed against yesterday's cumulative snapshot on the
      // composite key (device_id, device_type, platform). Package names from both
      // sides are concatenated with '#' so sql2 can split and dedupe them.
      val sql1 =
        s"""
           |SELECT COALESCE(tbl_daily.device_id,tbl_total.device_id) device_id,
           |       COALESCE(tbl_daily.device_type,tbl_total.device_type) device_type,
           |       COALESCE(tbl_daily.platform,tbl_total.platform) platform,
           |       CASE WHEN tbl_daily.package_name is null and tbl_total.package_name is not null then tbl_total.package_name
           |            when tbl_total.package_name is null and tbl_daily.package_name is not null then tbl_daily.package_name
           |            when tbl_total.package_name is not null and tbl_daily.package_name is not null then concat(tbl_daily.package_name,'#',tbl_total.package_name) end as package_name,
           |       COALESCE(tbl_daily.country,tbl_total.country) country,
           |       COALESCE(tbl_daily.gender,tbl_total.gender) gender
           |from ( SELECT device_id,device_type,platform,max(package_name) package_name,max(country) country,max(case when gender = 'male' then 'm' when gender = 'female' then 'f' else gender end) gender
           |from dwh.etl_facebook_daily where dt ='${today}'
           |group by device_id,device_type,platform) tbl_daily
           |FULL JOIN
           |( SELECT device_id,device_type,platform,max(package_names) package_name,max(country) country,max(case when gender = 'male' then 'm' when gender = 'female' then 'f' else gender end) gender
           |from dwh.etl_facebook_total where dt ='${yesterday}'
           | group by device_id,device_type,platform ) tbl_total
           |ON (tbl_daily.device_id = tbl_total.device_id and tbl_daily.device_type = tbl_total.device_type and tbl_daily.platform = tbl_total.platform)
        """.stripMargin
      // NOTE: the join condition previously compared tbl_daily.device_type with
      // itself (always true for non-null values), which mismatched rows across
      // device types; it now correctly joins daily against total.
      spark.sql(sql1).createOrReplaceTempView("etl_facebook_pre_deal")

      // Explode the '#'-joined package list and rebuild it via collect_set to
      // deduplicate package names per (device, country, gender) group.
      val sql2 =
        s"""
           |select device_id,device_type,platform,concat_ws('#',collect_set(package_name)) package_names,country,gender from
           |(select device_id,device_type,platform,explode(split(package_name,'#')) package_name,country,gender
           |from etl_facebook_pre_deal ) X
           |group by device_id,device_type,platform,country,gender
        """.stripMargin
      spark.sql(sql2).createOrReplaceTempView("etl_facebook_total")

      // Collapse to one row per (device_id, device_type); min() arbitrarily picks
      // a deterministic value when a device appears with several platforms/countries.
      val sql3 =
        s"""
           |select device_id,device_type,min(platform) platform,min(package_names) package_names,min(country) country,min(gender) gender from
           |etl_facebook_total
           |group by device_id,device_type
        """.stripMargin

      // Persisted because the result feeds two separate writes below.
      val etl_fb_total = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK)
      try {
        etl_fb_total.coalesce(coalesce.toInt)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputtotal)

        // Project into the shared age/gender schema: age bucket is fixed to "A"
        // (unknown) and the source tag is "fb".
        val gender_rdd = etl_fb_total.rdd.map(line => {
          val device_id = line.getAs[String]("device_id")
          val device_type = line.getAs[String]("device_type")
          val gender = line.getAs[String]("gender")
          Row(device_id, "A", gender, "fb", device_type)
        })
        spark.createDataFrame(gender_rdd, Constant.schema_age_gender)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputgender)
      } finally {
        // Release cached partitions once both outputs are written.
        etl_fb_total.unpersist()
      }
    } finally {
      spark.stop()
    }
    0
  }
}
/** Command-line entry point: delegates straight to [[FaceBookTotal.run]]. */
object FaceBookTotal {
  def main(args: Array[String]): Unit = {
    val job = new FaceBookTotal
    job.run(args)
  }
}