package mobvista.dmp.datasource.taobao

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

class EtlLazadaActivitionDaily extends CommonSparkJob {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("gaidoutput", true, "[must] gaidoutput")
    options.addOption("today", true, "[must] today")
    options.addOption("dt_six_days_ago", true, "[must] dt_six_days_ago")
    options.addOption("input_one_day", true, "[must] input_one_day")
    options.addOption("input_two_day", true, "[must] input_two_day")
    options.addOption("input_three_day", true, "[must] input_three_day")
    options.addOption("th_gaidoutput", true, "[must] th_gaidoutput")
    options.addOption("vn_gaidoutput", true, "[must] vn_gaidoutput")
    options.addOption("ph_gaidoutput", true, "[must] ph_gaidoutput")
    options.addOption("my_gaidoutput", true, "[must] my_gaidoutput")
    options.addOption("sg_gaidoutput", true, "[must] sg_gaidoutput")
    options.addOption("th_input_one_day", true, "[must] th_input_one_day")
    options.addOption("vn_input_one_day", true, "[must] vn_input_one_day")
    options.addOption("ph_input_one_day", true, "[must] ph_input_one_day")
    options.addOption("my_input_one_day", true, "[must] my_input_one_day")
    options.addOption("sg_input_one_day", true, "[must] sg_input_one_day")
    options.addOption("dt_30days_ago", true, "[must] dt_30days_ago")
    options.addOption("th_input_two_day", true, "[must] th_input_two_day")
    options.addOption("vn_input_two_day", true, "[must] vn_input_two_day")
    options.addOption("ph_input_two_day", true, "[must] ph_input_two_day")
    options.addOption("my_input_two_day", true, "[must] my_input_two_day")
    options.addOption("sg_input_two_day", true, "[must] sg_input_two_day")
    options.addOption("th_input_three_day", true, "[must] th_input_three_day")
    options.addOption("vn_input_three_day", true, "[must] vn_input_three_day")
    options.addOption("ph_input_three_day", true, "[must] ph_input_three_day")
    options.addOption("my_input_three_day", true, "[must] my_input_three_day")
    options.addOption("sg_input_three_day", true, "[must] sg_input_three_day")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val gaidoutput = commandLine.getOptionValue("gaidoutput")
    val today = commandLine.getOptionValue("today")
    val dt_six_days_ago = commandLine.getOptionValue("dt_six_days_ago")
    val input_one_day = commandLine.getOptionValue("input_one_day")
    val input_two_day = commandLine.getOptionValue("input_two_day")
    val input_three_day = commandLine.getOptionValue("input_three_day")
    val th_gaidoutput = commandLine.getOptionValue("th_gaidoutput")
    val vn_gaidoutput = commandLine.getOptionValue("vn_gaidoutput")
    val ph_gaidoutput = commandLine.getOptionValue("ph_gaidoutput")
    val my_gaidoutput = commandLine.getOptionValue("my_gaidoutput")
    val sg_gaidoutput = commandLine.getOptionValue("sg_gaidoutput")
    val th_input_one_day = commandLine.getOptionValue("th_input_one_day")
    val vn_input_one_day = commandLine.getOptionValue("vn_input_one_day")
    val ph_input_one_day = commandLine.getOptionValue("ph_input_one_day")
    val my_input_one_day = commandLine.getOptionValue("my_input_one_day")
    val sg_input_one_day = commandLine.getOptionValue("sg_input_one_day")
    val dt_30days_ago = commandLine.getOptionValue("dt_30days_ago")
    val th_input_two_day = commandLine.getOptionValue("th_input_two_day")
    val vn_input_two_day = commandLine.getOptionValue("vn_input_two_day")
    val ph_input_two_day = commandLine.getOptionValue("ph_input_two_day")
    val my_input_two_day = commandLine.getOptionValue("my_input_two_day")
    val sg_input_two_day = commandLine.getOptionValue("sg_input_two_day")
    val th_input_three_day = commandLine.getOptionValue("th_input_three_day")
    val vn_input_three_day = commandLine.getOptionValue("vn_input_three_day")
    val ph_input_three_day = commandLine.getOptionValue("ph_input_three_day")
    val my_input_three_day = commandLine.getOptionValue("my_input_three_day")
    val sg_input_three_day = commandLine.getOptionValue("sg_input_three_day")
    val last_req_day = commandLine.getOptionValue("last_req_day")
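    // Build a Hive-enabled SparkSession, then clear any existing output paths on S3
    // so the daily job can be safely re-run for the same partition.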
    val spark = SparkSession.builder()
      .appName("EtlLazadaActivitionDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(gaidoutput), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(th_gaidoutput), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(vn_gaidoutput), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(ph_gaidoutput), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(my_gaidoutput), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(sg_gaidoutput), true)

    try {
      // Active Android gaid devices for the day, deduplicated by (device_id, country).
      val sql2 =
        s"""
           |select lower(device_id) device_id, lower(country) country
           |from dwh.ods_dmp_user_info
           |where dt = '${today}' and last_req_day >= '${last_req_day}'
           | and business not in ('other', 'ali_acquisition', 'ali_activation', 'adn_install')
           | and device_type='gaid'
           | and platform='android'
           |group by lower(device_id), lower(country)
        """.stripMargin

      val dfCache: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Requirement of 2021-05-21: remove the volume cap for every country, export all available
      // devices, and no longer deduplicate against previously exported data.
      dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "ID").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(gaidoutput)
      dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "TH").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(th_gaidoutput)
      dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "VN").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(vn_gaidoutput)
      dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "PH").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(ph_gaidoutput)
      dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "MY").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(my_gaidoutput)
      dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "SG").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(sg_gaidoutput)
    } finally {
      spark.stop()
    }
    0
  }
}

object EtlLazadaActivitionDaily {
  def main(args: Array[String]): Unit = {
    new EtlLazadaActivitionDaily().run(args)
  }
}
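// ---------------------------------------------------------------------------
// Previous implementation, kept commented out for reference: it exported only
// Indonesian (ID) devices over a 30-day window and deduplicated the new output
// against a previously exported gaid list, capped at 50,000,000 devices.
// ---------------------------------------------------------------------------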
//package mobvista.dmp.datasource.taobao
//
//import java.net.URI
//
//import mobvista.dmp.common.CommonSparkJob
//import mobvista.dmp.format.TextMultipleOutputFormat
//import org.apache.commons.cli.Options
//import org.apache.hadoop.fs.{FileSystem, Path}
//import org.apache.hadoop.io.Text
//import org.apache.spark.sql.{Row, SparkSession}
//
//import scala.collection.mutable.ArrayBuffer
//
//
//class EtlLazadaActivitionDaily extends CommonSparkJob {
//  override protected def buildOptions(): Options = {
//    val options = new Options
//    options.addOption("gaidoutput", true, "[must] gaidoutput")
//    options.addOption("gaidinput", true, "[must] gaidinput")
//    options.addOption("newoutput", true, "[must] newoutput")
//    options.addOption("today", true, "[must] today")
//    options.addOption("dt_30days_ago", true, "[must] dt_30days_ago")
//    options
//  }
//
//  override protected def run(args: Array[String]): Int = {
//    val commandLine = commParser.parse(options, args)
//    if (!checkMustOption(commandLine)) {
//      printUsage(options)
//      return -1
//    } else printOptions(commandLine)
//
//    val gaidoutput = commandLine.getOptionValue("gaidoutput")
//    val gaidinput = commandLine.getOptionValue("gaidinput")
//    val newoutput = commandLine.getOptionValue("newoutput")
//    val today = commandLine.getOptionValue("today")
//    val dt_30days_ago = commandLine.getOptionValue("dt_30days_ago")
//
//    val spark = SparkSession.builder()
//      .appName("EtlLazadaActivitionDaily")
//      .config("spark.rdd.compress", "true")
//      .config("spark.io.compression.codec", "snappy")
//      .config("spark.sql.orc.filterPushdown", "true")
//      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
//      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//      .enableHiveSupport()
//      .getOrCreate()
//
//    import spark.implicits._
//
//    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(gaidoutput), true)
//    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(newoutput), true)
//
//    try {
//      val sql1 =
//        s"""
//           |select distinct(dev_id)
//           |from dwh.ods_dmp_user_info_daily where dt between '${dt_30days_ago}' and '${today}'
//           | and dev_type='gaid'
//           | and platform='android'
//           | and upper(country) = 'ID'
//        """.stripMargin
//
//      println("sql1===" + sql1)
//
//      spark.sql(sql1).rdd.map(_.mkString).coalesce(50)
//        .saveAsTextFile(gaidoutput)
//
//      spark.sql(sql1).rdd.map(_.mkString).subtract(
//        spark.sparkContext.textFile(gaidinput)).toDF.limit(50000000).rdd.map(_.mkString).coalesce(50)
//        .saveAsTextFile(newoutput)
//
//    } finally {
//      spark.stop()
//    }
//    0
//  }
//}
//
//object EtlLazadaActivitionDaily {
//  def main(args: Array[String]): Unit = {
//    new EtlLazadaActivitionDaily().run(args)
//  }
//}