1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package mobvista.dmp.datasource.joypac
import org.apache.spark.sql._
import org.apache.spark.sql.types._;
/**
* 刘凯 2019-02-18 15:20
* joypc_sdk fluentd数据接入至etl_joypc_sdk_daily表
*/
/**
 * One record of the etl_joypc_sdk_daily table, parsed from a joypac iOS SDK
 * fluentd log line. All fields are kept as raw strings from the source JSON.
 */
case class JoypcSdkDaily(
    id: String,
    idfa: String,
    app_version: String,
    brand: String,
    network_type: String,
    package_name: String,
    platform: String,
    language: String,
    os_version: String,
    app_version_code: String,
    model: String,
    time_zone: String,
    apps_info: String,
    time: String)
/**
 * Spark job that ingests joypac iOS SDK fluentd logs into the
 * etl_joypc_sdk_daily table (ORC files under spark.app.output_path).
 *
 * Flow: read raw text -> parse each line to JSON via JoypcSdkTools.getEtlJSON
 * -> keep only records authenticated as joypac_ios -> drop the credential
 * columns -> write ORC. When no record survives the filter, an empty
 * (schema-only) ORC dataset is still written so the output path always exists.
 */
object JoypcSdkDaily extends Serializable {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .getOrCreate()

    // Read required job parameters up front so a misconfigured run fails fast.
    // spark.app.loadTime (presumably yyyyMMdd) is validated for presence only:
    // its year/month/day substrings were previously split into unused locals,
    // which have been removed.
    spark.conf.get("spark.app.loadTime")
    val inputPath = spark.conf.get("spark.app.input_path").replace("*", "")
    val outputPath = spark.conf.get("spark.app.output_path")

    try {
      // Lines of length exactly 1 are noise in the fluentd feed; skip them.
      val parsedRdd = spark.sparkContext
        .textFile(inputPath)
        .filter(_.length != 1)
        .map { line =>
          val json = JoypcSdkTools.getEtlJSON(line)
          // The first two fields are credentials used only for filtering
          // below; parseCalData strips them before the DataFrame is built.
          Row(
            json.get("business_name"),
            json.get("business_pass"),
            json.get("id"),
            json.get("idfa"),
            json.get("app_version"),
            json.get("brand"),
            json.get("network_type"),
            json.get("package_name"),
            json.get("platform"),
            json.get("language"),
            json.get("os_version"),
            json.get("app_version_code"),
            json.get("model"),
            json.get("time_zone"),
            json.get("apps_info"),
            json.get("time"))
        }

      // Keep only rows carrying the expected joypac iOS SDK credentials.
      val authedRdd = parsedRdd.filter { row =>
        row.getString(0).equals("joypac_ios") && row.getString(1).equals("joypac_ios-sdk0121")
      }

      // Drop the two credential columns; the remaining 14 fields line up with
      // joypcSchema below (and with the JoypcSdkDaily case class).
      val calRdd = authedRdd.map(parseCalData)

      val joypcSchema = StructType(Array(
        StructField("id", StringType),
        StructField("idfa", StringType),
        StructField("app_version", StringType),
        StructField("brand", StringType),
        StructField("network_type", StringType),
        StructField("package_name", StringType),
        StructField("platform", StringType),
        StructField("language", StringType),
        StructField("os_version", StringType),
        StructField("app_version_code", StringType),
        StructField("model", StringType),
        StructField("time_zone", StringType),
        StructField("apps_info", StringType),
        StructField("time", StringType)))

      val joypcDf = spark.createDataFrame(calRdd, joypcSchema)

      // BUG FIX: the original condition was `count() > 1`, which silently
      // discarded a day with exactly one valid record (and ran a full count
      // job just to branch). head(1) inspects at most one partition and any
      // non-empty result means there is data to write.
      if (joypcDf.head(1).nonEmpty) {
        joypcDf.coalesce(100).write.format("orc").mode("overwrite").save(outputPath)
      } else {
        // No valid records: still materialize an empty ORC dataset with the
        // expected schema so downstream readers do not fail on a missing path.
        import spark.implicits._
        Seq.empty[JoypcSdkDaily].toDF
          .coalesce(1).write.format("orc").mode("overwrite").save(outputPath)
      }
    } finally {
      spark.stop()
    }
  }

  /**
   * Projects a parsed 16-field row down to the 14 persisted columns by
   * dropping the two leading credential fields (business_name, business_pass).
   */
  def parseCalData(row: Row): Row = Row.fromSeq(row.toSeq.drop(2))
}