package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Daily YOUKU user-acquisition (la xin) device selection job.
  *
  * Reads active CN devices from dwh.ods_dmp_user_info, normalizes raw IMEI/OAID values to their
  * MD5 form, removes devices already delivered during the previous three days, and writes up to
  * 25,000,000 device MD5s per ID type. If there are not enough new devices, the shortfall is
  * topped up with previously delivered devices.
  */
class YOUKULaXinDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("imeioutput", true, "[must] imeioutput")
    options.addOption("oaidoutput", true, "[must] oaidoutput")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options.addOption("input_one_day", true, "[must] input_one_day")
    options.addOption("input_two_day", true, "[must] input_two_day")
    options.addOption("input_three_day", true, "[must] input_three_day")
    options.addOption("oaid_input_one_day", true, "[must] oaid_input_one_day")
    options.addOption("oaid_input_two_day", true, "[must] oaid_input_two_day")
    options.addOption("oaid_input_three_day", true, "[must] oaid_input_three_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val imeioutput = commandLine.getOptionValue("imeioutput")
    val oaidoutput = commandLine.getOptionValue("oaidoutput")
    val today = commandLine.getOptionValue("today")
    val last_req_day = commandLine.getOptionValue("last_req_day")
    val input_one_day = commandLine.getOptionValue("input_one_day")
    val input_two_day = commandLine.getOptionValue("input_two_day")
    val input_three_day = commandLine.getOptionValue("input_three_day")
    val oaid_input_one_day = commandLine.getOptionValue("oaid_input_one_day")
    val oaid_input_two_day = commandLine.getOptionValue("oaid_input_two_day")
    val oaid_input_three_day = commandLine.getOptionValue("oaid_input_three_day")

    val spark = SparkSession.builder()
      .appName("YOUKULaXinDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so saveAsTextFile can write cleanly.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(imeioutput), true)
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(oaidoutput), true)

    import spark.implicits._

    try {
      // Target number of devices to deliver per ID type.
      var lahuo_data_size = 25000000

      // Active CN IMEI devices, normalized to MD5 and deduplicated (raw imei preferred over imeimd5).
      val sql1 =
        s"""
           |select XX.device_id_md5
           |FROM (select X.device_id_md5,X.device_id,X.device_type,
           |row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
           |from ( select device_id,device_type,
           |case when device_type = 'imei' then MD5(device_id) when device_type = 'imeimd5' then device_id end as device_id_md5
           |from dwh.ods_dmp_user_info where dt ='${today}'
           | and device_type in ('imei','imeimd5')
           | and last_req_day >='${last_req_day}'
           | and upper(country) = 'CN' ) X ) XX
           | WHERE XX.rk= 1
        """.stripMargin

      // Devices delivered during the previous three days.
      var oldData = spark.sparkContext.textFile(input_one_day).flatMap(_.split(","))
        .union(spark.sparkContext.textFile(input_two_day).flatMap(_.split(",")))
        .union(spark.sparkContext.textFile(input_three_day).flatMap(_.split(",")))

      // Keep only devices not delivered in the previous three days.
      var subtractRdd: RDD[String] = spark.sql(sql1).repartition(500).rdd.map(_.mkString).subtract(oldData)
      var resultRdd: RDD[String] = spark.sparkContext.emptyRDD[String]
      var num: Int = subtractRdd.count().toInt
      if (num >= lahuo_data_size) {
        println("number is enough, number is " + num)
        resultRdd = subtractRdd.toDF.limit(lahuo_data_size).rdd.map(_.mkString)
      } else {
        // Not enough new devices: top up with previously delivered ones.
        println("number is not enough, after deduplication number is " + num)
        resultRdd = subtractRdd
          .union(oldData.distinct().toDF.limit(lahuo_data_size - num).rdd.map(_.mkString))
      }
      println("resultRdd.getNumPartitions==========" + resultRdd.getNumPartitions)
      resultRdd.repartition(500).saveAsTextFile(imeioutput)

      // Same selection for OAID devices (raw oaid preferred over oaidmd5).
      val sql2 =
        s"""
           |select XX.device_id_md5
           |FROM (select X.device_id_md5,X.device_id,X.device_type,
           |row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
           |from ( select device_id,device_type,
           |case when device_type = 'oaid' then MD5(device_id) when device_type = 'oaidmd5' then device_id end as device_id_md5
           |from dwh.ods_dmp_user_info where dt ='${today}'
           | and device_type in ('oaid','oaidmd5')
           | and last_req_day >='${last_req_day}'
           | and upper(country) = 'CN' ) X ) XX
           | WHERE XX.rk= 1
        """.stripMargin

      oldData = spark.sparkContext.textFile(oaid_input_one_day).flatMap(_.split(","))
        .union(spark.sparkContext.textFile(oaid_input_two_day).flatMap(_.split(",")))
        .union(spark.sparkContext.textFile(oaid_input_three_day).flatMap(_.split(",")))

      subtractRdd = spark.sql(sql2).repartition(500).rdd.map(_.mkString).subtract(oldData)
      resultRdd = spark.sparkContext.emptyRDD[String]
      num = subtractRdd.count().toInt
      lahuo_data_size = 25000000
      if (num >= lahuo_data_size) {
        println("number is enough, number is " + num)
        resultRdd = subtractRdd.toDF.limit(lahuo_data_size).rdd.map(_.mkString)
      } else {
        println("number is not enough, after deduplication number is " + num)
        resultRdd = subtractRdd
          .union(oldData.distinct().toDF.limit(lahuo_data_size - num).rdd.map(_.mkString))
      }
      println("resultRdd.getNumPartitions==========" + resultRdd.getNumPartitions)
      resultRdd.repartition(500).saveAsTextFile(oaidoutput)

      // spark.sql(sql1).limit(lahuo_data_size).repartition(500).rdd.map(_.mkString).saveAsTextFile(imeioutput)
      // spark.sql(sql2).limit(lahuo_data_size).repartition(500).rdd.map(_.mkString).saveAsTextFile(oaidoutput)
    } finally {
      spark.stop()
    }
    0
  }
}

object YOUKULaXinDaily {
  def main(args: Array[String]): Unit = {
    new YOUKULaXinDaily().run(args)
  }
}