package mobvista.dmp.datasource.adn_adx

/**
  * author: andy.liu on 2019/9/24
  */

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
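
// Filters Tencent ADX bid requests (iOS, well-formed IDFA, DSP id 4/27, price_raw > 100)
// out of dwh.ods_adn_adx_v1_request, writes them to a temporary ORC path, and joins them
// with adn_report.adndata_midway_backend_v2 to rebuild the day's partition of
// dwh.dim_adn_adx_package.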
class AdnTecentAdxDataMidWay extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputadxtmp", true, "[must] outputadxtmp")
    options.addOption("dimadxpkg", true, "[must] dimadxpkg")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options
  }
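
  // Parses the CLI options, runs the extraction and the dimension rebuild,
  // and returns 0 on success or -1 when a required option is missing.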
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }
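
    // Standard 8-4-4-4-12 UUID pattern: keep only requests carrying a well-formed IDFA.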
    val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
    val outputadxtmp = commandLine.getOptionValue("outputadxtmp")
    val dimadxpkg = commandLine.getOptionValue("dimadxpkg")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
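
    // Kryo serialization plus RDD compression keeps the persisted intermediate data
    // compact; Hive support is required for the insert overwrite into
    // dwh.dim_adn_adx_package below.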
    val spark = SparkSession.builder()
      .appName("AdnTecentAdxDataMidWay")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
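
    // Delete any leftover output from a previous run so the job can be re-run safely.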
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputadxtmp), true)
    fs.delete(new Path(dimadxpkg), true)

    try {
      // `today` is expected in yyyyMMdd form
      val year = today.substring(0, 4)
      val month = today.substring(4, 6)
      val day = today.substring(6, 8)
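
      // Explode each request's dsp_response array, then keep iOS requests with a
      // well-formed IDFA where DSP 4 or 27 bid more than 100 (price_raw) on the
      // first bid of the first seat.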
      val ods_adn_adx_req_tmp_pre =
        s"""
           |select request_id, device_id, 'idfa' device_type, os platform, device_model,
           |       osv os_version, ct country, adx_dsp.sbid[0].bid[0].price_raw price_raw,
           |       cast(`date` as string) `date`
           |from (
           |  select channel_request_id request_id,
           |         device_info.ifa device_id,
           |         device_info.md device_model,
           |         device_info.os os,
           |         device_info.osv osv,
           |         device_info.geo.ct ct,
           |         explode(dsp_response) adx_dsp,
           |         `date`
           |  from dwh.ods_adn_adx_v1_request
           |  where yyyy = '$year' and mm = '$month' and dd = '$day'
           |    and device_info.ifa rlike '$didPtn') t
           |where t.adx_dsp.id in (4, 27) and t.os = 'ios'
           |  and adx_dsp.sbid[0].bid[0].price_raw > 100
         """.stripMargin

      spark.sql(ods_adn_adx_req_tmp_pre)
        .repartition(coalesce.toInt)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
        .createOrReplaceTempView("ods_adn_adx_req_tmp_pre")
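
      // Materialize the filtered requests to the temporary ORC path; the persisted
      // temp view is reused by the dimension rebuild below.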
      val ods_adn_adx_req_tmp =
        """
          |select * from ods_adn_adx_req_tmp_pre
        """.stripMargin

      spark.sql(ods_adn_adx_req_tmp)
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputadxtmp)

      // The data volume is small, so insert straight into the Hive table
      // (cf. hive.exec.reducers.bytes.per.reducer).
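      // Join the filtered bids with backend v2 events (platform 2, backend 17) to
      // collect the distinct iOS bundles, stripping the leading 'id' prefix from
      // App Store package names.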
      val dim_adn_adx_package =
        s"""
           |insert overwrite table dwh.dim_adn_adx_package partition (dt = '$today')
           |select substr(t1.package_name, 3) package_name
           |from (select request_id, package package_name, platform, backend_id
           |      from adn_report.adndata_midway_backend_v2
           |      where yyyy = '$year' and mm = '$month' and dd = '$day'
           |        and platform = '2' and backend_id = '17') t1
           |join ods_adn_adx_req_tmp_pre t2
           |  on (t1.request_id = t2.request_id)
           |where t1.package_name like 'id%'
           |group by t1.package_name
         """.stripMargin

      spark.sql(dim_adn_adx_package)
    } finally {
      spark.stop()
    }
    0
  }
}
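
// Hypothetical spark-submit invocation (jar name and paths are illustrative, not from
// the source; option names match buildOptions above):
//   spark-submit --class mobvista.dmp.datasource.adn_adx.AdnTecentAdxDataMidWay dmp.jar \
//     -outputadxtmp s3://mob-emr-test/.../adx_tmp \
//     -dimadxpkg s3://mob-emr-test/.../dim_adx_pkg \
//     -coalesce 20 -today 20190924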
object AdnTecentAdxDataMidWay {
  def main(args: Array[String]): Unit = {
    new AdnTecentAdxDataMidWay().run(args)
  }
}