package mobvista.dmp.datasource.device

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions

import java.net.URI

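/**
  * Maps device ids in the daily DMP user info (dwh.ods_dmp_user_info_daily, dt = `date`,
  * country = CN) through the lower_device_id -> device_id mapping read from `input`,
  * falling back to the original dev_id when no mapping exists, and writes the result
  * as zlib-compressed ORC to `output`.
  */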
class OdsDmpUserInfoDailyMapping extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val input = commandLine.getOptionValue("input")
    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"OdsDmpUserInfoDailyMapping.${date}")

    try {
      val sc = spark.sparkContext

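      // Clear any existing output so the ORC write below does not fail on an existing path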
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

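      // Device-id mapping: lower_device_id -> device_id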
      val mappingDF = spark.read.orc(input)

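      // Daily user info restricted to the dt partition for `date` and CN devices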
      val sql =
        s"""
           |SELECT * FROM dwh.ods_dmp_user_info_daily
           | WHERE dt = '${date}' AND UPPER(country) = 'CN'
           |""".stripMargin

      val userInfoDailyDF = spark.sql(sql)

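      // Right join keeps every daily record; use the mapped device_id when available,
      // otherwise fall back to the raw dev_id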
      mappingDF.join(userInfoDailyDF, mappingDF("lower_device_id") === userInfoDailyDF("dev_id"), "right")
        .select(
          functions.coalesce(mappingDF("device_id"), userInfoDailyDF("dev_id")).alias("dev_id"),
          functions.md5(functions.coalesce(mappingDF("device_id"), userInfoDailyDF("dev_id"))).alias("dev_id_md5"),
          userInfoDailyDF("dev_type"),
          userInfoDailyDF("platform"),
          userInfoDailyDF("install"),
          userInfoDailyDF("interest"),
          userInfoDailyDF("model"),
          userInfoDailyDF("country"),
          userInfoDailyDF("osversion"),
          userInfoDailyDF("age"),
          userInfoDailyDF("gender"),
          userInfoDailyDF("behavior")
        ).repartition(coalesce.toInt)
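        // Shuffle into `coalesce` partitions and write zlib-compressed ORC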
        .write
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "processing date (dt partition of dwh.ods_dmp_user_info_daily)")
    options.addOption("input", true, "input path of the device-id mapping ORC data")
    options.addOption("coalesce", true, "number of output partitions")
    options.addOption("output", true, "output path for the ORC result")
    options
  }
}

object OdsDmpUserInfoDailyMapping {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoDailyMapping().run(args)
  }
}