package mobvista.dmp.datasource.baichuan

import java.net.URI

import mobvista.dmp.datasource.baichuan.Constant.BaiChuan
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel
/**
* @package: mobvista.dmp.datasource.baichuan
* @author: wangjf
* @date: 2019-08-29
* @time: 20:52
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
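/**
 * ETL job for the BaiChuan (Alibaba) data source: reads the daily user-info
 * table for the given date, extracts device IDs for the Taobao and Tmall apps
 * on each OS, and writes one deduplicated ORC dataset per appId_appOsId
 * combination under the given output path on S3.
 */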
class BaiChuanEtlJob extends java.io.Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }
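
  // Example invocation (jar name and argument values are hypothetical):
  //   spark-submit --class mobvista.dmp.datasource.baichuan.BaiChuanEtlJob dmp.jar \
  //     -date 2019-08-29 -output s3://mob-emr-test/baichuan/etl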
  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"BaiChuanEtlJob.$date")
    try {
      // hasNo is registered here, presumably so the SQL templates in Constant can call it
      spark.udf.register("hasNo", Constant.hasNo _)
      val df = spark.sql(Constant.filter_sql.replace("@date", date))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("ods_user_info_daily")
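      // The two numeric IDs appear to be the iOS App Store IDs
      // (387682726 = Taobao, 518966501 = Tmall); the two package
      // names are the corresponding Android apps.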
      val set = Set("387682726", "518966501", "com.taobao.taobao", "com.tmall.wireless")
      val itr = set.iterator

      import spark.implicits._
      /**
       * Handle Tmall and Taobao separately for each OS. appId: 2 = Taobao, 1 = Tmall.
       */
      while (itr.hasNext) {
        var output_dir = output
        var output_imei = output
        var sql = Constant.etl_sql
        var sql_imeimd5 = Constant.etl_sql
        var imeiFlag = false

        /**
         * appOsId: 1 = android, 2 = iOS, 3 = android_md5 (MD5-hashed IMEI)
         */
        val ir = itr.next()
        ir match {
          case "387682726" =>
            sql = sql.replace("@package", "387682726").replace("@appId", "2")
              .replace("@appOs", "ios").replace("@devType", "idfa").replace("@osId", "2")
            output_dir = output_dir + "/" + "2_2"
          case "518966501" =>
            sql = sql.replace("@package", "518966501").replace("@appId", "1")
              .replace("@appOs", "ios").replace("@devType", "idfa").replace("@osId", "2")
            output_dir = output_dir + "/" + "1_2"
          case "com.taobao.taobao" =>
            sql = sql.replace("@package", "com.taobao.taobao").replace("@appId", "2")
              .replace("@appOs", "android").replace("@devType", "imei").replace("@osId", "1")
            output_dir = output_dir + "/" + "2_1"
            sql_imeimd5 = sql_imeimd5.replace("@package", "com.taobao.taobao").replace("@appId", "2")
              .replace("@appOs", "android").replace("@devType", "imeimd5").replace("@osId", "3")
            imeiFlag = true
            output_imei = output_imei + "/" + "2_3"
          case "com.tmall.wireless" =>
            sql = sql.replace("@package", "com.tmall.wireless").replace("@appId", "1")
              .replace("@appOs", "android").replace("@devType", "imei").replace("@osId", "1")
            output_dir = output_dir + "/" + "1_1"
            sql_imeimd5 = sql_imeimd5.replace("@package", "com.tmall.wireless").replace("@appId", "1")
              .replace("@appOs", "android").replace("@devType", "imeimd5").replace("@osId", "3")
            imeiFlag = true
            output_imei = output_imei + "/" + "1_3"
        }
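
        // Resulting output layout: <output>/<appId>_<appOsId>,
        // e.g. 2_2 = Taobao/iOS, 1_1 = Tmall/Android (imei), x_3 = MD5-IMEI variant.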
        // Device information pulled from the install data
        val dff = spark.sql(sql).rdd.map(r => {
          BaiChuan(r.getAs("dev_id"), Integer.parseInt(r.getAs("app_id").toString), Integer.parseInt(r.getAs("app_os").toString))
        }).toDF.dropDuplicates
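        // Deleting the target path up front presumably guards against stale
        // files from earlier runs; SaveMode.Overwrite alone would also replace it.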
        FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output_dir), true)
        dff.write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output_dir)
        if (imeiFlag) {
          val imeiDff = spark.sql(sql_imeimd5).rdd.map(r => {
            BaiChuan(r.getAs("dev_id"), Integer.parseInt(r.getAs("app_id").toString), Integer.parseInt(r.getAs("app_os").toString))
          }).toDF.dropDuplicates
          FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output_imei), true)
          imeiDff.write
            .mode(SaveMode.Overwrite)
            .option("orc.compress", "zlib")
            .orc(output_imei)
        }
      }
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object BaiChuanEtlJob {
  def main(args: Array[String]): Unit = {
    new BaiChuanEtlJob().run(args)
  }
}