package mobvista.dmp.datasource.device;

import java.text.SimpleDateFormat

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

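/**
  * Daily user-profile build (backup version): joins device tags with DSP, ADN,
  * 3S and GA device attributes plus the age and gender models, and writes the
  * result to dwh.ods_dmp_user_info_daily.
  */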
class OdsDmpUserInfoDaily_bak extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    //  2018-11-22 00:00:01
    val scheduleTime = commandLine.getOptionValue("cur_day")


    val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
    val sdf2 = new SimpleDateFormat("yyyyMMdd")

    val spark = SparkSession
      .builder()
      .appName("OdsDmpUserInfoDaily_job")
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    try {

      // yesterday, e.g. 2018-11-21
      val yesBef1Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -1)
      val yesBef1Str = sdf2.format(sdf1.parse(yesBef1Part)) // 20181121


      // two days ago, e.g. 2018-11-20
      val yesBef2Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -2)
      val yesBef2Str = sdf2.format(sdf1.parse(yesBef2Part)) // 20181120

      System.out.print("yesBef1Part:" + yesBef1Part +";yesBef1Str" + yesBef1Str +";yesBef2Part" + yesBef2Part +";yesBef2Str" + yesBef2Str)

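      // md5.jar supplies the dwh.md5 UDF used below to hash device ids
      // (assumption: the jar registers a Hive function named dwh.md5).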
      spark.sql("add jar s3://mob-emr-test/dataplatform/datawarehourse/md5.jar")
      var hql =
        s"""
           |drop table if exists dwh.ga_device_add_tmp
        """.stripMargin
      spark.sql(hql)

      // Table in Hive over the GA device data for two days ago (tab-delimited on S3)
      hql =
        s"""
           |create table dwh.ga_device_add_tmp (
           |dev_id string,
           |model string,
           |country string,
           |osversion string
           |) row format delimited  fields terminated by '\t'
           |location 's3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yesBef2Str}'
        """.stripMargin

      // spark.sql(hql)
      println("hql1: " + hql)


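      // Distinct device ids: yesterday's partition for non-GA business, plus the
      // two-days-ago partition for GA/other business.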
      hql =
        s"""
           |insert overwrite table kehan_test.devices_daily
           |select X.device_id
           |from (
           |select device_id
           |from dwh.dm_device_tag
           |where dt='${yesBef1Str}' and update_date='${yesBef1Part}' and business not in ('ga', 'other')
           |union all
           |select device_id
           |from dwh.dm_device_tag
           |where dt='${yesBef2Str}' and update_date='${yesBef2Part}' and business in ('ga', 'other') ) X
           |group by X.device_id
        """.stripMargin
      // spark.sql(hql)
      println("hql2: " + hql)

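      // Per-device aggregation: install list (package names) and interest tags,
      // restricted to devices in kehan_test.devices_daily via left semi join.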
      hql =
        s"""
           |insert overwrite table kehan_test.dm_pkg_insterest
           |select a.device_id as dev_id,
           |dwh.md5(a.device_id) as dev_id_md5,
           |a.device_type,
           |a.platform,
           |concat_ws(',',collect_set(a.package_name)) as install,
           |concat_ws('#',collect_set(concat_ws(',',a.tag_type,a.first_tag,a.second_tag,a.business))) as interest
           |from (
           |select device_id,device_type,platform,package_name,tag_type,first_tag,second_tag,
           |case when business='userdesk' then 'userdesk' else 'mv' end as business,
           |update_date
           |from dwh.dm_device_tag
           |where dt='${yesBef1Str}' and update_date='${yesBef1Part}' and business not in ('ga', 'other')
           |union all
           |select device_id,device_type,platform,package_name,tag_type,first_tag,second_tag,
           |case when business='userdesk' then 'userdesk' else 'mv' end as business,
           |update_date
           |from dwh.dm_device_tag
           |where dt='${yesBef2Str}' and update_date='${yesBef2Part}' and business in ('ga', 'other')
           |) a
           |left semi join kehan_test.devices_daily b
           |on a.device_id=b.device_id
           |group by a.device_id,a.device_type,a.platform
        """.stripMargin
      // spark.sql(hql)
      println("hql3: " + hql)

      // Enrich with model / country / osversion from the DSP, ADN, 3S and GA sources
      hql =
        s"""
           |insert overwrite table kehan_test.dm_pkg_insterest_model_os_country
           |select a.dev_id as dev_id,
           |a.dev_id_md5 as dev_id_md5,
           |coalesce(a.device_type,b.device_type,c.id_type) as dev_type,
           |coalesce(a.platform,b.platform,c.platform) as platform,
           |a.install,
           |a.interest,
           |coalesce(b.model,c.device_model,d.device,e.model) as model,
           |coalesce(b.country,c.country_code,d.country,e.country) as country,
           |coalesce(b.os_v,c.os_version,d.os_version,e.osversion) as osversion
           |from  kehan_test.dm_pkg_insterest a left join
           | (select device_id,
           |max(device_type) as device_type,
           |max(platform) as platform,
           |max(model) as model,
           |max(country) as country,
           |max(os_v) as os_v
           |from dwh.dm_profile_total
           |where type='dsp' and concat(year,month,day) = '${yesBef1Str}'  and dmp_time='${yesBef1Part}' group by device_id) b
           |on a.dev_id=b.device_id left join
           | (select device_id,
           |max(id_type) as id_type,
           |max(platform) as platform,
           |max(device_model) as device_model,
           |max(country_code) as country_code,
           |max(os_version) as os_version
           |from dwh.ods_adn_device_total
           |where  concat(year,month,day) = '${yesBef1Str}'  and to_date(update_time)='${yesBef1Part}' group by device_id) c
           |on a.dev_id=c.device_id left join
           | (select devid,
           | max(device) as device,
           | max(country) as country,
           | max(os_version) as os_version
           |from dwh.ods_3s_trackingnew_install sss where concat(sss.yyyy,sss.mm,sss.dd)='${yesBef1Str}' group by devid) d
           |on a.dev_id=d.devid left join dwh.ga_device_add_tmp e
           |on a.dev_id=e.dev_id
        """.stripMargin

      // spark.sql(hql)
      println("hql4: " + hql)


      var sql =
        s"""
           |select a.device_id, b.age, b.tag
           |from kehan_test.devices_daily a
           |join (select t.device_id, t.age, t.tag
           |  from dwh.dm_device_age t
           |  where concat(t.year, t.month, t.day)='${yesBef1Str}'
           |) b on a.device_id=b.device_id
        """.stripMargin
      spark.sql(sql)
        .rdd
        .map(parseAge)
        .toDF("device_id", "age", "ratio")
        .createOrReplaceTempView("tmp_age")

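      // One row per device: keep the age bucket with the highest ratio.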
      sql =
        """
          |insert overwrite table kehan_test.tmp_age_daily
          |select t.device_id, t.age
          |from (
          |  select t.device_id, t.age, row_number() over(partition by t.device_id order by t.ratio desc) as rk
          |  from tmp_age t
          |  where t.age is not null and t.ratio is not null
          |) t
          |where t.rk = 1
        """.stripMargin
      // spark.sql(sql)
      println("hql5: " + sql)

      sql =
        s"""
           |insert overwrite table kehan_test.tmp_gender_daily
           |select t.device_id, t.gender from
           |(select device_id,
           |      case when gender = 'm' then 1
           |      when gender = 'f' then 2
           |      else 10 end as gender, row_number() over(partition by device_id order by ratio desc) as rk
           |    from dwh.dm_device_gender
           |    where concat(year, month, day)='${yesBef1Str}') t
           |where t.rk = 1
        """.stripMargin
      // spark.sql(sql)
      println("hql6: " + sql)

      // Join interest, age and gender into the final daily partition.
      // For age and gender, each device keeps its highest-ratio row.
      sql =
        s"""
           |insert overwrite table dwh.ods_dmp_user_info_daily partition(dt='${yesBef1Str}')
           |select a.dev_id,
           |       a.dev_id_md5,
           |       a.dev_type,
           |       a.platform,
           |       a.install,
           |       a.interest,
           |       a.model,
           |       a.country,
           |       a.osversion,
           |       b.age,
           |       c.gender,
           |       d.behavior,
           |       '${yesBef1Part}'
           |from kehan_test.dm_pkg_insterest_model_os_country a
           |left outer join kehan_test.tmp_age_daily b on a.dev_id = b.device_id
           |left outer join kehan_test.tmp_gender_daily c on a.dev_id = c.device_id
           |left outer join (select device_id, concat_ws(',', collect_set(tag_name)) as behavior
           |from (select device_id, tag_name
           |from dwh.dmp_event_tag_daily
           |where day='${yesBef1Str}' and tag_source='3s'
           |union all
           |select device_id, tag_name
           |from dwh.dmp_event_tag_daily
           |where day='${yesBef2Str}' and tag_source='ga') X
           |group by X.device_id
           |) d on a.dev_id = d.device_id
         """.stripMargin
      spark.sql(sql)

      System.out.print("hql7: "+ hql)

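      // Same select again, persisted as an ORC snapshot on S3.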
      sql =
        s"""
           |select a.dev_id,
           |       a.dev_id_md5,
           |       a.dev_type,
           |       a.platform,
           |       a.install,
           |       a.interest,
           |       a.model,
           |       a.country,
           |       a.osversion,
           |       b.age,
           |       c.gender,
           |       d.behavior,
           |       '${yesBef1Part}'
           |from kehan_test.dm_pkg_insterest_model_os_country a
           |left outer join kehan_test.tmp_age_daily b on a.dev_id = b.device_id
           |left outer join kehan_test.tmp_gender_daily c on a.dev_id = c.device_id
           |left outer join (select device_id, concat_ws(',', collect_set(tag_name)) as behavior
           |from (select device_id, tag_name
           |from dwh.dmp_event_tag_daily
           |where day='${yesBef1Str}' and tag_source='3s'
           |union all
           |select device_id, tag_name
           |from dwh.dmp_event_tag_daily
           |where day='${yesBef2Str}' and tag_source='ga') X
           |group by X.device_id
           |) d on a.dev_id = d.device_id
         """.stripMargin
      spark.sql(sql)
        .write
        .option("orc.compress", "zlib")
        .orc("s3://mob-emr-test/andy/dwh/userinfo")

      System.out.print("hql8: "+ hql)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_day", true, "schedule time, e.g. 2018-11-22 00:00:01")
    options
  }

  /** Flatten one dm_device_age row into (device_id, age_code, ratio). */
  def parseAge(row: Row): (String, Int, Double) = {
    val deviceId = row.getString(0)
    val age = row.getString(1)

    val json = GsonUtil.String2JsonObject(age)
    val ageJson = json.get("age_and_proportion").getAsJsonObject
    val ageSource = json.get("age_and_source").getAsJsonObject

    if (ageJson.toString.contains("0-17")) {
      // Model output: pick the age range with the highest proportion.
      var max = 0.0d
      var maxRange = ""
      for (range <- Seq("0-17", "18-24", "25-44", "45-59", "60+")) {
        val p = ageJson.get(range).getAsDouble
        if (p > max) {
          max = p
          maxRange = range
        }
      }
      (deviceId, ageCode(maxRange), max)
    } else if (!"{\"null\":\"null\"}".equals(ageSource.toString)) {
      // Declared age from a source: take the range key with full confidence.
      val ageRange = ageSource.toString.replaceAll("[\\{\\}\"]", "").split(":", -1)(0)
      (deviceId, ageCode(ageRange), 1.0)
    } else {
      // No usable age information.
      (deviceId, 10, 0.0)
    }
  }

  /** Map an age range label to its numeric code; 10 means unknown. */
  private def ageCode(range: String): Int = range match {
    case "0-17" => 1
    case "18-24" => 2
    case "25-44" => 3
    case "45-59" => 4
    case "60+" => 5
    case _ => 10
  }
}

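/** CLI entry point; expects a cur_day argument, e.g. "2018-11-22 00:00:01". */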
object OdsDmpUserInfoDaily_bak {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoDaily_bak().run(args)
  }
}