package mobvista.dmp.datasource.taobao

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Builds the daily 7-day / 15-day Tencent News (BES / OPPO) device buckets from
 * dwh.dm_install_list_v2 dsp_req data and writes them to S3 as ORC.
 *
 * @author jiangfan
 * @date 2021/4/23 10:42
 */
class EtlComTencentNewsDaily extends CommonSparkJob with Serializable {
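  // Required CLI options: the processing date (dt_today), the 7-day and 15-day look-back dates,
  // the output path, and the output partition count.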
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_dash_rec7day", true, "[must] dt_dash_rec7day")
    options.addOption("dt_dash_rec15day", true, "[must] dt_dash_rec15day")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val dt_today = commandLine.getOptionValue("dt_today")
    val dt_dash_rec7day = commandLine.getOptionValue("dt_dash_rec7day")
    val dt_dash_rec15day = commandLine.getOptionValue("dt_dash_rec15day")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("EtlComTencentNewsDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

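    // Clear any previous output so the ORC overwrite below starts from an empty S3 prefix.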
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
    val sc = spark.sparkContext

    try {
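      // NOTE: sql1 (the four-way union below, one branch per package and window) is kept for reference but is
      // never executed; the single query `sql` plus the flatMap further down derives the same _7/_15 buckets in one scan.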
      val sql1 =
        s"""
           |select device_id,device_type,platform,'com.tencent.news_bes_7' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec7day}'
           |and package_name in ('com.tencent.news_bes')
           |union
           |select device_id,device_type,platform,'com.tencent.news_bes_15' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec15day}'
           |and package_name in ('com.tencent.news_bes')
           |union
           |select device_id,device_type,platform,'com.tencent.news_oppo_7' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec7day}'
           |and package_name in ('com.tencent.news_oppo')
           |union
           |select device_id,device_type,platform,'com.tencent.news_oppo_15' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec15day}'
           |and package_name in ('com.tencent.news_oppo')
        """.stripMargin

      // Single scan over the 15-day window; the 7-day bucket is derived per row in the flatMap below.
      val sql =
        s"""
           |SELECT device_id, device_type, platform, package_name, update_date
           |  FROM dwh.dm_install_list_v2
           | WHERE dt = '${dt_today}' AND business = 'dsp_req' AND update_date >= '${dt_dash_rec15day}'
           |   AND package_name IN ('com.tencent.news_bes','com.tencent.news_oppo')
           |""".stripMargin

      def schema: StructType = {
        StructType(StructField("device_id", StringType) ::
          StructField("device_type", StringType) ::
          StructField("platform", StringType) ::
          StructField("package_name", StringType) ::
          Nil)
      }

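      // Fan out each row: every matching device gets a "<package>_15" row, and devices whose update_date also
      // falls inside the 7-day window get an additional "<package>_7" row.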
      val rdd = spark.sql(sql).rdd.flatMap(row => {
        val array = new ArrayBuffer[Row]()
        val device_id = row.getAs[String]("device_id")
        val device_type = row.getAs[String]("device_type")
        val platform = row.getAs[String]("platform")
        val package_name = row.getAs[String]("package_name")
        val update_date = row.getAs[String]("update_date")
        array += Row(device_id, device_type, platform, package_name + "_15")
        if (update_date.compareTo(dt_dash_rec7day) >= 0) {
          array += Row(device_id, device_type, platform, package_name + "_7")
        }
        array
      })

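      // Write as zlib-compressed ORC with the requested number of partitions; the _SUCCESS marker is suppressed.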
      spark.createDataFrame(rdd, schema)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", value = false)
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}
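
// Command-line entry point; delegates to the job's run(args).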
object EtlComTencentNewsDaily {
  def main(args: Array[String]): Unit = {
    new EtlComTencentNewsDaily().run(args)
  }
}