1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package mobvista.dmp.datasource.age_gender
import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
/**
 * Spark job that combines the gender data sets of four sources (DSP, GA, FB, TP)
 * with the device install list and writes the result as ORC.
 *
 * The SQL that performs the actual selection and join lives in `Constant`
 * (`dmp_install_list_sql_14days`, `dmp_install_list_join_gender_sql`) and is not
 * visible from this file.
 *
 * @package: mobvista.dmp.datasource.age_gender
 * @author: wangjf
 * @create: 2018-09-10 16:46
 */
class MergeInstallGender extends CommonSparkJob with Serializable {

  /**
   * Parses the command line, registers the `t_gender` and `t_install` temp views and
   * writes the result of `Constant.dmp_install_list_join_gender_sql` to `gender_output`
   * as zlib-compressed ORC.
   *
   * @param args command-line arguments; the accepted options are declared in `buildOptions`
   * @return 0 on success, -1 when `checkMustOption` rejects the command line
   * @throws NumberFormatException    if `parallelism` is not an integer
   * @throws IllegalArgumentException if `parallelism` is not positive
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val dsp_gender_path = commandLine.getOptionValue("dsp_gender_path")
      val ga_gender_path = commandLine.getOptionValue("ga_gender_path")
      val fb_gender_path = commandLine.getOptionValue("fb_gender_path")
      val tp_gender_path = commandLine.getOptionValue("tp_gender_path")
      val gender_output = commandLine.getOptionValue("gender_output")
      val date = commandLine.getOptionValue("date")
      // val ga_date = commandLine.getOptionValue("ga_date")

      // Parse and validate up front: a bad value must fail before the Spark session is
      // started and, more importantly, before the previous output is deleted.
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      require(parallelism > 0, s"parallelism must be positive, got $parallelism")

      val spark = SparkSession.builder()
        .appName("MergeInstallGender")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        // Inside the try block so that the session is stopped even if the delete fails.
        deleteOutput(spark, gender_output)

        // Union order is dsp, ga, fb, tp.
        Seq(dsp_gender_path, ga_gender_path, fb_gender_path, tp_gender_path)
          .map(path => readGender(spark, path))
          .reduce(_ union _)
          .createOrReplaceTempView("t_gender")

        // Registered for use by the SQL in Constant (the SQL text is not visible here).
        spark.udf.register("pkg_keys", Logic.pkg_keys _)
        // spark.udf.register("split_keys", Logic.split_keys _)

        // val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -91), "yyyy-MM-dd")
        val sql = Constant.dmp_install_list_sql_14days.replace("@date", date)
        // .replace("@ga_date", ga_date)
        // .replace("@update_date", update_date)
        spark.sql(sql).createOrReplaceTempView("t_install")

        spark.sql(Constant.dmp_install_list_join_gender_sql)
          .repartition(parallelism)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(gender_output)

        /*
        Earlier RDD-based implementation, kept for reference:

        val install = spark.sql(sql)
          .rdd
          .mapPartitions(buildInstall)
          .combineByKey(
            (v: String) => Iterable(v),
            (c: Iterable[String], v: String) => c ++ Seq(v),
            (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
          ).mapPartitions(Logic.mergeInstallPart)
        val age_df = install.union(dsp_gender).union(ga_gender).union(fb_gender).union(tp_gender)
          .combineByKey(
            (v: String) => Iterable(v),
            (c: Iterable[String], v: String) => c ++ Seq(v),
            (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
          ) mapPartitions (Logic.mergeGenderPart)
        spark.createDataFrame(age_df.coalesce(numPartitions = parallelism.toInt, shuffle = true), Constant.merge_schema)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(gender_output)
        */
      } finally {
        // SparkSession.stop() also stops the underlying SparkContext.
        spark.stop()
      }
      0
    }
  }

  /** Reads one gender data set as ORC using `Constant.schema_age_gender`. */
  private def readGender(spark: SparkSession, path: String) =
    spark.read.schema(Constant.schema_age_gender).orc(path)

  /**
   * Recursively deletes `output` before the job writes to it.
   *
   * The file system is resolved from the output path itself when the path carries a
   * scheme, so outputs outside the default bucket are handled as well. Scheme-less
   * paths are still resolved against `s3://mob-emr-test`, as before.
   */
  private def deleteOutput(spark: SparkSession, output: String): Unit = {
    val outputPath = new Path(output)
    val outputUri = outputPath.toUri
    val fsUri = if (outputUri.getScheme == null) new URI("s3://mob-emr-test") else outputUri
    FileSystem.get(fsUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)
  }

  /**
   * Maps each row to a key/value pair of joined strings.
   *
   * Key: `device_id` and `device_type`; value: `tag`, `label` and `business`.
   * Both are joined with `MRUtils.JOINER`. Only referenced by the commented-out
   * legacy implementation in this file.
   *
   * @param rows rows exposing the columns listed above
   * @return iterator of (device key, joined label fields)
   */
  def buildAgeGender(rows: Iterator[Row]): Iterator[(String, String)] = {
    // The getAs calls stay inline as join arguments so their type inference is unchanged.
    rows.map { row =>
      val deviceKey = MRUtils.JOINER.join(row.getAs("device_id"), row.getAs("device_type"))
      val labelValue = MRUtils.JOINER.join(row.getAs("tag"), row.getAs("label"), row.getAs("business"))
      (deviceKey, labelValue)
    }
  }

  /**
   * Maps each row to a key/value pair of joined strings.
   *
   * Key: `device_id` and `device_type`; value: `package_names` and `new_date`.
   * Both are joined with `MRUtils.JOINER`. Only referenced by the commented-out
   * legacy implementation in this file.
   *
   * @param rows rows exposing the columns listed above
   * @return iterator of (device key, joined install fields)
   */
  def buildInstall(rows: Iterator[Row]): Iterator[(String, String)] = {
    // The getAs calls stay inline as join arguments so their type inference is unchanged.
    rows.map { row =>
      val deviceKey = MRUtils.JOINER.join(row.getAs("device_id"), row.getAs("device_type"))
      val installValue = MRUtils.JOINER.join(row.getAs("package_names"), row.getAs("new_date"))
      (deviceKey, installValue)
    }
  }

  /**
   * Declares the command-line options of this job. Every option takes a value and is
   * described as `[must] <name>`.
   *
   * @return the option set used to parse the arguments of `run`
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    Seq(
      "dsp_gender_path",
      "ga_gender_path",
      "fb_gender_path",
      "tp_gender_path",
      "gender_output",
      "date",
      // "ga_date",
      "parallelism"
    ).foreach(name => options.addOption(name, true, s"[must] $name"))
    options
  }
}
/** Command-line entry point of the `MergeInstallGender` job. */
object MergeInstallGender {

  /**
   * Creates the job and runs it with the given arguments.
   *
   * @param args command-line arguments, passed to `run` unchanged
   */
  def main(args: Array[String]): Unit = {
    val job = new MergeInstallGender()
    // NOTE(review): the Int status returned by run is discarded here, so a -1 result
    // does not change the process exit code - confirm this is intended.
    job.run(args)
  }
}