package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * Daily job that selects CN imei / imeimd5 devices from dwh.ods_dmp_user_info, dedupes them by
  * the MD5 of the imei, caps the list at 300 million ids, groups the ids into comma-separated
  * batches of five, and writes the batches to S3 via TextMultipleOutputFormat.
  */
class AlipayLaHuoDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("imeioutput", true, "[must] imeioutput")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options.addOption("dt_after_one_day", true, "[must] dt_after_one_day")
    options.addOption("input_one_day", true, "[must] input_one_day")
    options.addOption("input_two_day", true, "[must] input_two_day")
    options.addOption("input_three_day", true, "[must] input_three_day")
    options.addOption("input_four_day", true, "[must] input_four_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val imeioutput = commandLine.getOptionValue("imeioutput")
    val today = commandLine.getOptionValue("today")
    val last_req_day = commandLine.getOptionValue("last_req_day")
    val dt_after_one_day = commandLine.getOptionValue("dt_after_one_day")
    val input_one_day = commandLine.getOptionValue("input_one_day")
    val input_two_day = commandLine.getOptionValue("input_two_day")
    val input_three_day = commandLine.getOptionValue("input_three_day")
    val input_four_day = commandLine.getOptionValue("input_four_day")

    val spark = SparkSession.builder()
      .appName("AlipayLaHuoDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear any previous output before writing today's result.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(imeioutput), true)

    import spark.implicits._

    try {
      // Normalize imei / imeimd5 device ids to their MD5 form and keep one row per MD5,
      // preferring 'imei' rows over 'imeimd5' (device_type ascending).
      val sql1 =
        s"""
           |select XX.device_id_md5
           |from (select X.device_id_md5, X.device_id, X.device_type,
           |        row_number() over (partition by X.device_id_md5 order by X.device_type asc) rk
           |      from (select device_id, device_type,
           |              case when device_type = 'imei' then MD5(device_id)
           |                   when device_type = 'imeimd5' then device_id end as device_id_md5
           |            from dwh.ods_dmp_user_info
           |            where dt = '${today}'
           |              and device_type in ('imei', 'imeimd5')
           |              and last_req_day >= '${last_req_day}'
           |              and upper(country) = 'CN') X) XX
           |where XX.rk = 1
        """.stripMargin

      // val sql2 =
      //   s"""
      //      |select distinct device_id from dwh.etl_alipay_activation_daily where dt='${dt_after_one_day}' and business in ('alipay_activation') and package_name in ('com.alipay.foractivation')
      //      |""".stripMargin
      //
      // val yesterday_success_data = spark.sql(sql2).repartition(500).rdd.map(_.mkString)
      // val num_yesterday_success_data: Int = yesterday_success_data.count().toInt
      //
      // val oldData = spark.sparkContext.textFile(input_one_day).flatMap(_.split(",")).subtract(yesterday_success_data)
      // val subtractRdd: RDD[String] = spark.sql(sql1).repartition(500).rdd.map(_.mkString).subtract(yesterday_success_data).subtract(oldData)

      val subtractRdd: RDD[String] = spark.sql(sql1).repartition(500).rdd.map(_.mkString)

      // Cap the pulled device list at lahuo_data_size ids.
      val lahuo_data_size = 300000000
      var resultRdd: RDD[String] = spark.sparkContext.emptyRDD[String]
      resultRdd = subtractRdd.toDF.limit(lahuo_data_size).rdd.map(_.mkString)

      // val num: Int = subtractRdd.count().toInt
      // val lahuo_data_size = 300000000 - num_yesterday_success_data
      // if (num >= lahuo_data_size) {
      //   println("number is enough, number is " + num)
      //   resultRdd = subtractRdd.toDF.limit(lahuo_data_size).rdd.map(_.mkString)
      // } else {
      //   println("number is not enough, after deduplication number is " + num)
      //   resultRdd = subtractRdd
      //     .union(oldData.distinct().toDF.limit(lahuo_data_size - num).rdd.map(_.mkString))
      // }
      // resultRdd = resultRdd.union(yesterday_success_data)

      // println("resultRdd.getNumPartitions==========" + resultRdd.getNumPartitions)

      // Group the ids into comma-separated batches of five per output line and pair each
      // batch with a random "01, " .. "04, " key for TextMultipleOutputFormat.
      resultRdd.repartition(500).mapPartitions(rs => {
        val array = new ArrayBuffer[String]()
        var devidSet = new mutable.HashSet[String]()
        while (rs.hasNext) {
          devidSet.add(rs.next())
          if (devidSet.size == 5) {
            array += devidSet.mkString(",")
            devidSet = new mutable.HashSet[String]()
          } else if (devidSet.size < 5 && !rs.hasNext) {
            array += devidSet.mkString(",")
          }
        }
        array.iterator
      }).map(device_ids => Tuple2(new Text("0" + (1 + scala.util.Random.nextInt(4)).toString + ", "), new Text(device_ids)))
        .saveAsNewAPIHadoopFile(imeioutput, classOf[Text], classOf[Text],
          classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)

      // yesterday_success_data.repartition(500).mapPartitions(rs => {
      //   val array = new ArrayBuffer[String]()
      //   var devidSet = new mutable.HashSet[String]()
      //   while (rs.hasNext) {
      //     devidSet.add(rs.next())
      //     if (devidSet.size == 5) {
      //       array += devidSet.mkString(",")
      //       devidSet = new mutable.HashSet[String]()
      //     } else if (devidSet.size < 5 && !rs.hasNext) {
      //       array += devidSet.mkString(",")
      //     }
      //   }
      //   array.iterator
      // }).saveAsTextFile(imeioutput + "/01")
    } finally {
      spark.stop()
    }
    0
  }
}

object AlipayLaHuoDaily {
  def main(args: Array[String]): Unit = {
    new AlipayLaHuoDaily().run(args)
  }
}
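
// Example invocation, as a rough sketch only: the jar name, cluster settings, and the
// placeholder values below are illustrative assumptions, not taken from this repository;
// only the option names come from buildOptions() above.
//
// spark-submit \
//   --class mobvista.dmp.datasource.taobao.AlipayLaHuoDaily \
//   --master yarn --deploy-mode cluster \
//   dmp.jar \
//   -imeioutput <s3 output path> \
//   -today <dt partition of dwh.ods_dmp_user_info> \
//   -last_req_day <earliest last_req_day to keep> \
//   -dt_after_one_day <dt partition one day after today> \
//   -input_one_day <path> -input_two_day <path> \
//   -input_three_day <path> -input_four_day <path>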