package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class AlipayLaHuoDaily extends CommonSparkJob with Serializable {


  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("imeioutput", true, "[must] imeioutput")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options.addOption("dt_after_one_day", true, "[must] dt_after_one_day")
    options.addOption("input_one_day", true, "[must] input_one_day")
    options.addOption("input_two_day", true, "[must] input_two_day")
    options.addOption("input_three_day", true, "[must] input_three_day")
    options.addOption("input_four_day", true, "[must] input_four_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val imeioutput = commandLine.getOptionValue("imeioutput")
    val today = commandLine.getOptionValue("today")
    val last_req_day = commandLine.getOptionValue("last_req_day")
    val dt_after_one_day = commandLine.getOptionValue("dt_after_one_day")
    val input_one_day = commandLine.getOptionValue("input_one_day")
    val input_two_day = commandLine.getOptionValue("input_two_day")
    val input_three_day = commandLine.getOptionValue("input_three_day")
    val input_four_day = commandLine.getOptionValue("input_four_day")


    val spark = SparkSession.builder()
      .appName("AlipayLaHuoDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

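    // Delete any previous output under imeioutput (recursively) so this run can write a fresh result.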
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(imeioutput), true)
    import spark.implicits._

    try {

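      // Collect CN imei/imeimd5 devices active since last_req_day and deduplicate them on the
      // MD5 of the imei: row_number is ordered by device_type ascending, so when both forms
      // exist the row derived from the raw 'imei' record is kept (rk = 1).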
      val sql1 =
        s"""
           |select XX.device_id_md5
           |from (
           |  select X.device_id_md5, X.device_id, X.device_type,
           |    row_number() over (partition by X.device_id_md5 order by X.device_type asc) rk
           |  from (
           |    select device_id, device_type,
           |      case when device_type = 'imei' then MD5(device_id)
           |           when device_type = 'imeimd5' then device_id
           |      end as device_id_md5
           |    from dwh.ods_dmp_user_info
           |    where dt = '${today}'
           |      and device_type in ('imei', 'imeimd5')
           |      and last_req_day >= '${last_req_day}'
           |      and upper(country) = 'CN'
           |  ) X
           |) XX
           |where XX.rk = 1
        """.stripMargin

//      val sql2=
//        s"""
//          |select distinct device_id from dwh.etl_alipay_activation_daily where dt='${dt_after_one_day}'  and business in ('alipay_activation') and package_name in ('com.alipay.foractivation')
//          |""".stripMargin
//
//      val yesterday_success_data = spark.sql(sql2).repartition(500).rdd.map(_.mkString)
//      val num_yesterday_success_data: Int = yesterday_success_data.count().toInt
//
//      val oldData = spark.sparkContext.textFile(input_one_day).flatMap(_.split(",")).subtract(yesterday_success_data)

//      val subtractRdd: RDD[String] = spark.sql(sql1).repartition(500).rdd.map(_.mkString).subtract(yesterday_success_data).subtract(oldData)

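      // Cap the daily extraction at lahuo_data_size (300 million) deduplicated device-id MD5s.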
      val subtractRdd: RDD[String] = spark.sql(sql1).repartition(500).rdd.map(_.mkString)
      val lahuo_data_size = 300000000
      val resultRdd: RDD[String] = subtractRdd.toDF.limit(lahuo_data_size).rdd.map(_.mkString)

//      val num: Int = subtractRdd.count().toInt
//      val lahuo_data_size =300000000 - num_yesterday_success_data
//      if(num>=lahuo_data_size){
//        println("number is enough, number is "+num)
//        resultRdd=subtractRdd.toDF.limit(lahuo_data_size).rdd.map(_.mkString)
//      }else{
//        println("number is not enough,after deduplication number is "+num)
//        resultRdd=subtractRdd
//          .union(oldData.distinct().toDF.limit(lahuo_data_size-num).rdd.map(_.mkString))
//      }
//      resultRdd=resultRdd.union(yesterday_success_data)
//      println("resultRdd.getNumPartitions=========="+resultRdd.getNumPartitions)

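      // Group the device ids into comma-separated lines of at most 5 ids each, then key every
      // line with a random "01, " .. "04, " prefix; TextMultipleOutputFormat presumably uses
      // this key to spread the lines across four output files under imeioutput.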
      resultRdd.repartition(500).mapPartitions(rs => {
        val array = new ArrayBuffer[String]()
        var devidSet = new mutable.HashSet[String]()
        while (rs.hasNext) {
          devidSet.add(rs.next())
          if (devidSet.size == 5) {
            array += devidSet.mkString(",")
            devidSet = new mutable.HashSet[String]()
          } else if (devidSet.size < 5 && !rs.hasNext) {
            array += devidSet.mkString(",")
          }
        }
        array.iterator
      }).map(device_ids =>
        (new Text("0" + (1 + scala.util.Random.nextInt(4)).toString + ", "), new Text(device_ids))
      ).saveAsNewAPIHadoopFile(s"${imeioutput}", classOf[Text], classOf[Text],
        classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)

//      yesterday_success_data.repartition(500).mapPartitions(rs => {
//        val array = new ArrayBuffer[String]()
//        var devidSet = new mutable.HashSet[String]()
//        while (rs.hasNext) {
//          devidSet.add(rs.next())
//          if (devidSet.size == 5) {
//            array += devidSet.mkString(",")
//            devidSet = new mutable.HashSet[String]()
//          } else if (devidSet.size < 5 && !rs.hasNext) {
//            array += devidSet.mkString(",")
//          }
//        }
//        array.iterator
//      }).saveAsTextFile(imeioutput+"/01")

    } finally {
      spark.stop()
    }
    0
  }
}

object AlipayLaHuoDaily {
  def main(args: Array[String]): Unit = {
    new AlipayLaHuoDaily().run(args)
  }
}