package mobvista.dmp.common import com.alibaba.fastjson.{JSON, JSONArray, JSONObject} import com.google.gson.JsonElement import mobvista.dmp.util.MD5Util import mobvista.prd.datasource.util.GsonUtil import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{DataFrame, SparkSession} import java.text.SimpleDateFormat import java.util.Properties import java.util.regex.Pattern import scala.collection.mutable /** * @package: mobvista.dmp.common * @author: wangjf * @date: 2020/3/6 * @time: 11:42 上午 * @email: jinfeng.wang@mobvista.com * @phone: 152-1062-7698 */ object MobvistaConstant { val tracking_3s_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_3s_install_daily WHERE `date` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val dsp_req_sql: String = """ |SELECT device_id, device_type, platform, MAX(country_code) country, toPkgJson(CONCAT_WS(',',COLLECT_SET(package_list)),platform,'@update_date') install_list, | parseDSPExtData(COLLECT_SET(region),device_type) ext_data | FROM dwh.etl_dsp_request_daily WHERE `date` = '@dt' | GROUP BY device_id, device_type, platform """.stripMargin val dsp_req_unmatch_sql: String = """ |SELECT device_id, device_type, platform, MAX(country_code) country, toPkgJson(CONCAT_WS(',',COLLECT_SET(package_list)),platform,'@update_date') install_list, | parseDSPExtData(COLLECT_SET(region),device_type) ext_data | FROM dwh.etl_dsp_request_unmatch WHERE `dt` = '@dt' AND datetime >= '@update_date 00:00:00' | GROUP BY device_id, device_type, platform """.stripMargin val adn_sdk_reuqest_sql: String = """ |SELECT device_id, device_type, platform, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_adn_sdk_request_daily WHERE `date` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val adn_reuqest_sdk_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, '@update_date') install_list, | parseMExtData(CONCAT_WS(',',COLLECT_SET(strategy)), CONCAT_WS(',',COLLECT_SET(region)), MAX(dev_tag)) ext_data | FROM dwh.etl_adn_request_sdk_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val adn_reuqest_sdk_unmatch_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS(';',COLLECT_SET(package_name)), platform, update_date) install_list, | parseMExtData(CONCAT_WS(',',COLLECT_SET(strategy)), CONCAT_WS(',',COLLECT_SET(region)), MAX(dev_tag)) ext_data | FROM dwh.etl_adn_request_sdk_unmatch WHERE `dt` = '@dt' AND update_date >= '@update_date' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val adn_install_sql: String = """ |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_adn_install_daily WHERE `date` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val adn_request_other_sql: String = """ |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_adn_request_other_daily WHERE `date` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val adn_sdk_sql: String = """ |SELECT device_id, device_type, platform, country, toJsonString(install_list, platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_adn_sdk_daily WHERE `day` = '@dt' AND check_device(device_id) AND version = @version """.stripMargin val mp_sql: String = """ |SELECT device_id, 'gaid' device_type, 'android' platform, '' country, toPkgJson(CONCAT_WS(',',COLLECT_SET(package_list)), 'android', '@update_date') install_list, '{}' ext_data | FROM dwh.etl_mpsdk_request_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id """.stripMargin val ga_sql: String = """ |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.ods_ga_install_daily WHERE `date` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val clever_sql: String = """ |SELECT device_id, device_type, platform, '' country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_adn_clever_daily WHERE `date` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val bytedance_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_bytedance_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val joypacios_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.joypac_sdk_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val facebook_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_facebook_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val ali_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_baichuan_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val iqiyi_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toPkgJson(CONCAT_WS(',', COLLECT_SET(package_name)),platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_iqiyi_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform """.stripMargin val allpb_sql: String = """ |SELECT device_id, device_type, platform, MAX(country) country, toJsonBySplit(CONCAT_WS('#', COLLECT_SET(package_name)), platform, '@update_date') install_list, '{}' ext_data | FROM dwh.etl_3s_postback_daily WHERE `dt` = '@dt' AND check_device(device_id) | GROUP BY device_id, device_type, platform |""".stripMargin val install_list_sql: String = """ |SELECT device_id, device_type, platform, country, install_list | FROM dwh.dm_install_list WHERE CONCAT(year,month,day) = '@dt' AND business = '@business' AND check_device(device_id) |""".stripMargin val dmp_install_list_sql: String = """ |SELECT device_id, device_type, platform, country, get_filter_pkg(install_list,'@expireDate') install_list, ext_data, update_date | FROM dwh.dmp_install_list WHERE dt = '@dt' AND business = '@business' AND check_device(device_id) AND filter_pkg(install_list,'@expireDate') |""".stripMargin import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ def get_filter_pkg(install_list: String, expireDate: String): String = { JSON.toJSON(JSON.parse(install_list).asInstanceOf[java.util.Map[String, String]].asScala.retain((k, v) => v.compareTo(expireDate) >= 0 && StringUtils.isNotBlank(k)).toMap.asJava).toString } def filter_pkg(install_list: String, expireDate: String): Boolean = { JSON.parse(install_list).asInstanceOf[java.util.Map[String, String]].asScala.retain((k, v) => v.compareTo(expireDate) >= 0 && StringUtils.isNotBlank(k)).nonEmpty } def parseMExtData(strategys: String, regions: String, dev_tag: Int): String = { val ext_data = new JSONObject() val strategySet = new mutable.HashSet[String]() strategys.split(",").foreach(s => { // strategySet.add(s.split(";")(0)) strategySet.add(s) }) ext_data.put("strategy", strategySet.asJava) val regionSet = new mutable.HashSet[String]() regions.split(",").foreach(s => { regionSet.add(s) }) ext_data.put("region", regionSet.asJava) ext_data.put("dev_tag", dev_tag) ext_data.toJSONString } def parseDSPExtData(regionArray: mutable.WrappedArray[String], device_type: String): String = { val ext_data = new JSONObject() val regionSet = new mutable.HashSet[String]() regionArray.foreach(regions => { regions.replace("[", "").replace("]", "").replace("\"", "").split(",").foreach(s => { regionSet.add(s) }) }) ext_data.put("region", regionSet.asJava) if (device_type.equals("gaid")) { ext_data.put("dev_tag", 1) } else { ext_data.put("dev_tag", 0) } ext_data.toJSONString } def toJsonString(install_list: String, platform: String, update_date: String): String = { val jsonArray = GsonUtil.String2JsonArray(install_list) val json = new JSONObject() jsonArray.foreach(element => { val package_name = getJsonValue(element, "package_name") val install_date = getJsonValue(element, "date") if (StringUtils.isNotBlank(package_name)) { json.put(package_name, install_date) } }) if (json.isEmpty) { json.put(getPkgName(platform), update_date) } json.toJSONString } def toPkgJson(package_list: String, platform: String, update_date: String): String = { val json = new JSONObject() package_list.replace("[", "").replace("]", "").replace("\"", "").replace(";", ",") .split(",").foreach(pkg => { var package_name = pkg if (platform.equals("ios") && pkg.matches("^id[0-9]+$")) { package_name = package_name.replace("id", "") } if (checkPkgName(platform, package_name)) { json.put(package_name, update_date) } }) if (json.isEmpty) { json.put(getPkgName(platform), update_date) } json.toJSONString } def toJsonBySplit(package_list: String, platform: String, update_date: String): String = { val json = new JSONObject() package_list.split(";").foreach(pkg => { var package_name = pkg if (pkg.matches("^id[0-9]+$")) { package_name = package_name.replace("id", "") } if (checkPkgName(platform, package_name) && (!json.containsKey(package_name) || (json.containsKey(package_name) && json.getString(package_name).compareTo(update_date) < 0))) { json.put(package_name, update_date) } }) if (json.isEmpty) { json.put(getPkgName(platform), update_date) } json.toJSONString } def toJsonBySplitV2(package_list: String, platform: String, update_date: String): String = { val json = new JSONObject() package_list.split("#").foreach(pkg => { var package_name = pkg if (pkg.matches("^id[0-9]+$")) { package_name = package_name.replace("id", "") } if (checkPkgName(platform, package_name) && (!json.containsKey(package_name) || (json.containsKey(package_name) && json.getString(package_name).compareTo(update_date) < 0))) { json.put(package_name, update_date) } }) if (json.isEmpty) { json.put(getPkgName(platform), update_date) } json.toJSONString } def toJsonBySplitV1(package_list: String, platform: String, update_date: String): String = { val json = new JSONObject() package_list.split(";").foreach(pkg_date => { val pkgDate = pkg_date.split("#") var package_name = pkgDate(0) if (pkgDate(0).matches("^id[0-9]+$")) { package_name = package_name.replace("id", "") } if (checkPkgName(platform, package_name) && (!json.containsKey(package_name) || (json.containsKey(package_name) && json.getString(package_name).compareTo(pkgDate(1)) < 0))) { json.put(package_name, pkgDate(1)) } }) if (json.isEmpty) { json.put(getPkgName(platform), update_date) } json.toJSONString } def getJsonValue(element: JsonElement, key: String): String = { if (element != null && !element.isJsonNull && element.getAsJsonObject.has(key)) { element.getAsJsonObject.get(key).getAsString } else { "" } } def getPkgName(platform: String) = { if (platform.equals("ios")) { "0000000000" } else { "com.nonepkg.nonepkg" } } val countryPtn = "^([A-Z]{2,3})$" // IDFA/GAID val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$" // 全0 val allZero = "00000000-0000-0000-0000-000000000000" // IMEI val imeiPtn = "^([0-9]{15,17})$" // 14~16位连续多位相同字符,非法IMEI过滤 val imeiPtnAll = """^([0-9])\1{14,16}""" // androidId val andriodIdPtn = "^[a-zA-Z0-9]{16}$" // 连续多位相同字符,非法 androidId 过滤 val andriodIdAll = "^[a-zA-Z0-9]\1{15}$" // MD5 val md5Ptn = "^([a-fA-F0-9]{32})$" // 连续多位相同字符,非法 IMEI MD5 过滤 val umd5Ptn = """^([0-9A-Za-z])\1{29,31}""" // OAID val oaidPtb = """^[0-9A-Za-z-]{16,64}$""" def checkDeviceId(device_id: String): Boolean = { StringUtils.isNotBlank(device_id) && ((device_id.matches(didPtn) && !allZero.equals(device_id)) || (device_id.matches(imeiPtn) && !device_id.matches(imeiPtnAll)) || (device_id.matches(andriodIdPtn) && !device_id.matches(andriodIdAll)) || (device_id.matches(md5Ptn) && !device_id.matches(umd5Ptn)) || device_id.matches(oaidPtb) || device_id.length > 16) } val deviceTypeSet: mutable.Set[String] = mutable.Set("idfa", "gaid", "imei", "androidid", "sysid", "oaid", "idfv", "ruid", "idfamd5", "gaidmd5", "imeimd5", "oaidmd5", "upid") val iosPkgPtn = Pattern.compile("^\\d+$") val idIosPkgPtn = Pattern.compile("^id\\d+$") val adrPkgPtn = Pattern.compile("^(?=^.{3,255}$)[a-zA-Z0-9_][-a-zA-Z0-9_]{0,62}(\\.[a-zA-Z0-9_][-a-zA-Z0-9_]{0,62})+$") def checkPkgName(platform: String, pkg: String) = platform match { case "ios" => iosPkgPtn.matcher(pkg).matches case "android" | "adr" => adrPkgPtn.matcher(pkg).matches case _ => iosPkgPtn.matcher(pkg).matches || adrPkgPtn.matcher(pkg).matches } def String2JSONArray(str: String): JSONArray = { val jsonArray = if (StringUtils.isNotBlank(str)) { try { val element = JSON.parseArray(str) if (!element.isEmpty) { element } else { new JSONArray() } } catch { case _: Exception => new JSONArray() } } else { new JSONArray() } jsonArray } def String2JSONObject(str: String): JSONObject = { val jsonObject = if (StringUtils.isNotBlank(str)) { try { val element = JSON.parseObject(str) if (!element.isEmpty) { element } else { new JSONObject() } } catch { case _: Exception => new JSONObject() } } else { new JSONObject() } jsonObject } val sdf1 = new SimpleDateFormat("yyyy-MM-dd") val sdf2 = new SimpleDateFormat("yyyyMMdd") def createSparkSession(appName: String): SparkSession = { SparkSession .builder() .appName(appName) .config("spark.rdd.compress", "true") .config("spark.io.compression.codec", "lz4") .config("spark.sql.orc.filterPushdown", "true") .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .enableHiveSupport() .getOrCreate() } def jdbcConnection(spark: SparkSession, database: String, table: String, url: String, user: String, password: String): DataFrame = { val properties = new Properties() properties.put("driver", "com.mysql.jdbc.Driver") properties.put("user", user) properties.put("password", password) properties.put("characterEncoding", "utf8") spark.read.jdbc(url = s"${url}/${database}", table = table, properties = properties) } def getMd5(device_id: String): String = { if (device_id.matches(MobvistaConstant.md5Ptn)) { device_id } else { MD5Util.getMD5Str(device_id) } } }