package mobvista.dmp.datasource.device

import java.net.URI
import java.text.SimpleDateFormat

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

class OdsDmpUserInfoAllV2 extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
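    // Parse command-line options: the schedule date, an input path, the output path, and the output partition count.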
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val scheduleTime = commandLine.getOptionValue("cur_day")
    val input = commandLine.getOptionValue("input") // only consumed by the commented-out dev_publish filter below
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd") // format of the incoming schedule date
    val sdf2 = new SimpleDateFormat("yyyyMMdd") // format used by the Hive dt partitions

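    // Hive-enabled session with Kryo serialization, RDD compression, ORC filter pushdown and an S3 warehouse.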
    val spark = SparkSession
      .builder()
      .appName("OdsDmpUserInfoAllV2")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any existing output so the job can be re-run for the same day.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    try {
      val sc = spark.sparkContext // only referenced by the commented-out publishDF block below
      // Yesterday (T-1): partition date of the daily increment; also used as its publish_date
      val yesBef1Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -1)
      val yesBef1Str = sdf2.format(sdf1.parse(yesBef1Part))

      // Two days ago (T-2): partition date of the previous full snapshot
      val yesBef2Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -2)
      val yesBef2DayStr = sdf2.format(sdf1.parse(yesBef2Part))

      // 14 days ago: retention cutoff for update_date and fallback publish_date
      val yesBef14Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -14)

      /*
      def schema: StructType = {
        StructType(StructField("device_id", StringType) ::
          Nil)
      }

      val publishDF = spark.createDataFrame(sc.textFile(input).map(r => {
        Row(r.toLowerCase())
      }), schema)
      publishDF.dropDuplicates.createOrReplaceTempView("dev_publish")
      */

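      // Merge yesterday's daily increment (b) into the previous full snapshot (a) via a full outer
      // join on dev_id, preferring the fresher daily columns. Snapshot rows whose update_date is
      // older than 14 days are dropped, 'GB' country codes are normalised to 'UK', and missing
      // age/gender fall back to the literal default 10.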
      val hql =
        s"""
           |select coalesce(b.dev_id,a.dev_id) as dev_id,
           |    coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
           |    coalesce(b.dev_type,a.dev_type) as dev_type,
           |    coalesce(b.platform,a.platform) as platform,
           |    coalesce(b.install,a.install) as install,
           |    coalesce(b.interest,a.interest) as interest,
           |    coalesce(b.model,a.model) as model,
           |    coalesce(b.country,a.country) as country,
           |    coalesce(b.osversion,a.osversion) as osversion,
           |    coalesce(b.age,a.age,'10') as age,
           |    coalesce(b.gender,a.gender,10) as gender,
           |    coalesce(b.behavior,a.behavior) as behavior,
           |    coalesce(b.update_date,a.update_date) as update_date,
           |    case when b.publish_date is not null and b.publish_date != '' then b.publish_date
           |      when a.publish_date is not null and a.publish_date != '' then a.publish_date
           |      else '${yesBef14Part}' end as publish_date
           |from
           |    (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,publish_date
           |        from dwh.ods_dmp_user_info_all_v2 where dt = '${yesBef2DayStr}' and update_date > '${yesBef14Part}') a
           |    full outer join
           |    (
           |        select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,
           |            '${yesBef1Part}' as publish_date
           |            from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}'
           |    ) b
           |    on a.dev_id = b.dev_id
        """.stripMargin

      /*
      hql =
        s"""
           |select lower(dev_id) dev_id,
           |max(dev_id_md5) as dev_id_md5,
           |max(dev_type) dev_type,
           |max(platform) platform,
           |max(install) as install,
           |max(interest) as interest,
           |max(model) as model,
           |max(country) as country,
           |max(osversion) as osversion,
           |min(age) as age,
           |min(gender) as gender,
           |max(behavior) as behavior,
           |max(update_date) as update_date,
           |max(publish_date) as publish_date
           |from dwh.ods_dmp_user_info_all where dt = '${yesBef1Str}' and update_date > '${yesBef14Part}'
           |group by lower(dev_id)
        """.stripMargin
      */

      /*
      val hql =
        s"""
           |select coalesce(b.dev_id,a.dev_id) as dev_id,
           |coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
           |coalesce(b.dev_type,a.dev_type) as dev_type,
           |coalesce(b.platform,a.platform) as platform,
           |coalesce(b.install,a.install) as install,
           |coalesce(b.interest,a.interest) as interest,
           |coalesce(b.model,a.model) as model,
           |coalesce(b.country,a.country) as country,
           |coalesce(b.osversion,a.osversion) as osversion,
           |coalesce(b.age,a.age) as age,
           |coalesce(b.gender,a.gender) as gender,
           |coalesce(b.behavior,a.behavior) as behavior,
           |coalesce(b.update_date,a.update_date) as update_date,
           |coalesce(${yesBef1Part},a.publish_date) as publish_date
           |from
           |  (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
           |      from dwh.ods_dmp_user_info_all where dt = '${yesBef2DayStr}') a
           |  full outer join
           |  (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
           |      from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}') b
           |  on a.dev_id = b.dev_id
           |  left semi join dev_publish c
           |  on a.dev_id = c.device_id
        """.stripMargin
      */

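      // De-duplicate on the device identity columns and write the merged snapshot as
      // zlib-compressed ORC, repartitioned to the requested number of output files.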
      spark.sql(hql)
        .dropDuplicates("dev_id", "dev_id_md5", "dev_type", "platform")
        .repartition(coalesce.toInt)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_day", true, "schedule Time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

}

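// Entry point. An illustrative submission (jar name and argument values are placeholders):
//   spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoAllV2 dmp.jar \
//     -cur_day 2024-01-15 -input <input_path> -output <output_path> -coalesce 200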
object OdsDmpUserInfoAllV2 {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoAllV2().run(args)
  }
}