package mobvista.dmp.datasource.device

import java.net.URI
import java.text.SimpleDateFormat

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

class OdsDmpUserInfoAll extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val scheduleTime = commandLine.getOptionValue("cur_day")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
    val sdf2 = new SimpleDateFormat("yyyyMMdd")

    val spark = SparkSession
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    try {
      val yesBef1Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -1)
      val yesBef1Str = sdf2.format(sdf1.parse(yesBef1Part))

      val yesBef2Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -2)
      val yesBef2DayStr = sdf2.format(sdf1.parse(yesBef2Part))

      val befYearPart = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -365)

      def schema: StructType = {
        StructType(StructField("device_id", StringType) ::

      val publishDF = spark.createDataFrame(sc.textFile(input).map(r => {
      }), schema)

      var hql =
           |select lower(dev_id) dev_id,max(dev_id_md5) as dev_id_md5,
           |max(dev_type) dev_type,
           |max(platform) platform,
           |max(install) as install,
           |max(interest) as interest,
           |max(model) as model,
           |max(country) as country,
           |max(osversion) as osversion,
           |min(age) as age,
           |min(gender) as gender,
           |max(behavior) as behavior,
           |max(update_date) as update_date
           |from dwh.ods_dmp_user_info_all where dt = '${yesBef1Str}'
           |group by lower(dev_id)

      //  spark.sql(hql).createOrReplaceTempView("ods_dmp_user_info_all")
      val hql =
           |select coalesce(b.dev_id,a.dev_id) as dev_id,
           |coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
           |coalesce(b.dev_type,a.dev_type) as dev_type,
           |coalesce(b.platform,a.platform) as platform,
           |coalesce(b.install,a.install) as install,
           |coalesce(b.interest,a.interest) as interest,
           |coalesce(b.model,a.model) as model,
           |coalesce(b.country,a.country) as country,
           |coalesce(b.osversion,a.osversion) as osversion,
           |coalesce(b.age,a.age) as age,
           |coalesce(b.gender,a.gender) as gender,
           |coalesce(b.behavior,a.behavior) as behavior,
           |coalesce(b.update_date,a.update_date) as update_date,
           |coalesce(b.update_date,a.update_date) as update_date
           |  (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
           |      from dwh.ods_dmp_user_info_all where dt = '${yesBef2DayStr}') a
           |  full outer join
           |  (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
           |      from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}') b
           |  on a.dev_id = b.dev_id
      val hql =
           |select coalesce(b.dev_id,a.dev_id) as dev_id,
           |    coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
           |    coalesce(b.dev_type,a.dev_type) as dev_type,
           |    coalesce(b.platform,a.platform) as platform,
           |    coalesce(b.install,a.install) as install,
           |    coalesce(b.interest,a.interest) as interest,
           |    coalesce(b.model,a.model) as model,
           |    coalesce(b.country,a.country) as country,
           |    coalesce(b.osversion,a.osversion) as osversion,
           |    coalesce(b.age,a.age,10) as age,
           |    coalesce(b.gender,a.gender,10) as gender,
           |    coalesce(b.behavior,a.behavior) as behavior,
           |    coalesce(b.update_date,a.update_date) as update_date,
           |    coalesce(b.merge_bus,a.merge_bus) as merge_bus,
           |    coalesce(b.publish_date,a.publish_date) as publish_date
           |    (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,merge_bus,
           |        '$yesBef1Part' as publish_date
           |        from dwh.ods_dmp_user_info_daily where dt='$yesBef1Str'
           |    ) b
           |    full outer join
           |    (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,merge_bus,publish_date
           |        from dwh.ods_dmp_user_info_all where dt = '$yesBef2DayStr' and update_date >= '$befYearPart') a
           |    on a.dev_id = b.dev_id

      hql =
           |select coalesce(b.dev_id,a.dev_id) as dev_id,
           |    coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
           |    coalesce(b.dev_type,a.dev_type) as dev_type,
           |    coalesce(b.platform,a.platform) as platform,
           |    coalesce(b.install,a.install) as install,
           |    coalesce(b.interest,a.interest) as interest,
           |    coalesce(b.model,a.model) as model,
           |    coalesce(b.country,a.country) as country,
           |    coalesce(b.osversion,a.osversion) as osversion,
           |    coalesce(b.age,a.age,10) as age,
           |    coalesce(b.gender,a.gender,10) as gender,
           |    coalesce(b.behavior,a.behavior) as behavior,
           |    coalesce(b.update_date,a.update_date) as update_date,
           |    coalesce(b.update_date,a.publish_date) as publish_date
           |    (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date,publish_date
           |        from dwh.ods_dmp_user_info_all where dt = '${yesBef2DayStr}') a
           |    full outer join
           |    (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
           |        from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}'
           |    ) b
           |    on a.dev_id = b.dev_id
        .option("orc.compress", "zlib")
    } finally {
      if (spark != null) {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_day", true, "schedule Time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")


object OdsDmpUserInfoAll {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoAll().run(args)