package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

/**
  * @package: mobvista.dmp.datasource.dsp
  * @author: wangjf
  * @date: 2020/5/27
  * @time: 11:26 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DspDeviceProfile extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      // A required option is missing: print usage and abort instead of
      // continuing and reading null argument values below
      printUsage(options)
      return -1
    }
    printOptions(commandLine)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val spark = MobvistaConstant.createSparkSession("DspDeviceProfile")

    try {
      // Clear any existing output so re-runs overwrite cleanly; resolve the
      // filesystem from the output path itself rather than a hard-coded bucket
      FileSystem.get(new URI(output), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

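      // last_date: previous day's partition of the accumulated profile table (yyyyMMdd)
      // update_date: 365-day retention cutoff (yyyy-MM-dd); older profile rows are dropped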
      val last_date = DateUtil.getDay(MobvistaConstant.sdf2.parse(date), "yyyyMMdd", -1)
      val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -365), "yyyy-MM-dd")

      val dt = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))
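      // Collapse the day's requests to one row per (device_id, device_type):
      // max() picks a deterministic non-null value per field, GB is normalized
      // to UK, and dmp_time keeps the yyyy-MM-dd prefix of the latest datetime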
      val new_df = spark.sql(new_sql.replace("@date", dt))
        .groupBy(col("device_id"), col("device_type"))
        .agg(
          max(col("platform")).alias("platform"),
          max(col("maker")).alias("maker"),
          max(col("model")).alias("model"),
          max(col("os_version")).alias("os_version"),
          when(max(col("country_code")) === lit("GB"), lit("UK"))
            .otherwise(max(col("country_code"))).alias("country"),
          max(col("birthday")).alias("birthday"),
          max(col("gender")).alias("gender"),
          max(col("datetime").substr(1, 10)).alias("dmp_time")
        )

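      // Previous day's accumulated profile, restricted to devices whose
      // dmp_time falls within the 365-day retention window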
      val old_df = spark.sql(old_sql.replace("@date", last_date).replace("@update_date", update_date))

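      // Full-outer merge of today's data with the prior profile: coalesce
      // prefers today's value and falls back to the stored one, so devices
      // absent today still carry their history forward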
      new_df.join(old_df, new_df("device_id") === old_df("device_id") && new_df("device_type") === old_df("device_type"), "fullouter")
        .select(
          coalesce(new_df("device_id"), old_df("device_id")).alias("device_id"),
          coalesce(new_df("device_type"), old_df("device_type")).alias("device_type"),
          coalesce(new_df("platform"), old_df("platform")).alias("platform"),
          coalesce(new_df("maker"), old_df("maker")).alias("maker"),
          coalesce(new_df("model"), old_df("model")).alias("model"),
          coalesce(new_df("os_version"), old_df("os_version")).alias("os_version"),
          coalesce(new_df("country"), old_df("country")).alias("country"),
          coalesce(new_df("birthday"), old_df("birthday")).alias("birthday"),
          coalesce(new_df("gender"), old_df("gender")).alias("gender"),
          coalesce(new_df("dmp_time"), old_df("dmp_time")).alias("dmp_time")
        ).repartition(2000)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options
  }

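  // Accumulated profile as of the previous day's dt partition, limited to
  // devices updated within the last year (@date / @update_date are substituted at runtime)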
  val old_sql =
    """
      |SELECT * FROM dwh.dsp_profile_total
      |   WHERE dt = '@date' AND dmp_time >= '@update_date'
      |""".stripMargin

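  // Daily DSP request records for the current day's `date` partition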
  val new_sql =
    """
      |SELECT * FROM dwh.etl_dsp_request_daily
      |   WHERE `date` = '@date'
      |""".stripMargin
}

object DspDeviceProfile {
  def main(args: Array[String]): Unit = {
    new DspDeviceProfile().run(args)
  }
}