1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package mobvista.dmp.datasource.taobao
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel
import java.net.URI
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
 * Builds UC "other data" audience files for the DMP.
 *
 * Reads dwh.dm_install_list_v2 for two businesses (uc_activation on `dt_today`,
 * dsp_req on `dt_oneday_ago`), groups packages per imeimd5 device, and for every
 * device that holds both an activation package and a UC browser package emits one
 * record into a per-audience subdirectory of `output` (gzip, block-compressed),
 * via RDDMultipleOutputFormat keyed on the target directory path.
 */
class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {

  /** Declares the four required command-line options. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_oneday_ago", true, "[must] dt_oneday_ago")
    options.addOption("update", true, "[must] update")
    options.addOption("output", true, "[must] output")
    options
  }

  /**
   * Runs the job.
   *
   * @param args raw CLI arguments; parsed against [[buildOptions]]
   * @return 0 on success, -1 when a required option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val dt_today = commandLine.getOptionValue("dt_today")
      val dt_oneday_ago = commandLine.getOptionValue("dt_oneday_ago")
      val update = commandLine.getOptionValue("update")
      val output = commandLine.getOptionValue("output")

      val spark = MobvistaConstant.createSparkSession("UCOtherDataToDmp")
      val sc = spark.sparkContext
      // Clear any previous output so saveAsNewAPIHadoopFile does not fail on an
      // existing path. NOTE(review): the bucket is hard-coded — assumes `output`
      // always lives under s3://mob-emr-test; confirm before reuse elsewhere.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      try {
        val conf = sc.hadoopConfiguration
        // Gzip block compression for the text output. Both the legacy-style and the
        // fileoutputformat.* key variants are set so whichever the output format
        // consults is covered.
        conf.set("mapreduce.output.compress", "true")
        conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
        conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
        conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

        val sql =
          s"""
             |SELECT device_id, COLLECT_SET(package_name) install_list
             | FROM
             | (
             | SELECT device_id, package_name
             | FROM dwh.dm_install_list_v2
             | WHERE dt = '${dt_today}' AND business = 'uc_activation' AND device_type = 'imeimd5'
             | AND package_name IN ('com.uc.foractivation.4b5a58','com.uc.foractivation.d3f521')
             | UNION
             | SELECT device_id, package_name
             | FROM dwh.dm_install_list_v2
             | WHERE dt = '${dt_oneday_ago}' AND business = 'dsp_req' AND device_type = 'imeimd5'
             | AND package_name IN ('com.UCMobile_bes','com.ucmobile_oppo')
             | ) t
             | GROUP BY device_id
             |""".stripMargin

        val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

        // Audience definition: each (activation package, browser package) pair a
        // device holds produces one record. Short ids build the output subdirectory
        // (<actId>_<browserId>) and the emitted tag (<actPkg>_<browserId>).
        val activations = Seq(
          "com.uc.foractivation.4b5a58" -> "4b5a58",
          "com.uc.foractivation.d3f521" -> "d3f521"
        )
        val browsers = Seq(
          "com.UCMobile_bes" -> "ucbes",
          "com.ucmobile_oppo" -> "ucoppo"
        )

        val rdd = df.rdd.flatMap(r => {
          val deviceId = r.getAs[String]("device_id")
          val deviceType = "imeimd5"
          val platform = "android"
          val installList = r.getAs[mutable.WrappedArray[String]]("install_list")
          for {
            (actPkg, actId) <- activations
            (browserPkg, browserId) <- browsers
            if installList.contains(actPkg) && installList.contains(browserPkg)
          } yield (
            new Text(s"$output/${actId}_$browserId"),
            new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, s"${actPkg}_$browserId", update))
          )
        })

        rdd.coalesce(50)
          .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
      } finally {
        spark.stop()
      }
      0
    }
  }
}
/** Launcher companion for the UC other-data DMP job. */
object UCOtherDataToDmpV2 {

  /** JVM entry point; the Int status returned by run is intentionally discarded. */
  def main(args: Array[String]): Unit = {
    val job = new UCOtherDataToDmpV2
    job.run(args)
  }
}