package mobvista.dmp.datasource.toutiao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
  * Builds the game-device upload for Toutiao (头条): a per-tag distinct-device report plus the
  * per-device tag records that are zipped and uploaded to Toutiao.
  *
  * @package: mobvista.dmp.datasource.toutiao
  * @author: wangjf
  * @create: 2018-12-05 15:35:44
  **/
class GameDeviceToutiao extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val tag_output = commandLine.getOptionValue("tag_output")
    val startDate = commandLine.getOptionValue("startDate")
    val endDate = commandLine.getOptionValue("endDate")
    val file = commandLine.getOptionValue("file")

    val spark = SparkSession.builder()
      .appName("GameDeviceToutiao")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

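    // Delete any existing output so a rerun of the job starts from a clean slate.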
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output), true)
    fs.delete(new Path(tag_output), true)

    try {
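      // Parse the Toutiao tag mapping file (comma-separated). Judging by the join below,
      // columns 1-3 form the composite tag_code, column 0 is tag_type and column 8 is tag_id.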
      val tagRdd = spark.sparkContext.textFile(file).map(_.split(",")).map(r => {
        Row(r(1) + "-" + r(2) + "-" + r(3), r(0), r(8))
      })

      spark.createDataFrame(tagRdd, Constant.schema).createOrReplaceTempView("toutiao")

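      // Candidate devices: CN iOS IDFAs from ods_dmp_user_info whose last request falls
      // within [startDate, endDate]; IDs are upper-cased and MD5-hashed for the upload.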
      val cn_sql =
        s"""
           |SELECT MD5(UPPER(device_id)) device_id
           |FROM dwh.ods_dmp_user_info
           |WHERE dt = '${date}' AND platform = 'ios' AND device_type = 'idfa' AND country = 'CN' AND last_req_day BETWEEN '${startDate}' AND '${endDate}'
        """.stripMargin

      val cn_df = spark.sql(cn_sql)

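      // Map each device's tag statistics onto Toutiao tag ids via the tag_code join key,
      // bucketing the activity count: cnt >= 4 -> '01', cnt >= 2 -> '02', otherwise '03'.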
      val tag_sql =
        s"""
           |SELECT /*+ mapjoin(t)*/ MD5(UPPER(m.device_id)) device_id,t.tag_type tag_type,t.tag_id tag_id,
           | CASE WHEN m.cnt >= 4 THEN '01' WHEN m.cnt >= 2 THEN '02' ELSE '03' END AS active_id
           | FROM toutiao t JOIN
           | (SELECT device_id,tag_type,first_tag,second_tag,cnt FROM dwh.dm_device_tag_statistics WHERE dt = '${date}') m
           | ON CONCAT(m.tag_type,"-",m.first_tag,"-",m.second_tag) = t.tag_code
        """.stripMargin

      val tag_df = spark.sql(tag_sql)

      cn_df.createOrReplaceTempView("cn_device")

      tag_df.createOrReplaceTempView("tag_device")

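      // Keep only tagged devices that are also in the CN candidate set; cached because the
      // result feeds both the per-tag counts and the final upload output.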
      var sql =
        """
          |SELECT tag.device_id device_id,tag_type,tag_id,active_id
          | FROM tag_device tag,cn_device cn
          | WHERE tag.device_id = cn.device_id
        """.stripMargin

      val base_df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      base_df.createOrReplaceTempView("base_device")

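      // Distinct device counts per (tag_type, tag_id), coalesced into a single text file at tag_output.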
      sql =
        """
          |SELECT tag_type,tag_id,COUNT(DISTINCT device_id)
          |FROM base_device
          |GROUP BY tag_type,tag_id
          |ORDER BY tag_type,tag_id
        """.stripMargin

      spark.sql(sql).repartition(1).rdd.saveAsTextFile(tag_output)

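      // Split by tag type: '01' = category, '02' = style, '03' = rule (per the variable names).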
      val categoryDF = base_df.where("tag_type = '01'")

      val styleDF = base_df.where("tag_type = '02'")

      val ruleDF = base_df.where("tag_type = '03'")

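      // Recombine the slices and group tag/active pairs per device with two combineByKey passes;
      // the Constant.mapPart / mapPartCore / mapProtoPart helpers (defined elsewhere) presumably
      // serialize each device's tags into the record format Toutiao expects.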
      val all_rdd = categoryDF.union(styleDF).union(ruleDF)
        .select("device_id", "tag_type", "tag_id", "active_id")
        .rdd.mapPartitions(Constant.mapPart).combineByKey(
        (v: (String, String)) => Iterable(v),
        (c: Iterable[(String, String)], v: (String, String)) => c ++ Seq(v),
        (c1: Iterable[(String, String)], c2: Iterable[(String, String)]) => c1 ++ c2
      )

      all_rdd.mapPartitions(Constant.mapPartCore).combineByKey(
        (v: String) => Iterable(v),
        (c: Iterable[String], v: String) => c ++ Seq(v),
        (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
      ).mapPartitions(Constant.mapProtoPart)
        .saveAsTextFile(output)
      // Not compressed, because Toutiao only supports parsing zip-format uploads.

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

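  // All options below are required ("[must]") and are validated via checkMustOption before the job runs.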
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] today date")
    options.addOption("startDate", true, "[must] startDate ")
    options.addOption("endDate", true, "[must] endDate ")
    options.addOption("output", true, "[must] output path ")
    options.addOption("tag_output", true, "[must] tag_output path ")
    options.addOption("file", true, "[must] file")
    options
  }
}

object GameDeviceToutiao {
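  // Example submission (class path from this file; jar name, dates and S3 paths are illustrative):
  // spark-submit --class mobvista.dmp.datasource.toutiao.GameDeviceToutiao dmp.jar \
  //   -date 2018-12-05 -startDate 2018-11-05 -endDate 2018-12-05 \
  //   -file s3://bucket/toutiao/tag_mapping.csv \
  //   -output s3://bucket/toutiao/devices -tag_output s3://bucket/toutiao/tag_stats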
  def main(args: Array[String]): Unit = {
    new GameDeviceToutiao().run(args)
  }
}