package mobvista.dmp.common

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.google.gson.JsonElement
import mobvista.dmp.util.MD5Util
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.text.SimpleDateFormat
import java.util.Properties
import java.util.regex.Pattern
import scala.collection.mutable

/**
 * @package: mobvista.dmp.common
 * @author: wangjf
 * @date: 2020/3/6
 * @time: 11:42 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
object MobvistaConstant {

  /**
   * Query template over `dwh.etl_3s_install_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ';' and passed through
   * `toJsonBySplit` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution, and `toJsonBySplit` / `check_device` are presumably SQL functions
   * registered elsewhere — TODO confirm.
   */
  val tracking_3s_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_3s_install_daily WHERE `date` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_dsp_request_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country_code) as country, the collected `package_list` values joined with ','
   * and passed through `toPkgJson` as install_list, and `parseDSPExtData` over the collected
   * regions as ext_data. Unlike most sibling templates it has no `check_device` filter.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution, and the function names are presumably registered elsewhere — TODO confirm.
   */
  val dsp_req_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country_code) country, toPkgJson(CONCAT_WS(',',COLLECT_SET(package_list)),platform,'@update_date') install_list,
      | parseDSPExtData(COLLECT_SET(region),device_type) ext_data
      | FROM dwh.etl_dsp_request_daily WHERE `date` = '@dt'
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_dsp_request_unmatch`, grouped by (device_id, device_type, platform).
   * Same projection as the DSP request template (MAX(country_code), `toPkgJson` install_list,
   * `parseDSPExtData` ext_data), but filtered on the `dt` partition and on
   * `datetime >= '@update_date 00:00:00'`.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val dsp_req_unmatch_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country_code) country, toPkgJson(CONCAT_WS(',',COLLECT_SET(package_list)),platform,'@update_date') install_list,
      | parseDSPExtData(COLLECT_SET(region),device_type) ext_data
      | FROM dwh.etl_dsp_request_unmatch WHERE `dt` = '@dt' AND datetime >= '@update_date 00:00:00'
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_sdk_request_daily`, grouped by (device_id, device_type, platform).
   * Selects the collected package names joined with ';' and passed through `toJsonBySplit` as
   * install_list, and a constant '{}' as ext_data.
   *
   * NOTE(review): unlike the sibling templates in this object, no country column is selected here,
   * so the result has five columns instead of six — confirm consumers expect that.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val adn_sdk_reuqest_sql: String =
    """
      |SELECT device_id, device_type, platform, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_adn_sdk_request_daily WHERE `date` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_request_sdk_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ';' and passed through
   * `toJsonBySplit` as install_list, and `parseMExtData` over the comma-joined strategies,
   * the comma-joined regions and MAX(dev_tag) as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val adn_reuqest_sdk_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, '@update_date') install_list,
      | parseMExtData(CONCAT_WS(',',COLLECT_SET(strategy)), CONCAT_WS(',',COLLECT_SET(region)), MAX(dev_tag)) ext_data
      | FROM dwh.etl_adn_request_sdk_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_request_sdk_unmatch`, grouped by (device_id, device_type, platform).
   * Same projection as the matched request-SDK template, filtered on the `dt` partition and on
   * `update_date >= '@update_date'`.
   *
   * NOTE(review): the third argument of `toJsonBySplit` here is the bare column `update_date`,
   * not the quoted `'@update_date'` token used by the sibling templates, and that column is
   * neither in the GROUP BY nor aggregated — confirm this is intended and that the query analyses.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val adn_reuqest_sdk_unmatch_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, update_date) install_list,
      | parseMExtData(CONCAT_WS(',',COLLECT_SET(strategy)), CONCAT_WS(',',COLLECT_SET(region)), MAX(dev_tag)) ext_data
      | FROM dwh.etl_adn_request_sdk_unmatch WHERE `dt` = '@dt' AND update_date >= '@update_date' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_install_daily`, grouped by (device_id, device_type, platform).
   * Selects an empty-string country, the collected package names joined with ',' and passed
   * through `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val adn_install_sql: String =
    """
      |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_adn_install_daily WHERE `date` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_request_other_daily`, grouped by (device_id, device_type, platform).
   * Selects an empty-string country, the collected package names joined with ',' and passed
   * through `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val adn_request_other_sql: String =
    """
      |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_adn_request_other_daily WHERE `date` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_sdk_daily` (no grouping). Selects the row's country,
   * the row's `install_list` passed through `toJsonString` as install_list, and a constant
   * '{}' as ext_data, filtered on the `day` partition and on `version = @version`.
   *
   * `@dt`, `@update_date` and `@version` are literal tokens in the text (note `@version` is
   * not quoted); presumably callers replace them before execution — TODO confirm.
   */
  val adn_sdk_sql: String =
    """
      |SELECT device_id, device_type, platform, country, toJsonString(install_list, platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_adn_sdk_daily WHERE `day` = '@dt' AND check_device(device_id) AND version = @version
    """.stripMargin

  /**
   * Query template over `dwh.etl_mpsdk_request_daily`, grouped by device_id only.
   * device_type is the constant 'gaid', platform the constant 'android' and country an empty
   * string; install_list is the collected `package_list` values joined with ',' and passed
   * through `toPkgJson`; ext_data is a constant '{}'.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val mp_sql: String =
    """
      |SELECT device_id, 'gaid' device_type, 'android' platform, '' country, toPkgJson(CONCAT_WS(',',COLLECT_SET(package_list)), 'android', '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_mpsdk_request_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id
    """.stripMargin

  /**
   * Query template over `dwh.ods_ga_install_daily`, grouped by (device_id, device_type, platform).
   * Selects an empty-string country, the collected package names joined with ',' and passed
   * through `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val ga_sql: String =
    """
      |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.ods_ga_install_daily WHERE `date` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_adn_clever_daily`, grouped by (device_id, device_type, platform).
   * Selects an empty-string country, the collected package names joined with ',' and passed
   * through `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val clever_sql: String =
    """
      |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_adn_clever_daily WHERE `date` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_bytedance_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ',' and passed through
   * `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val bytedance_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_bytedance_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.joypac_sdk_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ',' and passed through
   * `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val joypacios_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.joypac_sdk_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_facebook_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ',' and passed through
   * `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val facebook_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_facebook_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_baichuan_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ',' and passed through
   * `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val ali_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_baichuan_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_iqiyi_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with ',' and passed through
   * `toPkgJson` as install_list, and a constant '{}' as ext_data.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val iqiyi_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_iqiyi_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Query template over `dwh.etl_3s_postback_daily`, grouped by (device_id, device_type, platform).
   * Selects MAX(country), the collected package names joined with '#' and passed to the SQL
   * function `toJsonBySplit` as install_list, and a constant '{}' as ext_data.
   *
   * NOTE(review): the names are joined with '#', while the `toJsonBySplit` method in this object
   * splits on ';' and `toJsonBySplitV2` splits on '#'. Confirm which implementation is registered
   * under the SQL name `toJsonBySplit` where this template is used.
   *
   * `@dt` and `@update_date` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val allpb_sql: String =
    """
      |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS('#', COLLECT_SET(package_name)), platform, '@update_date') install_list, '{}' ext_data
      | FROM dwh.etl_3s_postback_daily WHERE `dt` = '@dt' AND check_device(device_id)
      | GROUP BY device_id, device_type, platform
      |""".stripMargin

  /**
   * Query template over `dwh.dm_install_list` (no grouping). Selects device_id, device_type,
   * platform, country and install_list for rows where `CONCAT(year,month,day)` equals `@dt`,
   * `business` equals `@business`, and `check_device(device_id)` holds.
   *
   * `@dt` and `@business` are literal tokens in the text; presumably callers replace them
   * before execution — TODO confirm.
   */
  val install_list_sql: String =
    """
      |SELECT device_id, device_type, platform, country, install_list
      | FROM dwh.dm_install_list WHERE CONCAT(year,month,day) = '@dt' AND business = '@business'  AND check_device(device_id)
      |""".stripMargin

  /**
   * Query template over `dwh.dmp_install_list` (no grouping). Selects device_id, device_type,
   * platform, country, `get_filter_pkg(install_list, '@expireDate')` as install_list, ext_data
   * and update_date, for rows matching `@dt` / `@business` that pass `check_device` and
   * `filter_pkg(install_list, '@expireDate')`.
   *
   * `@dt`, `@business` and `@expireDate` are literal tokens in the text; presumably callers
   * replace them before execution, and `get_filter_pkg` / `filter_pkg` presumably map to the
   * methods of the same name in this object — TODO confirm.
   */
  val dmp_install_list_sql: String =
    """
      |SELECT device_id, device_type, platform, country, get_filter_pkg(install_list,'@expireDate') install_list, ext_data, update_date
      | FROM dwh.dmp_install_list WHERE dt = '@dt' AND business = '@business'  AND check_device(device_id) AND filter_pkg(install_list,'@expireDate')
      |""".stripMargin

  import scala.collection.JavaConversions._
  import scala.collection.JavaConverters._

  /**
   * Parses `install_list` as a JSON object of package -> date and returns, as a JSON string,
   * only the entries whose date is not before `expireDate` (string comparison) and whose
   * package key is not blank.
   *
   * @param install_list JSON text; it is cast to a String-to-String map without validation
   * @param expireDate   lower bound (inclusive) for the entry dates
   * @return JSON text of the surviving entries
   */
  def get_filter_pkg(install_list: String, expireDate: String): String = {
    val parsed = JSON.parse(install_list).asInstanceOf[java.util.Map[String, String]]
    // retain prunes the freshly parsed map in place; nothing else holds a reference to it.
    val kept = parsed.asScala.retain { (pkg, date) =>
      date.compareTo(expireDate) >= 0 && StringUtils.isNotBlank(pkg)
    }
    val asJavaMap = kept.toMap.asJava
    JSON.toJSON(asJavaMap).toString
  }

  /**
   * Tells whether `install_list` (JSON object of package -> date) has at least one entry whose
   * date is not before `expireDate` (string comparison) and whose package key is not blank.
   *
   * @param install_list JSON text; it is cast to a String-to-String map without validation
   * @param expireDate   lower bound (inclusive) for the entry dates
   * @return true when at least one entry survives the filter
   */
  def filter_pkg(install_list: String, expireDate: String): Boolean = {
    val parsed = JSON.parse(install_list).asInstanceOf[java.util.Map[String, String]]
    // retain prunes the freshly parsed map in place; nothing else holds a reference to it.
    val kept = parsed.asScala.retain { (pkg, date) =>
      date.compareTo(expireDate) >= 0 && StringUtils.isNotBlank(pkg)
    }
    kept.nonEmpty
  }

  /**
   * Builds the ext_data JSON string with three keys: "strategy" and "region" (the distinct
   * comma-separated tokens of the respective argument) and "dev_tag" (passed through).
   *
   * @param strategys comma-joined strategy values
   * @param regions   comma-joined region values
   * @param dev_tag   value stored under "dev_tag"
   * @return the JSON object serialized as a string
   */
  def parseMExtData(strategys: String, regions: String, dev_tag: Int): String = {
    // Splits a comma-joined string into a de-duplicated set of its tokens.
    def distinctTokens(joined: String): mutable.HashSet[String] = {
      val tokens = new mutable.HashSet[String]()
      tokens ++= joined.split(",")
      tokens
    }

    val extData = new JSONObject()
    extData.put("strategy", distinctTokens(strategys).asJava)
    extData.put("region", distinctTokens(regions).asJava)
    extData.put("dev_tag", dev_tag)
    extData.toJSONString
  }

  /**
   * Builds the DSP ext_data JSON string with two keys: "region" (the distinct tokens found in
   * all elements of `regionArray` after removing '[', ']' and '"' and splitting on ',') and
   * "dev_tag" (1 when `device_type` is "gaid", otherwise 0).
   *
   * @param regionArray region strings, each possibly holding several comma-separated values
   * @param device_type device type used to derive dev_tag
   * @return the JSON object serialized as a string
   */
  def parseDSPExtData(regionArray: mutable.WrappedArray[String], device_type: String): String = {
    val extData = new JSONObject()
    val distinctRegions = new mutable.HashSet[String]()
    for (raw <- regionArray) {
      val stripped = raw.replace("[", "").replace("]", "").replace("\"", "")
      distinctRegions ++= stripped.split(",")
    }
    extData.put("region", distinctRegions.asJava)
    val devTag = if (device_type.equals("gaid")) 1 else 0
    extData.put("dev_tag", devTag)
    extData.toJSONString
  }

  /**
   * Converts a JSON array of install records into a JSON object string mapping each non-blank
   * "package_name" to its "date". When no entry is kept, a single placeholder package
   * (see `getPkgName`) is stored with `update_date`.
   *
   * @param install_list JSON array text, parsed with `GsonUtil.String2JsonArray`
   * @param platform     platform used to pick the placeholder package name
   * @param update_date  date stored for the placeholder entry
   * @return the JSON object serialized as a string
   */
  def toJsonString(install_list: String, platform: String, update_date: String): String = {
    val installs = GsonUtil.String2JsonArray(install_list)
    val result = new JSONObject()
    for (record <- installs) {
      val pkg = getJsonValue(record, "package_name")
      val installedOn = getJsonValue(record, "date")
      if (StringUtils.isNotBlank(pkg)) {
        result.put(pkg, installedOn)
      }
    }
    if (result.isEmpty) {
      result.put(getPkgName(platform), update_date)
    }
    result.toJSONString
  }

  /**
   * Converts a package list string into a JSON object string mapping every valid package name
   * to `update_date`. The characters '[', ']' and '"' are removed, ';' is treated like ',',
   * and on the "ios" platform an `id<digits>` entry is reduced to its digits. When nothing
   * valid remains, a single placeholder package (see `getPkgName`) is stored.
   *
   * @param package_list raw package list
   * @param platform     platform used for name validation and the placeholder
   * @param update_date  date stored for every kept package
   * @return the JSON object serialized as a string
   */
  def toPkgJson(package_list: String, platform: String, update_date: String): String = {
    val result = new JSONObject()
    val cleaned = package_list.replace("[", "").replace("]", "").replace("\"", "").replace(";", ",")
    for (raw <- cleaned.split(",")) {
      val candidate =
        if (platform.equals("ios") && raw.matches("^id[0-9]+$")) raw.replace("id", "") else raw
      if (checkPkgName(platform, candidate)) {
        result.put(candidate, update_date)
      }
    }
    if (result.isEmpty) {
      result.put(getPkgName(platform), update_date)
    }
    result.toJSONString
  }

  /**
   * Converts a ';'-separated package list into a JSON object string mapping every valid
   * package name to `update_date`. An `id<digits>` entry is reduced to its digits on any
   * platform. When nothing valid remains, a single placeholder package (see `getPkgName`)
   * is stored.
   *
   * @param package_list ';'-separated package names
   * @param platform     platform used for name validation and the placeholder
   * @param update_date  date stored for every kept package
   * @return the JSON object serialized as a string
   */
  def toJsonBySplit(package_list: String, platform: String, update_date: String): String = {
    val result = new JSONObject()
    for (raw <- package_list.split(";")) {
      val name = if (raw.matches("^id[0-9]+$")) raw.replace("id", "") else raw
      if (checkPkgName(platform, name)) {
        // Store the date when the package is new or its stored date sorts before update_date.
        val shouldStore = !result.containsKey(name) || result.getString(name).compareTo(update_date) < 0
        if (shouldStore) {
          result.put(name, update_date)
        }
      }
    }
    if (result.isEmpty) {
      result.put(getPkgName(platform), update_date)
    }
    result.toJSONString
  }

  /**
   * Converts a '#'-separated package list into a JSON object string mapping every valid
   * package name to `update_date`. An `id<digits>` entry is reduced to its digits on any
   * platform. When nothing valid remains, a single placeholder package (see `getPkgName`)
   * is stored.
   *
   * @param package_list '#'-separated package names
   * @param platform     platform used for name validation and the placeholder
   * @param update_date  date stored for every kept package
   * @return the JSON object serialized as a string
   */
  def toJsonBySplitV2(package_list: String, platform: String, update_date: String): String = {
    val result = new JSONObject()
    for (raw <- package_list.split("#")) {
      val name = if (raw.matches("^id[0-9]+$")) raw.replace("id", "") else raw
      if (checkPkgName(platform, name)) {
        // Store the date when the package is new or its stored date sorts before update_date.
        val shouldStore = !result.containsKey(name) || result.getString(name).compareTo(update_date) < 0
        if (shouldStore) {
          result.put(name, update_date)
        }
      }
    }
    if (result.isEmpty) {
      result.put(getPkgName(platform), update_date)
    }
    result.toJSONString
  }

  /**
   * Converts a ';'-separated list of `package#date` entries into a JSON object string mapping
   * each valid package name to the greatest date seen for it (string comparison). An
   * `id<digits>` name is reduced to its digits. When nothing valid remains, a single
   * placeholder package (see `getPkgName`) is stored with `update_date`.
   *
   * Entries without a date part (for example an empty `package_list`, "pkg" or "pkg#") fall
   * back to `update_date` instead of failing with an ArrayIndexOutOfBoundsException.
   *
   * @param package_list ';'-separated `package#date` entries
   * @param platform     platform used for name validation and the placeholder
   * @param update_date  date used for the placeholder and for entries that carry no date
   * @return the JSON object serialized as a string
   */
  def toJsonBySplitV1(package_list: String, platform: String, update_date: String): String = {
    val json = new JSONObject()
    package_list.split(";").foreach(pkg_date => {
      val pkgDate = pkg_date.split("#")
      // String.split drops trailing empty fields, so "", "#", "pkg" and "pkg#" all yield
      // fewer than two parts; never index past what is actually there.
      val rawName = if (pkgDate.nonEmpty) pkgDate(0) else ""
      val installDate = if (pkgDate.length > 1) pkgDate(1) else update_date
      val package_name = if (rawName.matches("^id[0-9]+$")) rawName.replace("id", "") else rawName
      if (checkPkgName(platform, package_name) &&
        (!json.containsKey(package_name) || json.getString(package_name).compareTo(installDate) < 0)) {
        json.put(package_name, installDate)
      }
    })
    if (json.isEmpty) {
      json.put(getPkgName(platform), update_date)
    }
    json.toJSONString
  }

  /**
   * Reads the member `key` of a JSON object element as a string.
   *
   * @param element element expected to be a JSON object; may be null or JSON null
   * @param key     member name to read
   * @return the member's string value, or "" when the element is null, JSON null,
   *         or has no such member
   */
  def getJsonValue(element: JsonElement, key: String): String = {
    val hasMember = element != null && !element.isJsonNull && element.getAsJsonObject.has(key)
    if (!hasMember) {
      ""
    } else {
      element.getAsJsonObject.get(key).getAsString
    }
  }

  /**
   * Returns the placeholder package name used when a device has no valid package:
   * "0000000000" for the "ios" platform and "com.nonepkg.nonepkg" for anything else.
   *
   * @param platform platform name; must not be null
   * @return the placeholder package name
   */
  def getPkgName(platform: String): String = {
    val isIos = platform.equals("ios")
    if (isIos) "0000000000" else "com.nonepkg.nonepkg"
  }

  // Country code: two or three upper-case ASCII letters.
  val countryPtn = "^([A-Z]{2,3})$"

  // IDFA / GAID: hexadecimal groups laid out as 8-4-4-4-12.
  val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
  // The all-zero identifier.
  val allZero = "00000000-0000-0000-0000-000000000000"
  // IMEI: 15 to 17 decimal digits.
  val imeiPtn = "^([0-9]{15,17})$"
  // Invalid IMEI filter: one digit repeated 15 to 17 times in a row.
  val imeiPtnAll = """^([0-9])\1{14,16}"""
  // Android ID: exactly 16 alphanumeric characters.
  val andriodIdPtn = "^[a-zA-Z0-9]{16}$"
  // Invalid Android ID filter: one character repeated 16 times in a row.
  // This must be a raw (triple-quoted) string with a capture group: in a plain string literal
  // "\1" is a character escape rather than a regex back-reference, and a back-reference needs
  // a group to refer to.
  val andriodIdAll = """^([a-zA-Z0-9])\1{15}$"""
  // MD5: 32 hexadecimal characters.
  val md5Ptn = "^([a-fA-F0-9]{32})$"
  // Invalid IMEI MD5 filter: one character repeated 30 to 32 times in a row.
  val umd5Ptn = """^([0-9A-Za-z])\1{29,31}"""
  // OAID: 16 to 64 characters drawn from letters, digits and '-'.
  val oaidPtb = """^[0-9A-Za-z-]{16,64}$"""

  /**
   * Checks whether `device_id` looks like an acceptable device identifier. A non-blank id is
   * accepted when any of the following holds:
   *  - it matches `didPtn` and is not the all-zero identifier;
   *  - it matches `imeiPtn` and not `imeiPtnAll`;
   *  - it matches `andriodIdPtn` and not `andriodIdAll`;
   *  - it matches `md5Ptn` and not `umd5Ptn`;
   *  - it matches `oaidPtb`;
   *  - it is longer than 16 characters.
   *
   * Note that the last two alternatives accept every id longer than 16 characters and every
   * 16-64 character id made of letters, digits and '-', which includes the all-zero
   * identifier; the exclusions above therefore only matter for shorter ids.
   *
   * @param device_id candidate identifier, may be null or blank
   * @return true when the id is accepted
   */
  def checkDeviceId(device_id: String): Boolean = {
    // Evaluated lazily and in the original order so short-circuiting is preserved.
    def validUuid: Boolean = device_id.matches(didPtn) && !allZero.equals(device_id)
    def validImei: Boolean = device_id.matches(imeiPtn) && !device_id.matches(imeiPtnAll)
    def validAndroidId: Boolean = device_id.matches(andriodIdPtn) && !device_id.matches(andriodIdAll)
    def validMd5: Boolean = device_id.matches(md5Ptn) && !device_id.matches(umd5Ptn)
    def validOaidOrLong: Boolean = device_id.matches(oaidPtb) || device_id.length > 16

    if (StringUtils.isBlank(device_id)) {
      false
    } else {
      validUuid || validImei || validAndroidId || validMd5 || validOaidOrLong
    }
  }

  // Device type names held in this set; the set is mutable, so callers can alter it.
  val deviceTypeSet: mutable.Set[String] = {
    val knownTypes = Seq(
      "idfa", "gaid", "imei", "androidid", "sysid", "oaid",
      "idfv", "ruid", "idfamd5", "gaidmd5", "imeimd5", "oaidmd5", "upid"
    )
    mutable.Set(knownTypes: _*)
  }

  // iOS package id: digits only.
  val iosPkgPtn = Pattern.compile("^\\d+$")
  // iOS package id carrying an "id" prefix before the digits.
  val idIosPkgPtn = Pattern.compile("^id\\d+$")
  // Android package name: 3-255 characters, at least two dot-separated segments of
  // letters, digits, '_' and (not in first position) '-', each segment at most 63 characters.
  val adrPkgPtn = Pattern.compile("^(?=^.{3,255}$)[a-zA-Z0-9_][-a-zA-Z0-9_]{0,62}(\\.[a-zA-Z0-9_][-a-zA-Z0-9_]{0,62})+$")

  /**
   * Validates a package name for a platform: "ios" requires the iOS pattern, "android" or
   * "adr" requires the Android pattern, and any other platform accepts either.
   *
   * @param platform platform name
   * @param pkg      package name to validate; must not be null
   * @return true when `pkg` is valid for `platform`
   */
  def checkPkgName(platform: String, pkg: String): Boolean = {
    def looksIos: Boolean = iosPkgPtn.matcher(pkg).matches
    def looksAndroid: Boolean = adrPkgPtn.matcher(pkg).matches

    if (platform == "ios") {
      looksIos
    } else if (platform == "android" || platform == "adr") {
      looksAndroid
    } else {
      looksIos || looksAndroid
    }
  }

  /**
   * Parses `str` as a JSON array without throwing.
   *
   * @param str JSON text, may be null or blank
   * @return the parsed array, or a new empty array when `str` is blank, parses to an empty
   *         array, or parsing raises an exception
   */
  def String2JSONArray(str: String): JSONArray = {
    if (StringUtils.isBlank(str)) {
      new JSONArray()
    } else {
      try {
        val parsed = JSON.parseArray(str)
        if (parsed.isEmpty) new JSONArray() else parsed
      } catch {
        case _: Exception =>
          new JSONArray()
      }
    }
  }

  /**
   * Parses `str` as a JSON object without throwing.
   *
   * @param str JSON text, may be null or blank
   * @return the parsed object, or a new empty object when `str` is blank, parses to an empty
   *         object, or parsing raises an exception
   */
  def String2JSONObject(str: String): JSONObject = {
    if (StringUtils.isBlank(str)) {
      new JSONObject()
    } else {
      try {
        val parsed = JSON.parseObject(str)
        if (parsed.isEmpty) new JSONObject() else parsed
      } catch {
        case _: Exception =>
          new JSONObject()
      }
    }
  }

  // SimpleDateFormat is not thread-safe, so a single instance shared through this singleton
  // can produce corrupted results when used from several threads at once. Each access
  // therefore returns a fresh formatter; call sites such as `sdf1.format(...)` are unchanged.

  /** Returns a new formatter for the "yyyy-MM-dd" pattern. */
  def sdf1: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")

  /** Returns a new formatter for the "yyyyMMdd" pattern. */
  def sdf2: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")

  /**
   * Gets or creates a Hive-enabled SparkSession named `appName` with this object's fixed
   * settings (RDD compression, lz4 codec, ORC filter push-down, warehouse directory and
   * Kryo serializer).
   *
   * @param appName Spark application name
   * @return the existing or newly created session
   */
  def createSparkSession(appName: String): SparkSession = {
    // Applied in this order on top of the named builder.
    val settings = Seq(
      "spark.rdd.compress" -> "true",
      "spark.io.compression.codec" -> "lz4",
      "spark.sql.orc.filterPushdown" -> "true",
      "spark.sql.warehouse.dir" -> "s3://mob-emr-test/spark-warehouse",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
    )
    val named = SparkSession.builder().appName(appName)
    val configured = settings.foldLeft(named) { case (builder, (key, value)) =>
      builder.config(key, value)
    }
    configured.enableHiveSupport().getOrCreate()
  }

  /**
   * Reads a table over JDBC into a DataFrame using the MySQL driver class
   * "com.mysql.jdbc.Driver" and utf8 character encoding.
   *
   * @param spark    session used for the read
   * @param database database name, appended to `url` after a '/'
   * @param table    table to read
   * @param url      JDBC URL without the database part
   * @param user     database user
   * @param password database password
   * @return a DataFrame over the table
   */
  def jdbcConnection(spark: SparkSession, database: String, table: String, url: String, user: String, password: String): DataFrame = {
    val connectionProps = new Properties()
    connectionProps.setProperty("driver", "com.mysql.jdbc.Driver")
    connectionProps.setProperty("user", user)
    connectionProps.setProperty("password", password)
    connectionProps.setProperty("characterEncoding", "utf8")

    val fullUrl = s"$url/$database"
    spark.read.jdbc(fullUrl, table, connectionProps)
  }

  /**
   * Returns `device_id` unchanged when it already matches `md5Ptn` (32 hexadecimal
   * characters); otherwise returns `MD5Util.getMD5Str(device_id)`.
   *
   * @param device_id identifier to normalise; must not be null
   * @return the id itself or the result of `MD5Util.getMD5Str`
   */
  def getMd5(device_id: String): String = {
    val alreadyMd5 = device_id.matches(md5Ptn)
    if (alreadyMd5) device_id else MD5Util.getMD5Str(device_id)
  }
}