package mobvista.dmp.datasource.toutiao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
  * @package: mobvista.dmp.datasource.toutiao
  * @author: wangjf
  * @create: 2018-12-05 15:35:44
  **/
class GameDeviceToutiao extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val tag_output = commandLine.getOptionValue("tag_output")
    val startDate = commandLine.getOptionValue("startDate")
    val endDate = commandLine.getOptionValue("endDate")
    val file = commandLine.getOptionValue("file")

    val spark = SparkSession.builder()
      .appName("GameDeviceToutiao")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output), true)
    fs.delete(new Path(tag_output), true)

    try {
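      // Build the Toutiao tag lookup from the input CSV: columns 1-3 joined with '-' form the tag code,
      // with column 0 and column 8 as the remaining tag fields (column meanings assumed from Constant.schema
      // and from how the `toutiao` view is queried below).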
      val tagRdd = spark.sparkContext.textFile(file).map(_.split(",")).map(r => {
        Row(r(1) + "-" + r(2) + "-" + r(3), r(0), r(8))
      })

      spark.createDataFrame(tagRdd, Constant.schema).createOrReplaceTempView("toutiao")

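      // CN iOS devices (IDFA) active between startDate and endDate; device ids are normalized as MD5(UPPER(idfa)).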
      val cn_sql =
        s"""
           |SELECT MD5(UPPER(device_id)) device_id
           |FROM dwh.ods_dmp_user_info
           |WHERE dt = '${date}' AND platform = 'ios' AND device_type = 'idfa' AND country = 'CN' AND last_req_day BETWEEN '${startDate}' AND '${endDate}'
        """.stripMargin

      val cn_df = spark.sql(cn_sql)

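      // Map each device's tag statistics onto Toutiao tag ids via the tag_code lookup,
      // bucketing the request count into active_id: 01 (>= 4), 02 (>= 2), 03 (otherwise).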
      val tag_sql =
        s"""
           |SELECT /*+ mapjoin(t)*/ MD5(UPPER(m.device_id)) device_id,t.tag_type tag_type,t.tag_id tag_id,
           | CASE WHEN m.cnt >= 4 THEN '01' WHEN m.cnt >= 2 THEN '02' ELSE '03' END AS active_id
           | FROM toutiao t JOIN
           | (SELECT device_id,tag_type,first_tag,second_tag,cnt FROM dwh.dm_device_tag_statistics WHERE dt = '${date}') m
           | ON CONCAT(m.tag_type,"-",m.first_tag,"-",m.second_tag) = t.tag_code
        """.stripMargin

      val tag_df = spark.sql(tag_sql)

      cn_df.createOrReplaceTempView("cn_device")

      tag_df.createOrReplaceTempView("tag_device")

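      // Restrict tagged devices to those present in the CN iOS device set.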
      var sql =
        """
          |SELECT tag.device_id device_id,tag_type,tag_id,active_id
          | FROM tag_device tag,cn_device cn
          | WHERE tag.device_id = cn.device_id
        """.stripMargin

      val base_df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      base_df.createOrReplaceTempView("base_device")

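      // Per-tag distinct device counts, written to tag_output as a summary report.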
      sql =
        """
          |SELECT tag_type,tag_id,COUNT(DISTINCT device_id)
          |FROM base_device
          |GROUP BY tag_type,tag_id
          |ORDER BY tag_type,tag_id
        """.stripMargin

      spark.sql(sql).repartition(1).rdd.saveAsTextFile(tag_output)

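      // Split by tag type (category / style / rule), then group all tag rows per device with combineByKey.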
      val categoryDF = base_df.where("tag_type = '01'")

      val styleDF = base_df.where("tag_type = '02'")

      val ruleDF = base_df.where("tag_type = '03'")

      val all_rdd = categoryDF.union(styleDF).union(ruleDF)
        .select("device_id", "tag_type", "tag_id", "active_id")
        .rdd.mapPartitions(Constant.mapPart).combineByKey(
        (v: (String, String)) => Iterable(v),
        (c: Iterable[(String, String)], v: (String, String)) => c ++ Seq(v),
        (c1: Iterable[(String, String)], c2: Iterable[(String, String)]) => c1 ++ c2
      )

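      // Second grouping pass and serialization (via Constant.mapPartCore / mapProtoPart) produce the final upload file.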
      all_rdd.mapPartitions(Constant.mapPartCore).combineByKey(
        (v: String) => Iterable(v),
        (c: Iterable[String], v: String) => c ++ Seq(v),
        (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
      ).mapPartitions(Constant.mapProtoPart)
        .saveAsTextFile(output)
      //  Do not compress the output: Toutiao only accepts zip-format files for upload and parsing.

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] today date")
    options.addOption("startDate", true, "[must] startDate ")
    options.addOption("endDate", true, "[must] endDate ")
    options.addOption("output", true, "[must] output path ")
    options.addOption("tag_output", true, "[must] tag_output path ")
    options.addOption("file", true, "[must] file")
    options
  }
}

object GameDeviceToutiao {
  def main(args: Array[String]): Unit = {
    new GameDeviceToutiao().run(args)
  }
}