package mobvista.dmp.datasource.dm

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

class YoukuPhoneWaxNobid extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("begin_day", true, "[must] begin_day")
    options.addOption("end_day", true, "[must] end_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val begin_day = commandLine.getOptionValue("begin_day")
    val end_day = commandLine.getOptionValue("end_day")

    val spark = SparkSession.builder()
      .appName("YoukuPhoneWaxNobid")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear any stale output so saveAsTextFile does not fail on existing paths.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)

    try {
      // For each wax "rank" request from CN Android traffic in the date range, pick the
      // first non-empty device id among imeimd5 (field 5), oaidmd5 (field 13) and
      // androidid (field 7) of the comma-separated devids column; the GROUP BY on the
      // same two expressions dedupes (devid, device_type) pairs.
      val sql =
        s"""
           |select
           |  case when split(devids,',')[5] != '' then split(devids,',')[5]
           |       when split(devids,',')[13] != '' then split(devids,',')[13]
           |       else split(devids,',')[7] end as devid,
           |  case when split(devids,',')[5] != '' then 'imeimd5'
           |       when split(devids,',')[13] != '' then 'oaidmd5'
           |       else 'androidid' end as device_type
           |from adn_dsp.log_adn_dsp_retarget_request_hour_v2
           |where concat(yr,mt,dt) >= '${begin_day}' and concat(yr,mt,dt) <= '${end_day}'
           |  and countrycode = 'CN'
           |  and os = 1
           |  and exchanges = 'wax'
           |  and `describe` like '%rank%'
           |group by
           |  case when split(devids,',')[5] != '' then 'imeimd5'
           |       when split(devids,',')[13] != '' then 'oaidmd5'
           |       else 'androidid' end,
           |  case when split(devids,',')[5] != '' then split(devids,',')[5]
           |       when split(devids,',')[13] != '' then split(devids,',')[13]
           |       else split(devids,',')[7] end
        """.stripMargin

      // Run the dedupe query once and reuse the cached result for both outputs.
      val df01: DataFrame = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      // output01: devid \t device_type \t platform \t package_name
      df01.withColumn("platform", lit("android"))
        .withColumn("package_name", lit("[\"com.youku.phone_wax_nobid\"]"))
        .coalesce(coalesce.toInt)
        .rdd.map(_.mkString("\t"))
        .saveAsTextFile(output01)

      // output02: devid \t device_type \t platform \t country
      df01.withColumn("platform", lit("android"))
        .withColumn("country", lit("CN"))
        .coalesce(coalesce.toInt)
        .rdd.map(_.mkString("\t"))
        .saveAsTextFile(output02)

      df01.unpersist()

      // Superseded single-pass variants kept from the original source: each inlines the
      // constant columns in SQL instead of reusing df01, and their writes remain
      // commented out.
      val sql1 =
        s"""
           |select
           |  case when split(devids,',')[5] != '' then split(devids,',')[5]
           |       when split(devids,',')[13] != '' then split(devids,',')[13]
           |       else split(devids,',')[7] end as devid,
           |  case when split(devids,',')[5] != '' then 'imeimd5'
           |       when split(devids,',')[13] != '' then 'oaidmd5'
           |       else 'androidid' end as device_type,
           |  'android' as platform,
           |  '["com.youku.phone_wax_nobid"]' as package_name
           |from adn_dsp.log_adn_dsp_retarget_request_hour_v2
           |where concat(yr,mt,dt) >= '${begin_day}' and concat(yr,mt,dt) <= '${end_day}'
           |  and countrycode = 'CN'
           |  and os = 1
           |  and exchanges = 'wax'
           |  and `describe` like '%rank%'
           |group by
           |  case when split(devids,',')[5] != '' then 'imeimd5'
           |       when split(devids,',')[13] != '' then 'oaidmd5'
           |       else 'androidid' end,
           |  case when split(devids,',')[5] != '' then split(devids,',')[5]
           |       when split(devids,',')[13] != '' then split(devids,',')[13]
           |       else split(devids,',')[7] end
        """.stripMargin
      // spark.sql(sql1).coalesce(coalesce.toInt).rdd.map(_.mkString("\t")).saveAsTextFile(output01)

      val sql2 =
        s"""
           |select
           |  case when split(devids,',')[5] != '' then split(devids,',')[5]
           |       when split(devids,',')[13] != '' then split(devids,',')[13]
           |       else split(devids,',')[7] end as devid,
           |  case when split(devids,',')[5] != '' then 'imeimd5'
           |       when split(devids,',')[13] != '' then 'oaidmd5'
           |       else 'androidid' end as device_type,
           |  'android' as platform,
           |  'CN' as country
           |from adn_dsp.log_adn_dsp_retarget_request_hour_v2
           |where concat(yr,mt,dt) >= '${begin_day}' and concat(yr,mt,dt) <= '${end_day}'
           |  and countrycode = 'CN'
           |  and os = 1
           |  and exchanges = 'wax'
           |  and `describe` like '%rank%'
           |group by
           |  case when split(devids,',')[5] != '' then 'imeimd5'
           |       when split(devids,',')[13] != '' then 'oaidmd5'
           |       else 'androidid' end,
           |  case when split(devids,',')[5] != '' then split(devids,',')[5]
           |       when split(devids,',')[13] != '' then split(devids,',')[13]
           |       else split(devids,',')[7] end
        """.stripMargin
      // spark.sql(sql2).coalesce(coalesce.toInt).rdd.map(_.mkString("\t")).saveAsTextFile(output02)
    } finally {
      spark.stop()
    }

    0
  }
}

object YoukuPhoneWaxNobid {
  def main(args: Array[String]): Unit = {
    new YoukuPhoneWaxNobid().run(args)
  }
}