package mobvista.dmp.datasource.joypac

import org.apache.spark.sql._
import org.apache.spark.sql.types._

/**
  * Liu Kai, 2019-02-18 15:20.
  *
  * Loads joypc_sdk fluentd log data into the etl_joypc_sdk_daily table.
  *
  * One output record per kept log line; every column is a string. The field order must match
  * `JoypcSdkDaily.OutputColumns`, because this class supplies the schema of the empty fallback
  * output written when no record survives filtering.
  */
case class JoypcSdkDaily(id: String,
                         idfa: String,
                         app_version: String,
                         brand: String,
                         network_type: String,
                         package_name: String,
                         platform: String,
                         language: String,
                         os_version: String,
                         app_version_code: String,
                         model: String,
                         time_zone: String,
                         apps_info: String,
                         time: String)

/**
  * Spark job that parses joypc_sdk log lines and writes the joypac iOS records as ORC.
  *
  * Spark conf keys read:
  *  - `spark.app.input_path`: text input; every `*` character is stripped before reading.
  *  - `spark.app.output_path`: ORC output directory, written with mode "overwrite".
  */
object JoypcSdkDaily extends Serializable {

  /** `business_name` value a record must carry to be kept. */
  private final val TargetBusinessName = "joypac_ios"

  /** `business_pass` value a record must carry to be kept. */
  private final val TargetBusinessPass = "joypac_ios-sdk0121"

  /** Output columns in order; also the keys looked up in each parsed log record. */
  private val OutputColumns: Seq[String] = Seq(
    "id", "idfa", "app_version", "brand", "network_type", "package_name", "platform",
    "language", "os_version", "app_version_code", "model", "time_zone", "apps_info", "time")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .getOrCreate()

    val inputPath = spark.conf.get("spark.app.input_path").replace("*", "")
    val outputPath = spark.conf.get("spark.app.output_path")

    try {
      val rows = spark.sparkContext
        .textFile(inputPath)
        // NOTE(review): drops lines that are exactly one character long; presumably
        // noise/placeholder lines -- confirm the intent.
        .filter(_.length != 1)
        .map(line => JoypcSdkTools.getEtlJSON(line))
        .filter(json => isTargetBusiness(json.get("business_name"), json.get("business_pass")))
        // Build the Row from the same column list as the schema so their order always agrees.
        .map(json => Row.fromSeq(OutputColumns.map(column => json.get(column))))

      val schema = StructType(OutputColumns.map(StructField(_, StringType)))
      val joypcDf = spark.createDataFrame(rows, schema)

      // Emptiness check: take(1) stops at the first row found instead of counting everything,
      // and (unlike the previous `count() > 1`) keeps a result that has exactly one row.
      if (joypcDf.take(1).nonEmpty) {
        joypcDf.coalesce(100).write.format("orc").mode("overwrite").save(outputPath)
      } else {
        // No records: still overwrite the output path with an empty dataset of the same columns.
        import spark.implicits._
        Seq.empty[JoypcSdkDaily].toDF
          .coalesce(1).write.format("orc").mode("overwrite").save(outputPath)
      }
    } finally {
      spark.stop()
    }
  }

  /**
    * Whether a record belongs to the joypac iOS SDK feed.
    *
    * The literal is on the left of a null-safe `==`, so a missing (null) or non-string field
    * simply fails the check instead of throwing and failing the job.
    */
  private def isTargetBusiness(businessName: Any, businessPass: Any): Boolean =
    TargetBusinessName == businessName && TargetBusinessPass == businessPass

  /**
    * Returns a new Row holding fields 2 through 15 of `row`, i.e. drops its first two fields.
    *
    * Not used by `main`; kept because it is public and may have callers elsewhere.
    * Throws if `row` has fewer than 16 fields.
    */
  def parseCalData(row: Row): Row = Row.fromSeq((2 to 15).map(i => row(i)))
}