1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import java.net.URI
/**
 * Spark job that extracts distinct (idfv, ruid) pairs for one day from the hourly
 * mapping-server request log and writes them as gzip-compressed text lines.
 *
 * Options:
 *  - `date`:   compared against `concat(year, month, day)` of
 *              `default.mapping_server_request_log_h` to select the day's partitions.
 *  - `output`: destination directory; any existing content there is deleted first.
 *
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2021/5/6
 * @time: 10:40 AM
 * @email: jinfeng.wang@mobvista.com
 */
class EtlRuidMapping extends CommonSparkJob with Serializable {

  /**
   * Parses the command line and, when all mandatory options are present, runs the extraction.
   *
   * @param args raw command-line arguments carrying the `date` and `output` options
   * @return 0 when the extraction completed, 1 when a mandatory option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      1
    } else {
      printOptions(commandLine)
      writeIdfvRuidMapping(commandLine.getOptionValue("date"), commandLine.getOptionValue("output"))
      0
    }
  }

  /**
   * Queries the day's (idfv, ruid) pairs and writes them to `output`, replacing prior content.
   *
   * @param date   value matched against `concat(year,month,day)` in the source table
   * @param output destination directory for the gzip-compressed text output
   */
  private def writeIdfvRuidMapping(date: String, output: String): Unit = {
    val spark = MobvistaConstant.createSparkSession(s"EtlRuidMapping.$date")
    try {
      val outputPath = new Path(output)
      // Resolve the filesystem from the output path itself, so the delete targets the same
      // store saveAsTextFile writes to. A filesystem bound to one fixed bucket rejects paths
      // in any other bucket with "Wrong FS".
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      // Only well-formed idfv values are kept: the 'illegal' placeholder is excluded and the
      // value must be exactly 36 characters long.
      // An earlier variant keyed the mapping on idfa instead:
      //   select idfa dev_id,ruid from default.mapping_server_request_log_h
      //   where concat(year,month,day) = '${date}' and idfa != 'illegal' and length(idfa) = 36
      val sql =
        s"""
           |select idfv dev_id,ruid from default.mapping_server_request_log_h where concat(year,month,day) = '${date}' and idfv != 'illegal' and length(idfv) = 36
           |""".stripMargin

      spark.sql(sql)
        .dropDuplicates()
        .rdd
        // One output line per pair, joined with MRUtils.JOINER's separator.
        .map(row => MRUtils.JOINER.join(row.getAs[String]("dev_id"), row.getAs[String]("ruid")))
        .repartition(100)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext.
      spark.stop()
    }
  }

  /** Declares the command-line options; both are marked "[must]". */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options
  }
}
/** Command-line entry point for the `EtlRuidMapping` Spark job. */
object EtlRuidMapping {

  /**
   * Runs the job and propagates a non-zero status from `run` as the process exit code.
   * Without this, a missing mandatory option would print usage but still exit with 0,
   * so the launcher would treat the run as successful.
   *
   * @param args command-line arguments forwarded unchanged to `run`
   */
  def main(args: Array[String]): Unit = {
    val status = new EtlRuidMapping().run(args)
    // Exit explicitly only on failure; a normal return keeps the default success path unchanged.
    if (status != 0) sys.exit(status)
  }
}