package mobvista.dmp.datasource.datatory

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019-08-19 15:01:03
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class TrackingEventDailyV2 extends CommonSparkJob with java.io.Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession
      .builder()
      .appName("TrackingEventDailyV2")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Pull the two campaign dimension tables from MySQL via JDBC.
      val campaign_list_cpi = ConstantV2.jdbcConnection(spark, "mob_adn", "campaign_list_cpi")
        .select("campaign_id", "create_src")

      val campaign_list = ConstantV2.jdbcConnection(spark, "mob_adn", "campaign_list")
        .select("id", "network_cid")

      // Join on campaign id. selectExpr (not select) is required here because
      // "CAST(create_src AS string) create_src" is a SQL expression, which
      // plain select(colNames: String*) does not evaluate.
      val campaign_df = campaign_list
        .join(campaign_list_cpi, campaign_list_cpi("campaign_id") === campaign_list("id"), "inner")
        .selectExpr("id", "CAST(create_src AS string) create_src", "network_cid")
        .dropDuplicates()

      campaign_df.createOrReplaceTempView("campaign_list")

      spark.udf.register("check_deviceId", Constant.check_deviceId _)

      import spark.implicits._

      // Build the SS event DataFrame for the given day.
      val ss_sql = ConstantV2.tracking_ss_event_sql.replace("@date", date)
      val ss_df = spark.sql(ss_sql).rdd.map(r => {
        // getAs needs an explicit type parameter here: with no expected type
        // the compiler infers Nothing and the cast fails at runtime.
        val android_id = r.getAs[String]("android_id")
        val imei = r.getAs[String]("imei")
        val gaid = r.getAs[String]("gaid")
        val idfa = r.getAs[String]("idfa")
        // Pick the first non-blank device id, in priority order:
        // imei > idfa > gaid > android_id.
        var dev_id = ""
        if (StringUtils.isNotBlank(imei)) {
          dev_id = imei
        } else if (StringUtils.isNotBlank(idfa)) {
          dev_id = idfa
        } else if (StringUtils.isNotBlank(gaid)) {
          dev_id = gaid
        } else if (StringUtils.isNotBlank(android_id)) {
          dev_id = android_id
        }
        EventClass(dev_id, r.getAs[String]("platform"), r.getAs[String]("campaign_id"),
          r.getAs[String]("event_name"), r.getAs[String]("country"))
      }).filter(e => StringUtils.isNotBlank(e.dev_id))
        .toDF()
        .dropDuplicates()

      ss_df.createOrReplaceTempView("tracking_ss")

      // Persist the SS events as snappy-compressed ORC.
      ss_df.write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output + "/tracking_ss")

      // Build the 3S event DataFrame; the device-id check is injected into the
      // SQL template as a call to the check_deviceId UDF registered above.
      val event_sql = ConstantV2.tracking_3s_event_sql.replace("@date", date)
        .replace("@check_deviceId", "check_deviceId(dev_id)")
      val event_df = spark.sql(event_sql)

      event_df.createOrReplaceTempView("tracking_event")

      // Export the campaign info as gzipped text.
      spark.sql(ConstantV2.campaing_info_sql)
        .rdd
        .saveAsTextFile(output + "/campaign_info", classOf[GzipCodec])

      event_df.write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output + "/tracking_event")
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object TrackingEventDailyV2 {
  def main(args: Array[String]): Unit = {
    new TrackingEventDailyV2().run(args)
  }
}
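// ----------------------------------------------------------------------------
// Note: EventClass, Constant.check_deviceId, and the ConstantV2 helpers and
// SQL templates used above are defined elsewhere in this package and are not
// shown in this file. A minimal sketch of the shapes this job relies on,
// inferred from usage (hypothetical; field names and types are assumptions):
//
//   case class EventClass(dev_id: String, platform: String, campaign_id: String,
//                         event_name: String, country: String)
//
//   object Constant {
//     // assumed: validates a device-id string; registered as the
//     // check_deviceId UDF and spliced into tracking_3s_event_sql
//     def check_deviceId(deviceId: String): Boolean = ???
//   }
// ----------------------------------------------------------------------------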