1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package mobvista.dmp.clickhouse.dsp
import java.text.SimpleDateFormat
import java.util.Date
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession
import ru.yandex.clickhouse.ClickHouseDataSource
import scala.collection.mutable
/**
 * Spark job that loads one hour of DSP ETL output (ORC files) into a ClickHouse table.
 *
 * The job reads the ORC data under `input`, maps every row to a `RealtimeServiceHour`
 * record, drops the target `(dt, hour, region)` partition and then writes the records
 * into that partition through the `ClickHouseSparkExt` helpers.
 *
 * @package: mobvista.dmp.clickhouse.dsp
 * @author: wangjf
 * @date: 2019-10-17
 * @time: 10:20
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class DspEtlHourToCK extends Serializable {

  /**
   * Builds the command-line options accepted by [[run]].
   *
   * @return options `date`, `host`, `cluster`, `database`, `table`, `input`, `region`
   *         and `hour`; each of them takes a value
   */
  def commandOptions(): Options = {
    val options = new Options()
    Seq("date", "host", "cluster", "database", "table", "input", "region", "hour")
      .foreach(name => options.addOption(name, true, name))
    options
  }

  /** Format of the incoming `date` option (`yyyy-MM-dd`). */
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")

  /** Format of the date used in the ClickHouse partition expression (`yyyyMMdd`). */
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  /**
   * Parses the arguments, reads the ORC input and replaces the matching ClickHouse partition.
   *
   * @param args command-line arguments, see [[commandOptions]]
   * @throws IllegalArgumentException if a required option is missing or blank
   * @throws java.text.ParseException if `date` is not in `yyyy-MM-dd` format
   */
  protected def run(args: Array[String]): Unit = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // Fail fast with a readable message instead of a late NullPointerException or a
    // literal 'null' ending up in the partition expression.
    def required(name: String): String = {
      val value = commandLine.getOptionValue(name)
      require(StringUtils.isNotBlank(value), s"missing required option -$name")
      value
    }

    val date = required("date")
    val host = required("host")
    val database = required("database")
    val table = required("table")
    val input = required("input")
    val region = required("region")
    val hour = required("hour")
    // Option(...) rather than Some(...): an absent -cluster must become None, not Some(null).
    val clusterName: Option[String] = Option(commandLine.getOptionValue("cluster"))
    // Parsed before Spark starts so that a malformed date aborts the job immediately.
    val partDate = sdf2.format(sdf1.parse(date))

    val spark = SparkSession
      .builder()
      .appName("DspEtlHourToCK")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)
      import spark.implicits._

      val dspDF = spark.read.orc(input)
        .rdd
        .map { r =>
          // De-duplicate the package ids; a null package_list is treated as empty.
          val packageIds = new mutable.HashSet[Int]()
          Option(r.getAs[mutable.WrappedArray[Int]]("package_list")).foreach(packageIds ++= _)

          // "m" -> 1, "f" -> 2, anything else (including null/blank) -> 0.
          val gender = Option(r.getAs[Any]("gender")).map(_.toString) match {
            case Some("m") => 1
            case Some("f") => 2
            case _ => 0
          }

          // Only two-character country codes are kept; everything else becomes "".
          val countryCode = Option(r.getAs[Any]("country_code"))
            .map(_.toString)
            .filter(_.length == 2)
            .getOrElse("")

          val deviceId = r.getAs[Any]("device_id").toString.toUpperCase
          // Wall-clock milliseconds at the moment this row is mapped.
          val version = new Date().getTime

          RealtimeServiceHour(deviceId, r.getAs("platform"),
            gender, 0,
            countryCode,
            Array.empty[String], mutable.WrappedArray.make(packageIds.toArray), "", version)
        }.toDF

      /**
       * user_info save
       */
      // dspDF.createClickHouseDb(database, clusterName)
      // dspDF.createClickHouseTable(database, table, Seq("dt", "hour", "region"), Constant.indexColumn, Constant.orderColumn, clusterName)

      // Drop the target partition first, then write the freshly mapped records into it.
      spark.emptyDataFrame.dropPartition(database, table, s"($partDate,'$hour','$region')", clusterName)
      dspDF.saveToClickHouse(database, table, Seq(date, hour, region), Seq("dt", "hour", "region"), clusterName, batchSize = 1000000)
    } finally {
      spark.stop()
    }
  }
}
/** Companion object holding the JVM entry point of the job. */
object DspEtlHourToCK {

  /**
   * Creates a fresh job instance and runs it with the given arguments.
   *
   * @param args command-line arguments forwarded unchanged to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new DspEtlHourToCK()
    job.run(args)
  }
}