package mobvista.dmp.datasource.postback_3s

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

class PostBackDaily extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("last_sunday", true, "[must] last_sunday")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val last_sunday = commandLine.getOptionValue("last_sunday")

    val spark = SparkSession.builder()
      .appName("PostBackDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so the ORC write below starts from a clean path.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(output), true)

    // today is expected in yyyyMMdd form and is sliced into partition values.
    val year = today.substring(0, 4)
    val month = today.substring(4, 6)
    val day = today.substring(6, 8)

    try {
      // Union the day's install and event postbacks from the 3s tracking tables,
      // keeping only the supported attribution providers and dropping app_id "0".
      val sql1 =
        s"""
           |select UPPER(idfa) idfa, md5_idfa, LOWER(gaid) gaid, md5_gaid, lower(pl) platform,
           |       lower(app_id) package_name, country, install_time,
           |       cast(`date` as string) update_date, type
           |  from dwh.ods_adn_trackingnew_postback_install
           | where yyyy = '$year' and mm = '$month' and dd = '$day'
           |   and type in ('appsflyer', 'min_appsflyer', 'tenjin', 'adjust', 'reyun')
           |   and app_id != "0"
           |union
           |select UPPER(idfa) idfa, md5_idfa, LOWER(gaid) gaid, md5_gaid, lower(pl) platform,
           |       lower(app_id) package_name, country, install_time,
           |       cast(`date` as string) update_date, type
           |  from dwh.ods_adn_trackingnew_postback_event
           | where yyyy = '$year' and mm = '$month' and dd = '$day'
           |   and type in ('appsflyer', 'min_appsflyer', 'tenjin', 'adjust', 'reyun')
           |   and app_id != "0"
        """.stripMargin
      spark.sql(sql1).createOrReplaceTempView("etl_3s_postback_daily")

      // Distinct MD5 device ids (IDFA hash preferred over GAID hash), reused
      // twice below, hence the cache.
      spark.sql("select coalesce(md5_idfa, md5_gaid) device_id_md5 from etl_3s_postback_daily group by coalesce(md5_idfa, md5_gaid)")
        .cache()
        .createOrReplaceTempView("3s_postback_device_md5")

      // Resolve MD5 hashes back to raw device ids via the weekly match table.
      val sql2 =
        s"""
           |select b.device_id, a.device_id_md5
           |  from 3s_postback_device_md5 a
           |  join (select * from dwh.device_id_md5_match where dt = '$last_sunday') b
           |    on a.device_id_md5 = b.device_id_md5
        """.stripMargin
      spark.sql(sql2).createOrReplaceTempView("etl_3s_postback_md5")

      // Prefer the raw id carried on the record; fall back to the id recovered
      // from the MD5 match. iOS package names arrive as "id<digits>", so the
      // leading "id" is stripped.
      val sql3 =
        """
          |select coalesce(a.device_id, b.device_id) device_id,
          |       a.device_type,
          |       a.platform,
          |       a.package_name,
          |       a.country,
          |       a.install_time,
          |       a.update_date,
          |       a.type
          |  from (
          |        select case when platform = 'ios' and idfa is not null then idfa
          |                    when platform = 'android' and gaid is not null then gaid end as device_id,
          |               case when platform = 'ios' and idfa is null then md5_idfa
          |                    when platform = 'android' and gaid is null then md5_gaid end as device_id_md5,
          |               case when platform = 'ios' then 'idfa'
          |                    when platform = 'android' then 'gaid' end as device_type,
          |               platform,
          |               case when instr(package_name, 'id') = 1 then substr(package_name, 3)
          |                    else package_name end as package_name,
          |               country,
          |               install_time,
          |               update_date,
          |               type
          |          from etl_3s_postback_daily
          |       ) a
          |  left join etl_3s_postback_md5 b
          |    on a.device_id_md5 = b.device_id_md5
          | where a.device_id is not null and a.package_name is not null
        """.stripMargin
      spark.sql(sql3).createOrReplaceTempView("etl_3s_pre_daily")

      // Collapse to one row per device: package names joined into a '#'-separated
      // set, the remaining columns reduced with max().
      val sql4 =
        """
          |select device_id,
          |       max(device_type) device_type,
          |       max(platform) platform,
          |       concat_ws('#', collect_set(package_name)) package_name,
          |       max(country) country,
          |       max(install_time) install_time,
          |       max(update_date) update_date,
          |       max(type) type
          |  from etl_3s_pre_daily
          | group by device_id
        """.stripMargin

      spark.sql(sql4).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}

object PostBackDaily {
  def main(args: Array[String]): Unit = {
    new PostBackDaily().run(args)
  }
}
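// Usage sketch (assumed invocation; the jar name, output path, and dates below
// are hypothetical examples, not values taken from this repo). The flags match
// the commons-cli options declared in buildOptions():
//
//   spark-submit \
//     --class mobvista.dmp.datasource.postback_3s.PostBackDaily \
//     dmp.jar \
//     -output s3://mob-emr-test/dmp/postback_daily/20240101 \
//     -coalesce 40 \
//     -today 20240101 \
//     -last_sunday 20231231
//
// "today" must be yyyyMMdd (it is sliced into the yyyy/mm/dd partition filters),
// and "last_sunday" must match a dt partition of dwh.device_id_md5_match.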