package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

class EtlAliIosActivitionDaily extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("output", true, "[must] output")
    options.addOption("outputdaily", true, "[must] outputdaily")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesterday", true, "[must] yesterday")
    options.addOption("dt_dash_today", true, "[must] dt_dash_today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options.addOption("dt_dash_rec14day", true, "[must] dt_dash_rec14day")
    options.addOption("request_count_result", true, "[must] request_count_result")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val output = commandLine.getOptionValue("output")
    val outputdaily = commandLine.getOptionValue("outputdaily")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val dt_dash_today = commandLine.getOptionValue("dt_dash_today")
    val last_req_day = commandLine.getOptionValue("last_req_day")
    val dt_dash_rec14day = commandLine.getOptionValue("dt_dash_rec14day")
    val yesterday = commandLine.getOptionValue("yesterday")
    val request_count_result = commandLine.getOptionValue("request_count_result")

    val spark = SparkSession.builder()
      .appName("EtlAliActivitionDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear old output locations before writing.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output), true)
    fs.delete(new Path(outputdaily), true)
    fs.delete(new Path(request_count_result), true)

    try {
      // Deduplicate today's CN iOS devices (idfa / idfamd5) active since last_req_day,
      // keyed by the MD5 of the device id.
      val sql1 =
        s"""
           |select XX.device_id_md5, XX.device_id, XX.device_type
           |from (select X.device_id_md5, X.device_id, X.device_type,
           |  row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
           |  from (select device_id, device_type,
           |    case when device_type = 'idfa' then MD5(device_id) when device_type = 'idfamd5' then device_id end as device_id_md5
           |    from dwh.ods_dmp_user_info
           |    where dt = '${today}'
           |      and device_type in ('idfa', 'idfamd5')
           |      and last_req_day >= '${last_req_day}'
           |      and upper(country) = 'CN') X) XX
           |where XX.rk = 1
        """.stripMargin

      val dfCache: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
      dfCache.createOrReplaceTempView("ods_user_info_daily")

      val sql2 = "select count(distinct device_id_md5) from ods_user_info_daily"
      spark.sql(sql2).rdd.map(_.mkString("==")).saveAsTextFile(request_count_result)

      val dt_14days_ago: String = dt_dash_rec14day.replace("-", "")
      // Today's devices minus the devices seen in the last 14 days = the devices to upload to OSS.
      // 9f89c84a559f573636a47ff8daed0d33 is the MD5 of "00000000-0000-0000-0000-000000000000"; that device is filtered out.
      val sql3 =
        s"""
           |select t1.device_id_md5
           |from ods_user_info_daily t1
           |left join (
           |  select dev_id device_id_md5
           |  from dwh.ali_ios_user_activation_daily
           |  where dt >= '${dt_14days_ago}' and dt <= '${yesterday}'
           |) t2
           |on (t1.device_id_md5 = t2.device_id_md5)
           |where t2.device_id_md5 is null and t1.device_id_md5 != '9f89c84a559f573636a47ff8daed0d33'
           |group by t1.device_id_md5 limit 30000000
        """.stripMargin

      //      val sql3 =
      //        s"""
      //           |select t1.device_id_md5
      //           |from ods_user_info_daily t1
      //           |group by t1.device_id_md5 limit 150
      //        """.stripMargin

      val dfCacheUpload: DataFrame = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK_SER)
      dfCacheUpload.createOrReplaceTempView("upload_data_daily")
      dfCacheUpload.rdd.map(_.mkString).coalesce(40).saveAsTextFile(outputdaily)

      // Devices of the last 15 days: merge today's uploaded devices with the last-14-day devices
      // from yesterday's partition, keeping the most recent update_date per device.
      val sql4 =
        s"""
           |select t.device_id_md5, t.device_type, t.device_id, t.update_date
           |from (
           |  select t.device_id_md5, t.device_type, t.device_id, t.update_date,
           |    row_number() over(partition by t.device_id_md5 order by t.update_date desc) rk
           |  from (
           |    select device_id_md5, device_id, device_type, update_date
           |    from
           |      (select t1.device_id_md5, t1.device_id, t1.device_type, '${dt_dash_today}' as update_date
           |       from ods_user_info_daily t1
           |       join upload_data_daily t2
           |       on (t1.device_id_md5 = t2.device_id_md5)) tmp_today
           |    union all
           |    select device_id_md5, device_id, device_type, update_date
           |    from dwh.ali_ios_user_activation_rec15days
           |    where dt = '${yesterday}' and update_date >= '${dt_dash_rec14day}'
           |  ) t
           |) t
           |where t.rk = '1'
        """.stripMargin

      spark.sql(sql4).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}

object EtlAliIosActivitionDaily {
  def main(args: Array[String]): Unit = {
    new EtlAliIosActivitionDaily().run(args)
  }
}
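
// Illustrative submission sketch (an assumption, not part of the original source): the job is
// presumably launched via spark-submit with the options declared in buildOptions(). The jar
// name, S3 paths and date values below are hypothetical placeholders; only the option names
// come from the code above.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.taobao.EtlAliIosActivitionDaily \
//     dmp.jar \
//     -output s3://mob-emr-test/<path>/ali_ios_activation_rec15days \
//     -outputdaily s3://mob-emr-test/<path>/ali_ios_activation_daily \
//     -request_count_result s3://mob-emr-test/<path>/request_count_result \
//     -coalesce 100 \
//     -today 20240102 \
//     -yesterday 20240101 \
//     -dt_dash_today 2024-01-02 \
//     -last_req_day 2023-12-19 \
//     -dt_dash_rec14day 2023-12-19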