package mobvista.dmp.datasource.datatory

import java.text.SimpleDateFormat
import java.util
import java.util.Properties

import mobvista.dmp.datasource.dm.Constant.{allZero, andriodIdPtn, didPtn, imeiPtn}
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019/4/1
  * @time: 4:00 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
object Constant {

  val filter_sql: String =
    """
      |SELECT UPPER(dev_id) device_id, install, interest, country, age, gender
      | FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date'
    """.stripMargin

  val ods_dmp_user_info_daily_sql: String =
    """
      |SELECT dev_id, platform, interest, country, age, gender
      | FROM dev.ods_dmp_user_info_daily WHERE dt = '@date'
    """.stripMargin

  val ods2new_sql: String =
    """
      |SELECT UPPER(CONCAT(tag_type, '-', first_tag, '-', second_tag)) tag_id, new_first_id, new_second_id FROM
      | dwh.dm_old2new_tag
    """.stripMargin

  val tracking_install_sql: String =
    """
      |SELECT b.device_id, device_model, os_version, UPPER(country) country, city, CAST(b.offer_id AS string) offer_id, CAST(COALESCE(a.id,'') AS string) id, COALESCE(a.event_name,'') event_name, a.event_type FROM
      | (SELECT devid device_id, MAX(device) device_model, MAX(os_version) os_version, MAX(country) country, MAX(city) city, uuid offer_id FROM dwh.ods_3s_trackingnew_install
      |   WHERE yyyy = '@year' and mm = '@month' and dd = '@day' GROUP BY devid, uuid) b
      | LEFT JOIN
      | (SELECT id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
      | ON a.offer_id = b.offer_id
    """.stripMargin

  //  create_src = 0
  val tracking_event_sql: String =
    """
      |SELECT b.device_id, UPPER(country) country, CAST(b.offer_id AS string) offer_id, COALESCE(a.id, b.event_name) id, COALESCE(a.event_name, b.event_name) event_name, COALESCE(a.event_type,'') event_type FROM
      | (SELECT devid device_id, MAX(country) country, event_name, uuid offer_id FROM dwh.ods_3s_trackingcsv_event_info
      |   WHERE yyyymmdd = '@date' AND devid IS NOT NULL AND devid <> '' GROUP BY devid, event_name, uuid) b
      | LEFT JOIN
      | (SELECT CAST(id AS string) id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
      | ON a.offer_id = b.offer_id
    """.stripMargin

  val event_info_sql: String =
    """
      |SELECT event_name id, event_name, '' event_type, CAST(uuid AS string) offer_id FROM dwh.ods_3s_trackingcsv_event_info WHERE yyyymmdd = '@date'
      | UNION ALL
      | SELECT event_name id, event_name, '' event_type, campaign_id offer_id FROM dwh.ods_adn_tracking_ss_event WHERE CONCAT(yyyy,mm,dd) = '@date'
    """.stripMargin

  val tracking_event_info_sql: String =
    """
      |SELECT CAST(id AS string) id, event_name, event_type, CAST(offer_id AS string) offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date'
    """.stripMargin

  val all_event_info_sql: String =
    """
      |SELECT id, event_name, event_type, offer_id FROM dev.event_info WHERE dt = '@before_date'
    """.stripMargin

  val adv_install_sql: String =
    """
      |SELECT UPPER(device_id) device_id FROM
      | dwh.etl_tracking_install_daily WHERE dt BETWEEN '@start' AND '@end'
    """.stripMargin

  val adv_event_sql: String =
    """
      |SELECT UPPER(device_id) device_id FROM
      | dwh.etl_3s_event_daily WHERE dt BETWEEN '@start' AND '@end'
    """.stripMargin

  val adv_tag_sql: String =
    """
      |SELECT /*+ MAPJOIN(b) */a.device_id device_id, install, interest, country, age, gender
      | FROM adv b
      | JOIN user_info a
      | ON a.device_id = b.device_id
    """.stripMargin

  val adn_tracking_install_sql: String =
    """
      |SELECT
      |   CASE WHEN lower(platform) = 'ios' AND idfa IS NOT NULL AND idfa != '' THEN UPPER(idfa)
      |   WHEN lower(platform) = 'android' AND gaid IS NOT NULL AND gaid != '' THEN UPPER(gaid)
      |   WHEN lower(platform) = 'android' AND imei IS NOT NULL AND imei != '' THEN UPPER(imei) ELSE '' END AS device_id,
      |   device_model, os_version, UPPER(country_code) country, getCity(ios_ab) city, campaign_id, app_id
      |   FROM dwh.ods_adn_trackingnew_install WHERE CONCAT(yyyy,mm,dd) = '@date'
    """.stripMargin

  //  create_src > 0 (1,2,3)
  val adn_tracking_event_sql: String =
    """
      |SELECT
      |   CASE WHEN lower(platform) = 'ios' AND idfa IS NOT NULL AND idfa != '' THEN UPPER(idfa)
      |   WHEN lower(platform) = 'android' AND gaid IS NOT NULL AND gaid != '' THEN UPPER(gaid)
      |   WHEN lower(platform) = 'android' AND imei IS NOT NULL AND imei != '' THEN UPPER(imei)
      |   WHEN lower(platform) = 'android' AND android_id IS NOT NULL AND android_id != '' THEN UPPER(android_id) ELSE '' END AS device_id,
      |   UPPER(country) country, event_name, campaign_id
      |   FROM dwh.ods_adn_tracking_ss_event WHERE CONCAT(yyyy,mm,dd) = '@date'
    """.stripMargin

  val adn_tracking_install_join_event_sql: String =
    """
      |SELECT
      |   a.device_id device_id,CASE WHEN b.device_model IS NOT NULL THEN b.device_model ELSE '' END AS device_model,
      |   CASE WHEN b.os_version IS NOT NULL THEN b.os_version ELSE '' END AS os_version,a.country country,
      |   CASE WHEN b.city IS NOT NULL THEN b.city ELSE '' END AS city,a.event_name event_name ,a.event_type event_type,
      |   a.campaign_id campaign_id, CASE WHEN b.app_id IS NOT NULL THEN b.app_id ELSE array() END AS app_id
      |FROM
      |   (
      |       SELECT device_id, MAX(country) country, COLLECT_SET(event_name) event_name,COLLECT_SET(getEventType(event_name)) event_type,COLLECT_SET(campaign_id) campaign_id
      |       FROM dwh.etl_tracking_adn_event_daily WHERE dt = '@date'
      |       GROUP BY device_id
      |   ) a
      |   LEFT OUTER JOIN
      |   (
      |       SELECT device_id, MAX(device_model) device_model, MAX(os_version) os_version, MAX(city) city, COLLECT_SET(app_id) app_id
      |       FROM dwh.etl_tracking_adn_install_daily WHERE dt = '@date' GROUP BY device_id
      |   ) b
      |   ON a.device_id = b.device_id
    """.stripMargin

  val adn_tracking_merge_sql: String =
    """
      |SELECT
      |   device_id, MAX(device_model) device_model,MAX(os_version) os_version,MAX(country) country,MAX(city) city,COLLECT_SET(campaign_id) campaign_id, array() event_name, array() event_type,COLLECT_SET(app_id) app_id, 0 log_type
      |   FROM dwh.etl_tracking_adn_install_daily WHERE dt = '@date'
      |   GROUP BY device_id
      |   UNION ALL
      |SELECT
      |   device_id,device_model,os_version,country,city,campaign_id,event_name, event_type,app_id, 1 log_type
      |   FROM event_join
    """.stripMargin

  val event_sql: String =
    """
      |SELECT
      |   event_name,event_type
      |FROM dev.event_info
      |   WHERE dt = '@date' AND event_name != '' AND event_type != ''
      |GROUP BY event_name,event_type
    """.stripMargin


  val tracking_install_join_event_sql: String =
    """
      |SELECT
      |   a.device_id device_id,a.offer_id,a.id,a.event_name,a.event_type,a.country,b.city,b.device_model,b.os_version
      |FROM
      |   (
      |       SELECT UPPER(device_id) device_id, COLLECT_SET(offer_id) offer_id, COLLECT_SET(id) id,COLLECT_SET(event_name) event_name, COLLECT_SET(event_type) event_type, MAX(country) country
      |       FROM dwh.etl_3s_event_daily WHERE dt = '@date' GROUP BY UPPER(device_id)
      |   ) a
      |   LEFT OUTER JOIN
      |   (
      |       SELECT UPPER(device_id) device_id, COLLECT_SET(offer_id) offer_id, COLLECT_SET(id) id,COLLECT_SET(event_name) event_name, COLLECT_SET(event_type) event_type, MAX(country) country, MAX(city) city,MAX(device_model) device_model, MAX(os_version) os_version
      |       FROM dwh.etl_tracking_install_daily WHERE dt = '@date' GROUP BY UPPER(device_id)
      |   ) b
      |   ON a.device_id = b.device_id
    """.stripMargin

  val tracking_merge_sql: String =
    """
      |SELECT
      |   UPPER(device_id) device_id, COLLECT_SET(offer_id) offer_id, COLLECT_SET(id) id,COLLECT_SET(event_name) event_name, COLLECT_SET(event_type) event_type,
      |   MAX(country) country, MAX(city) city,MAX(device_model) device_model, MAX(os_version) os_version, 0 log_type
      |   FROM dwh.etl_tracking_install_daily WHERE dt = '@date' GROUP BY UPPER(device_id)
      |   UNION ALL
      |SELECT
      |   device_id,offer_id,id,event_name,event_type,country,CASE WHEN city IS NOT NULL THEN city ELSE '' END AS city,
      |   CASE WHEN device_model IS NOT NULL THEN device_model ELSE '' END AS device_model, CASE WHEN os_version IS NOT NULL THEN os_version ELSE '' END AS os_version, 1 log_type
      |   FROM event_join
    """.stripMargin

  //  Upstream data is dirty, so deduplicate
  def processDistinctInstall(device_id: String, packages: String): util.ArrayList[DevTagEntity] = {
    val res = new util.ArrayList[DevTagEntity]()
    packages.split(",").foreach(pkgDate => {
      val rs = pkgDate.split("\\|")
      if (StringUtils.isNotBlank(rs(0))) {
        res.add(DevTagEntity(device_id, rs(0)))
      }
    })
    res
  }
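
  //  Example (package names/dates illustrative; the input format is "pkg|date,pkg|date,..."):
  //    processDistinctInstall("ABCD-1234", "com.foo.bar|2019-03-01,com.baz|2019-03-15")
  //      ==> [DevTagEntity("ABCD-1234", "com.foo.bar"), DevTagEntity("ABCD-1234", "com.baz")]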

  //  Upstream data is dirty, so deduplicate
  def processDistinctInterest(device_id: String, interests: String, bMap: Broadcast[scala.collection.Map[String, (String, String)]]): util.ArrayList[DevTagEntity] = {
    val res = new util.ArrayList[DevTagEntity]()
    interests.split("#").foreach(interest => {
      val rs = interest.split(",")
      if (rs.length >= 3) {
        //  guard against malformed triples from dirty upstream data
        val tag_id = (rs(0) + "-" + rs(1) + "-" + rs(2)).toUpperCase
        if (bMap.value.contains(tag_id)) {
          res.add(DevTagEntity(device_id, bMap.value(tag_id)._2)) //  second-level tag
          res.add(DevTagEntity(device_id, bMap.value(tag_id)._1)) //  first-level tag
        }
      }
    })
    res
  }
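
  //  Example (values illustrative): interests are "#"-separated "tag_type,first_tag,second_tag"
  //  triples, looked up in bMap as UPPER(tag_type-first_tag-second_tag) -> (new_first_id, new_second_id)
  //  (cf. ods2new_sql above):
  //    processDistinctInterest("ABCD-1234", "APP,GAME,PUZZLE", bMap)
  //      ==> [DevTagEntity("ABCD-1234", <new_second_id>), DevTagEntity("ABCD-1234", <new_first_id>)]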

  def processInstall(packages: String): util.ArrayList[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    val set = new util.HashSet[String]()
    packages.split(",").foreach(pkgDate => {
      val rs = pkgDate.split("\\|")
      if (StringUtils.isNotBlank(rs(0))) {
        set.add(rs(0))
      }
    })
    set.iterator().foreach(s => {
      res.add((s.toLowerCase, 1))
    })
    res
  }

  def processInterest(interests: String, bMap: Broadcast[scala.collection.Map[String, (String, String)]]): util.ArrayList[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    val set = new util.HashSet[String]()
    interests.split("#").foreach(interest => {
      val rs = interest.split(",")
      val tag_id = rs(0) + "-" + rs(1) + "-" + rs(2)
      if (bMap.value.contains(tag_id.toUpperCase)) {
        set.add(bMap.value(tag_id.toUpperCase)._2) //  二级标签
        set.add(bMap.value(tag_id.toUpperCase)._1) //  一级标签
      }
    })
    set.iterator().foreach(s => {
      res.add((s, 1))
    })
    res
  }

  def distinctPartition(iter: Iterator[UserInfoEntity], bMap: Broadcast[scala.collection.Map[String, (String, String)]], category: String): Iterator[DevTagEntity] = {
    val res = new util.ArrayList[DevTagEntity]()
    while (iter.hasNext) {
      val ir = iter.next
      category match {
        case "package" =>
          //  res.addAll(processInstall(ir.install))
          if (StringUtils.isNotBlank(ir.install)) {
            res.addAll(processDistinctInstall(ir.dev_id, ir.install))
          }
        case "interest" =>
          //  res.addAll(processInterest(ir.interest, bMap)) //  map tags to tag IDs
          if (StringUtils.isNotBlank(ir.interest)) {
            res.addAll(processDistinctInterest(ir.dev_id, ir.interest, bMap))
          }
        case "country" =>
          if (StringUtils.isNotBlank(ir.country)) {
            res.add(DevTagEntity(ir.dev_id, ir.country.toUpperCase))
          }
        case "age" =>
          if (StringUtils.isNotBlank(ir.age)) {
            res.add(DevTagEntity(ir.dev_id, ir.age))
          }
        case "gender" =>
          if (StringUtils.isNotBlank(ir.gender)) {
            res.add(DevTagEntity(ir.dev_id, ir.gender))
          }
        case _ =>
          //  unknown category: nothing to add
      }
    }
    res.asScala.iterator
  }

  def commonPartition(iter: Iterator[UserInfoEntity], bMap: Broadcast[scala.collection.Map[String, (String, String)]], category: String): Iterator[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    while (iter.hasNext) {
      val ir = iter.next
      category match {
        case "package" =>
          res.addAll(processInstall(ir.install))
        case "interest" =>
          res.addAll(processInterest(ir.interest, bMap)) //  标签 ID 化
        case "country" =>
          if (StringUtils.isNotBlank(ir.country)) {
            res.add((ir.country.toUpperCase, 1))
          }
        case "age" =>
          if (StringUtils.isNotBlank(ir.age)) {
            res.add((ir.age, 1))
          }
        case "gender" =>
          if (StringUtils.isNotBlank(ir.gender)) {
            res.add((ir.gender, 1))
          }
        case _ =>
          //  unknown category: nothing to add
      }
    }
    res.asScala.iterator
  }

  def commonPartition(iter: Iterator[DevTagEntity]): Iterator[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    while (iter.hasNext) {
      val ir = iter.next
      res.add((ir.tag, 1))
    }
    res.asScala.iterator
  }

  def filterByPackage(packages: String, pkgs: String): Boolean = {
    val set = pkgs.split(",").toSet
    packages.split(",").exists(pkgDate => set.contains(pkgDate.split("\\|")(0)))
  }
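
  //  These filter functions are registered as Spark SQL UDFs in processQuery and appended to the
  //  WHERE clause, e.g. (package list illustrative):
  //    spark.udf.register("filterByPackage", Constant.filterByPackage _)
  //    ... AND filterByPackage(install, 'com.foo.bar,com.baz')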

  def filterByCountry(country: String, countryCode: String): Boolean = {
    countryCode.split(",").toSet.contains(country)
  }

  def filterCommon(column: String, list: String): Boolean = {
    list.toLowerCase.split(",").toSet.contains(column.toLowerCase)
  }

  def parseJsonStringPackage(jsonString: String): PackageFilterEntity = {
    GsonUtil.fromJson(GsonUtil.String2JsonObject(jsonString), classOf[PackageFilterEntity])
  }

  def parseJsonStringAdv(jsonString: String): AdvFilterEntity = {
    GsonUtil.fromJson(GsonUtil.String2JsonObject(jsonString), classOf[AdvFilterEntity])
  }
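
  //  A minimal sketch of the expected filter JSON; field names follow how the entities are read in
  //  processQuery, all values are illustrative:
  //    package filter: {"jobId":"job_001","start":"20190325","end":"20190401",
  //                     "countryCode":"US,CN","packageList":"com.foo.bar,com.baz","top":20}
  //    adv filter:     {"jobId":"job_002","start":"20190325","end":"20190401",
  //                     "offerId":"12345","eventName":"purchase","eventType":"1"}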

  //  NOTE: SimpleDateFormat is not thread-safe; these shared instances are safe here only because
  //  they are used on the driver (in processQuery), never inside executor-side closures
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  def processQuery(date: String, tag: String, jsonString: String, spark: SparkSession): (RDD[UserInfoEntity], Long, String, Integer) = {
    var jobId: String = ""
    var top: Integer = 20
    val baseDF =
      if (tag.equals("pkg")) {
        spark.udf.register("filterByPackage", Constant.filterByPackage _)
        spark.udf.register("filterByCountry", Constant.filterByCountry _)

        val packageFilterEntity: PackageFilterEntity = Constant.parseJsonStringPackage(jsonString)
        jobId = packageFilterEntity.jobId

        var end = ""
        var updateDate = ""
        if (date.compareTo(packageFilterEntity.end) < 0) {
          end = date
          val days = (sdf2.parse(packageFilterEntity.end).getTime - sdf2.parse(packageFilterEntity.start).getTime) / 1000 / 3600 / 24
          val update_date = DateUtil.getDayByString(end, "yyyyMMdd", -days.toInt)
          updateDate = sdf1.format(sdf2.parse(update_date))
        } else {
          end = packageFilterEntity.end
          updateDate = sdf1.format(sdf2.parse(packageFilterEntity.start))
        }

        var sql = Constant.filter_sql.replace("@date", end).replace("@update_date", updateDate)

        if (StringUtils.isNotBlank(packageFilterEntity.countryCode)) {
          sql = sql + s" AND filterByCountry(country,'${packageFilterEntity.countryCode}')"
        }

        if (StringUtils.isNotBlank(packageFilterEntity.packageList)) {
          sql = sql + s" AND filterByPackage(install,'${packageFilterEntity.packageList}')"
        }

        if (packageFilterEntity.top != null) {
          top = packageFilterEntity.top
        }

        val df = spark.sql(sql)
          .dropDuplicates(Seq("device_id"))
          .rdd
          .map(r => {
            UserInfoEntity(r.getAs("device_id"), r.getAs("install"), r.getAs("interest"), r.getAs("country"), r.getAs("age"), r.getAs("gender"))
          }).persist(StorageLevel.MEMORY_AND_DISK_SER)
        (df, df.count)
      } else {
        spark.udf.register("filterCommon", Constant.filterCommon _)
        val advFilterEntity: AdvFilterEntity = Constant.parseJsonStringAdv(jsonString)
        jobId = advFilterEntity.jobId

        val start = advFilterEntity.start

        /**
          * If no event criteria are provided (or event info is pulled back but nothing is selected),
          * query only the install log for install results. If event criteria are provided, query the
          * install log + event log (joining the two logs on uuid/offer_id).
          */
        var install_sql = adv_install_sql.replace("@start", start).replace("@end", advFilterEntity.end)
        var event_sql = adv_event_sql.replace("@start", start).replace("@end", advFilterEntity.end)
        if (StringUtils.isNotBlank(advFilterEntity.offerId)) {
          install_sql = install_sql + s" AND filterCommon(offer_id,'${advFilterEntity.offerId}')"
          event_sql = event_sql + s" AND filterCommon(offer_id,'${advFilterEntity.offerId}')"
        }
        if (StringUtils.isNotBlank(advFilterEntity.eventName)) {
          install_sql = install_sql + s" AND filterCommon(id,'${advFilterEntity.eventName}')"
          event_sql = event_sql + s" AND filterCommon(id,'${advFilterEntity.eventName}')"
        }
        if (StringUtils.isNotBlank(advFilterEntity.eventType)) {
          install_sql = install_sql + s" AND filterCommon(event_type,'${advFilterEntity.eventType}')"
          event_sql = event_sql + s" AND filterCommon(event_type,'${advFilterEntity.eventType}')"
        }

        val adv_df = if (StringUtils.isNotBlank(advFilterEntity.eventName) || StringUtils.isNotBlank(advFilterEntity.eventType)) {
          spark.sql(install_sql).union(spark.sql(event_sql)).dropDuplicates
        } else {
          spark.sql(install_sql).dropDuplicates
        }
        adv_df.createOrReplaceTempView("adv")

        var end = ""
        var updateDate = ""

        if (advFilterEntity.end.compareTo(date) <= 0 && advFilterEntity.end.compareTo(DateUtil.getDayByString(date, "yyyyMMdd", -3)) >= 0) {
          end = advFilterEntity.end
          updateDate = sdf1.format(sdf2.parse(advFilterEntity.start))
        } else {
          end = date
          val days = (sdf2.parse(advFilterEntity.end).getTime - sdf2.parse(advFilterEntity.start).getTime) / 1000 / 3600 / 24
          val update_date = DateUtil.getDayByString(end, "yyyyMMdd", -days.toInt)
          updateDate = sdf1.format(sdf2.parse(update_date))
        }

        val sql = Constant.filter_sql.replace("@date", end).replace("@update_date", updateDate)
        spark.sql(sql).createOrReplaceTempView("user_info")
        val df = spark.sql(adv_tag_sql)
          .dropDuplicates(Seq("device_id"))
          .rdd
          .map(r => {
            UserInfoEntity(r.getAs("device_id"), r.getAs("install"), r.getAs("interest"), r.getAs("country"), r.getAs("age"), r.getAs("gender"))
          }).persist(StorageLevel.MEMORY_AND_DISK_SER)
        (df, adv_df.count())
      }
    (baseDF._1, baseDF._2, jobId, top)
  }

  def processBase(baseDF: RDD[UserInfoEntity], count: Long, jobId: String, top: Integer, spark: SparkSession, bMap: Broadcast[scala.collection.Map[String, (String, String)]]): Dataset[Row] = {
    val coalesce = 1
    val sc = spark.sparkContext

    //  val all = baseDF.count()
    val allDF = sc.parallelize(Seq(Result(jobId, "all", "all", count.toInt)))
    import spark.implicits._
    //  .mapPartitions(distinctPartition(_, bMap, "package"))
    val packageDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "package"))
      .reduceByKey(_ + _)
      .repartition(coalesce.toInt)
      .sortBy(_._2, false)
      .map(r => {
        Result(jobId, "package", r._1, r._2)
      })

    val packageRDD = sc.parallelize(packageDF.take(top))

    //  .mapPartitions(distinctPartition(_, bMap, "interest"))
    val interestDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "interest"))
      .reduceByKey(_ + _)
      .repartition(coalesce.toInt)
      .sortByKey()
      .map(r => {
        Result(jobId, "interest", r._1, r._2)
      })

    //  .mapPartitions(distinctPartition(_, bMap, "country"))
    val countryDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "country"))
      .reduceByKey(_ + _)
      .repartition(coalesce.toInt)
      .sortBy(_._2, false)
      .map(r => {
        Result(jobId, "country", r._1, r._2)
      })

    val countryRDD = sc.parallelize(countryDF.take(top))

    //  .mapPartitions(distinctPartition(_, bMap, "age"))
    val ageDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "age"))
      .reduceByKey(_ + _)
      .repartition(coalesce.toInt)
      .map(r => {
        Result(jobId, "age", r._1, r._2)
      })

    //  .mapPartitions(distinctPartition(_, bMap, "gender"))
    val genderDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "gender"))
      .reduceByKey(_ + _)
      .repartition(coalesce.toInt)
      .map(r => {
        Result(jobId, "gender", r._1, r._2)
      })

    val df = allDF.union(packageRDD).union(interestDF).union(countryRDD).union(ageDF).union(genderDF)
      .toDF
      .repartition(1)
    df
  }
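
  //  A minimal driver sketch of how these pieces chain together; building bMap from ods2new_sql and
  //  the output table name are assumptions, arguments are illustrative:
  //    val bMap = spark.sparkContext.broadcast(
  //      spark.sql(ods2new_sql).rdd.map(r => (r.getString(0), (r.getString(1), r.getString(2)))).collectAsMap())
  //    val (base, count, jobId, top) = processQuery("20190401", "pkg", jsonString, spark)
  //    writeMySQL(processBase(base, count, jobId, top, spark, bMap), "job_result", SaveMode.Append)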

  def getGenderId(id: String): String = {
    val genderId =
      id match {
        case "1" => "00001001"
        case "2" => "00001002"
        case _ => "00001999"
      }
    genderId
  }

  def getAgeId(id: String): String = {
    val ageId =
      id match {
        case "1" => "00002001"
        case "2" => "00002002"
        case "3" => "00002003"
        case "4" => "00002004"
        case "5" => "00002005"
        case _ => "00002999"
      }
    ageId
  }

  def writeMySQL(df: Dataset[Row], table: String, saveMode: SaveMode): Unit = {
    //  TODO: credentials are hard-coded here; consider loading them from config as jdbcConnection does
    val prop = new Properties
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("user", "apptag_rw")
    prop.setProperty("password", "7gyLEVtkER3u8c9")
    prop.setProperty("characterEncoding", "utf8")

    df.write.mode(saveMode).jdbc("jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/datatory", table, prop)
  }

  def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val url = PropertyUtil.getProperty("config.properties", "mysql.mob_adn.url")
    val username = PropertyUtil.getProperty("config.properties", "mysql.mob_adn.user")
    val password = PropertyUtil.getProperty("config.properties", "mysql.mob_adn.password")

    val properties = new Properties()
    properties.put("driver", "com.mysql.jdbc.Driver")
    properties.put("user", username)
    properties.put("password", password)
    properties.put("characterEncoding", "utf8")

    println(s"url ==>> jdbc:mysql://adn-mysql-internal.mobvista.com:3306/${database},user ==>> ${username},password ==>> ${password}")

    spark.read.jdbc(url = s"jdbc:mysql://${url}:3306/${database}", table = table, properties = properties)
  }
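
  //  A device_id passes only if it matches didPtn (and is not the all-zero id), imeiPtn, or
  //  andriodIdPtn; the patterns themselves are imported from mobvista.dmp.datasource.dm.Constant.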

  def check_deviceId(device_id: String): Boolean = {
    StringUtils.isNotBlank(device_id) &&
      ((device_id.matches(didPtn) && !device_id.equals(allZero)) || device_id.matches(imeiPtn) || device_id.matches(andriodIdPtn))
  }
}