package mobvista.dmp.datasource.taobao

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


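/**
 * Routes UC activation devices to per-campaign output directories for DMP ingestion.
 *
 * Matches devices (MD5 IMEI) that carry one of the UC activation packages in
 * today's uc_activation install list against devices that carried a UC build in
 * yesterday's dsp_req list, and writes each matched combination to its own
 * gzip-compressed directory under the given output path.
 *
 * Illustrative invocation (jar name, dates, and output location are placeholders):
 *
 *   spark-submit --class mobvista.dmp.datasource.taobao.UCOtherDataToDmpV2 dmp.jar \
 *     -dt_today 2021-01-02 -dt_oneday_ago 2021-01-01 \
 *     -update 2021-01-02 -output s3://mob-emr-test/path/to/output
 */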
class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_oneday_ago", true, "[must] dt_oneday_ago")
    options.addOption("update", true, "[must] update")
    options.addOption("output", true, "[must] output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val dt_today = commandLine.getOptionValue("dt_today")
    val dt_oneday_ago = commandLine.getOptionValue("dt_oneday_ago")
    val update = commandLine.getOptionValue("update")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("UCOtherDataToDmpV2")

    val sc = spark.sparkContext

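    // Clear any previous output under the target path so a re-run of the same
    // partition starts clean instead of failing on existing files.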
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

    try {
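      // Compress the job output with gzip at BLOCK granularity; the same setting
      // is written under both the plain and the fileoutputformat key spellings.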
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

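      // One row per device (MD5 IMEI): the union of today's uc_activation installs
      // of the two activation packages and yesterday's dsp_req installs of the two
      // UC builds, collapsed into a single install list per device.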
      val sql =
        s"""
           |SELECT device_id, COLLECT_SET(package_name) install_list
           | FROM
           |  (
           |  SELECT device_id, package_name
           |    FROM dwh.dm_install_list_v2
           |    WHERE dt = '${dt_today}' AND business = 'uc_activation' AND device_type = 'imeimd5'
           |    AND package_name IN ('com.uc.foractivation.4b5a58','com.uc.foractivation.d3f521')
           |  UNION
           |  SELECT device_id, package_name
           |    FROM dwh.dm_install_list_v2
           |    WHERE dt = '${dt_oneday_ago}' AND business = 'dsp_req' AND device_type = 'imeimd5'
           |    AND package_name IN ('com.UCMobile_bes','com.ucmobile_oppo')
           |  ) t
           | GROUP BY device_id
           |""".stripMargin

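      // Keep the aggregated install lists around in serialized form (memory,
      // spilling to disk) while they are routed to output directories below.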
      val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      val rdd = df.rdd.flatMap(r => {
        val arrayBuffer = new ArrayBuffer[(Text, Text)]()
        val deviceId = r.getAs[String]("device_id")
        val deviceType = "imeimd5"
        val platform = "android"
        val installList = r.getAs[mutable.WrappedArray[String]]("install_list")
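        // Emit one (output path, TSV record) pair for every activation-package /
        // UC-build combination present in the device's install list; the Text key
        // tells RDDMultipleOutputFormat which directory the record is written to.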
        if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.UCMobile_bes")) {
          arrayBuffer += ((new Text(s"$output/4b5a58_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucbes", update))))
        }
        if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.UCMobile_bes")) {
          arrayBuffer += ((new Text(s"$output/d3f521_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucbes", update))))
        }
        if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.ucmobile_oppo")) {
          arrayBuffer += ((new Text(s"$output/4b5a58_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucoppo", update))))
        }
        if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.ucmobile_oppo")) {
          arrayBuffer += ((new Text(s"$output/d3f521_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucoppo", update))))
        }
        arrayBuffer
      })

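      // coalesce(50) caps the number of files per output directory; each record
      // lands under the directory carried in its Text key.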
      rdd.coalesce(50)
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

    } finally {
      spark.stop()
    }
    0
  }
}

object UCOtherDataToDmpV2 {
  def main(args: Array[String]): Unit = {
    new UCOtherDataToDmpV2().run(args)
  }
}