package mobvista.dmp.datasource.toutiao

import mobvista.dmp.common.CommonSparkJob
import org.apache.spark.sql.SparkSession

/**
 * Spark job that turns raw Toutiao daily log lines into distinct
 * (device_id, campaign_id) pairs stored as ORC.
 *
 * Reads the `input` and `output` command-line options. Option parsing and
 * validation, `splitFun`, `didPtn` and `allZero` are inherited from
 * [[CommonSparkJob]].
 */
class EtlToutiaoDaily extends CommonSparkJob with Serializable {

  /**
   * Validates the command line and, when it is complete, runs the ETL.
   *
   * @param args raw command-line arguments
   * @return 1 when the mandatory-option check fails (usage is printed),
   *         0 once the ORC output has been written
   */
  override protected def run(args: Array[String]): Int = {
    val cmd = commParser.parse(options, args)
    if (checkMustOption(cmd)) {
      printOptions(cmd)
      writeDistinctDevicePairs(cmd.getOptionValue("input"), cmd.getOptionValue("output"))
      0
    } else {
      printUsage(options)
      1
    }
  }

  /**
   * Parses the text lines under `inputPath`, keeps valid device ids, de-duplicates
   * the (device_id, campaign_id) pairs and writes them to `outputPath` as
   * zlib-compressed ORC. The Spark session is always stopped afterwards.
   */
  private def writeDistinctDevicePairs(inputPath: String, outputPath: String): Unit = {
    val session = SparkSession.builder()
      .appName("dmp_DmToutiaoTotal_fengliang")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    // Brings the RDD-to-DataFrame conversion (toDF) into scope.
    import session.implicits._

    try {
      val parsedRows = session.sparkContext
        .textFile(inputPath)
        .map(line => splitFun(line))
        // Rows with fewer than six fields cannot carry a device id (index 5).
        .filter(fields => fields.length >= 6)
        .map { fields =>
          val head = fields(0)
          // Field 1 is the campaign id, unless field 0 splits on spaces into
          // exactly three parts; then the third part is used instead.
          val campaignId =
            if (head.contains(" ")) {
              val parts = splitFun(head, " ")
              if (parts.length == 3) parts(2) else fields(1)
            } else {
              fields(1)
            }
          ToutiaoDailyVO(fields(5), campaignId)
        }

      // Keep only ids that match the device-id pattern and are not the all-zero id.
      val validRows = parsedRows.filter { vo =>
        val id = vo.device_id
        id.matches(didPtn) && !id.equals(allZero)
      }
      validRows.toDF().createOrReplaceTempView("t_toutiao_daily")

      // Grouping on both columns de-duplicates the pairs.
      val dedupSql =
        """
          |select t.device_id, t.campaign_id
          |from t_toutiao_daily t
          |group by t.device_id, t.campaign_id
        """.stripMargin
      val distinctPairs = session.sql(dedupSql)
      distinctPairs.write
        .option("orc.compress", "zlib")
        .orc(outputPath)
    } finally {
      session.stop()
    }
  }
}

/** Command-line entry point for the [[EtlToutiaoDaily]] job. */
object EtlToutiaoDaily {

  /**
   * Runs the job and propagates its status to the caller of the process.
   *
   * `run` returns a non-zero status when it cannot proceed (for example when the
   * mandatory options are missing). That status was previously discarded, so the
   * JVM still exited with 0; it is now used as the process exit code so that
   * schedulers can detect the failure.
   *
   * @param args raw command-line arguments, forwarded unchanged to `run`
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new EtlToutiaoDaily().run(args)
    // Only force an exit on failure; a successful run ends normally.
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}

/**
 * One (device, campaign) pair extracted from a Toutiao daily log line.
 *
 * The snake_case field names are intentional: they become the DataFrame column
 * names that the job's SQL selects and groups by.
 *
 * @param device_id   device identifier taken from the log line
 * @param campaign_id campaign identifier associated with that device
 */
case class ToutiaoDailyVO(
  device_id: String,
  campaign_id: String
)