DspReqImeiDealDaily.scala 3.72 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.CommonSparkJob

import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Row, SaveMode, SparkSession}


class DspReqImeiDealDaily extends CommonSparkJob {

  /** Declares the command-line options; each one is marked `[must]` in its description. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("devmd5day", true, "[must] devmd5day")
    options
  }

  /**
    * Writes one day of DSP request records to `output` as gzip-compressed, tab-separated text.
    * Every `imeimd5` record whose hash is found in `dwh.device_id_md5_match` (partition
    * `dt = devmd5day`) additionally gets a copy carrying the plaintext IMEI.
    *
    * Example invocation:
    * {{{
    * ../${JAR} -output "$OUTPUT_PATH" -coalesce 40 -devmd5day ${last_sunday}
    * }}}
    *
    * @param args command-line arguments `-output`, `-coalesce` and `-devmd5day`
    * @return 0 on success, -1 when `checkMustOption` rejects the arguments
    * @throws IllegalArgumentException if `-coalesce` is not a positive integer; this is raised
    *                                  before any existing output is deleted
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val output = commandLine.getOptionValue("output")
      val devmd5day = commandLine.getOptionValue("devmd5day")
      // Parse before touching the file system so a bad value cannot leave the previous
      // output deleted with nothing written in its place.
      val numPartitions = commandLine.getOptionValue("coalesce").toInt
      require(numPartitions > 0, s"coalesce must be a positive integer, got $numPartitions")

      writeDataset(output, numPartitions, devmd5day)
      0
    }
  }

  /** Runs the query on a SparkSession and replaces `output` with the result; always stops the session. */
  private def writeDataset(output: String, numPartitions: Int, devmd5day: String): Unit = {
    val spark = SparkSession.builder()
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Resolve the FileSystem from the output path itself rather than a hard-coded bucket,
      // so the delete targets the same file system that saveAsTextFile writes to.
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      spark.sql(buildQuery(devmd5day)).rdd
        .map(_.mkString("\t"))
        // The option is called "coalesce", but a full shuffle (repartition) is used;
        // presumably chosen to rebalance output file sizes — confirm before changing.
        .repartition(numPartitions)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      spark.stop()
    }
  }

  /**
    * Builds the Spark SQL that produces the output rows.
    *
    * First branch: `imeimd5` rows of `dwh.etl_dsp_request_daily_tmp` are inner-joined with the
    * `imei` rows of `dwh.device_id_md5_match` (partition `dt = devmd5day`) on the MD5 value. Each
    * match is emitted with the plaintext `device_id` and `device_type = 'imei'`. Unmatched hashes
    * produce no row in this branch.
    *
    * Second branch: every row of the temp table, unchanged (including the original `imeimd5` rows).
    *
    * `union` is Spark SQL's `UNION DISTINCT`, so exact duplicate rows are removed.
    *
    * NOTE(review): the second branch's `select *` assumes the temp table has the same columns, in
    * the same order, as the explicit list in the first branch — confirm against the table DDL.
    * `devmd5day` is interpolated into the SQL verbatim and is expected to be a trusted partition value.
    */
  private def buildQuery(devmd5day: String): String =
    s"""
       |select
       |t2.device_id,
       |'imei' device_type,
       |t1.platform,
       |t1.country_code,
       |t1.ip,
       |t1.gender,
       |t1.birthday,
       |t1.maker,
       |t1.model,
       |t1.os_version,
       |t1.package_list,
       |t1.androidids,
       |t1.datetime,
       |t1.segment_ids,
       |t1.region
       |from
       |(select * from dwh.etl_dsp_request_daily_tmp where device_type = 'imeimd5' ) t1
       |join (select * from dwh.device_id_md5_match where dt='${devmd5day}' and device_type = 'imei') t2
       |on (t1.device_id = t2.device_id_md5)
       |union
       |select * from dwh.etl_dsp_request_daily_tmp
      """.stripMargin
}

object DspReqImeiDealDaily {

  /**
    * Command-line entry point.
    *
    * Runs the job and, if it reports a non-zero status (for example -1 when required options are
    * missing), exits the JVM with that status. Without this, a rejected invocation would still end
    * with exit code 0 and look successful to the caller.
    *
    * @param args forwarded unchanged to `DspReqImeiDealDaily.run`
    */
  def main(args: Array[String]): Unit = {
    val status = new DspReqImeiDealDaily().run(args)
    if (status != 0) sys.exit(status)
  }
}