package mobvista.dmp.datasource.datatory

import java.text.SimpleDateFormat
import java.util
import java.util.Properties

import mobvista.dmp.datasource.dm.Constant.{allZero, andriodIdPtn, didPtn, imeiPtn}
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019/4/1
  * @time: 4:00 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
object Constant {

  val filter_sql: String =
    """
      |SELECT UPPER(dev_id) device_id, install, interest, country, age, gender
      | FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date'
    """.stripMargin

  val ods_dmp_user_info_daily_sql: String =
    """
      |SELECT dev_id, platform, interest, country, age, gender
      | FROM dev.ods_dmp_user_info_daily WHERE dt = '@date'
    """.stripMargin

  val ods2new_sql: String =
    """
      |SELECT UPPER(CONCAT(tag_type, '-', first_tag, '-', second_tag)) tag_id, new_first_id, new_second_id FROM
      | dwh.dm_old2new_tag
    """.stripMargin

  val tracking_install_sql: String =
    """
      |SELECT b.device_id, device_model, os_version, UPPER(country) country, city, CAST(b.offer_id AS string) offer_id,
      | CAST(COALESCE(a.id,'') AS string) id, COALESCE(a.event_name,'') event_name, a.event_type FROM
      | (SELECT devid device_id, MAX(device) device_model, MAX(os_version) os_version, MAX(country) country, MAX(city) city, uuid offer_id FROM dwh.ods_3s_trackingnew_install
      |   WHERE yyyy = '@year' AND mm = '@month' AND dd = '@day' GROUP BY devid, uuid) b
      | LEFT JOIN
      | (SELECT id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
      | ON a.offer_id = b.offer_id
    """.stripMargin

  // create_src = 0
  val tracking_event_sql: String =
    """
      |SELECT b.device_id, UPPER(country) country, CAST(b.offer_id AS string) offer_id, COALESCE(a.id, b.event_name) id,
      | COALESCE(a.event_name, b.event_name) event_name, COALESCE(a.event_type,'') event_type FROM
      | (SELECT devid device_id, MAX(country) country, event_name, uuid offer_id FROM dwh.ods_3s_trackingcsv_event_info
      |   WHERE yyyymmdd = '@date' AND devid IS NOT NULL AND devid <> '' GROUP BY devid, event_name, uuid) b
      | LEFT JOIN
      | (SELECT CAST(id AS string) id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
      | ON a.offer_id = b.offer_id
    """.stripMargin

  val event_info_sql: String =
    """
      |SELECT event_name id, event_name, '' event_type, CAST(uuid AS string) offer_id FROM dwh.ods_3s_trackingcsv_event_info WHERE yyyymmdd = '@date'
      | UNION ALL
      |SELECT event_name id, event_name, '' event_type, campaign_id offer_id FROM dwh.ods_adn_tracking_ss_event WHERE CONCAT(yyyy,mm,dd) = '@date'
    """.stripMargin

  val tracking_event_info_sql: String =
    """
      |SELECT CAST(id AS string) id, event_name, event_type, CAST(offer_id AS string) offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date'
    """.stripMargin

  val all_event_info_sql: String =
    """
      |SELECT id, event_name, event_type, offer_id FROM dev.event_info WHERE dt = '@before_date'
    """.stripMargin

  val adv_install_sql: String =
    """
      |SELECT UPPER(device_id) device_id FROM
      | dwh.etl_tracking_install_daily WHERE dt BETWEEN '@start' AND '@end'
    """.stripMargin

  val adv_event_sql: String =
    """
      |SELECT UPPER(device_id) device_id FROM
      | dwh.etl_3s_event_daily WHERE dt BETWEEN '@start' AND '@end'
    """.stripMargin

  val adv_tag_sql: String =
    """
      |SELECT /*+ MAPJOIN(b) */ a.device_id device_id, install, interest, country, age, gender
      | FROM adv b
      | JOIN user_info a
      | ON a.device_id = b.device_id
    """.stripMargin

  val adn_tracking_install_sql: String =
    """
      |SELECT
      | CASE WHEN LOWER(platform) = 'ios' AND idfa IS NOT NULL AND idfa != '' THEN UPPER(idfa)
      |      WHEN LOWER(platform) = 'android' AND gaid IS NOT NULL AND gaid != '' THEN UPPER(gaid)
      |      WHEN LOWER(platform) = 'android' AND imei IS NOT NULL AND imei != '' THEN UPPER(imei) ELSE '' END AS device_id,
      | device_model, os_version, UPPER(country_code) country, getCity(ios_ab) city, campaign_id, app_id
      | FROM dwh.ods_adn_trackingnew_install WHERE CONCAT(yyyy,mm,dd) = '@date'
    """.stripMargin

  // create_src > 0 (1,2,3)
  val adn_tracking_event_sql: String =
    """
      |SELECT
      | CASE WHEN LOWER(platform) = 'ios' AND idfa IS NOT NULL AND idfa != '' THEN UPPER(idfa)
      |      WHEN LOWER(platform) = 'android' AND gaid IS NOT NULL AND gaid != '' THEN UPPER(gaid)
      |      WHEN LOWER(platform) = 'android' AND imei IS NOT NULL AND imei != '' THEN UPPER(imei)
      |      WHEN LOWER(platform) = 'android' AND android_id IS NOT NULL AND android_id != '' THEN UPPER(android_id) ELSE '' END AS device_id,
      | UPPER(country) country, event_name, campaign_id
      | FROM dwh.ods_adn_tracking_ss_event WHERE CONCAT(yyyy,mm,dd) = '@date'
    """.stripMargin

  val adn_tracking_install_join_event_sql: String =
    """
      |SELECT
      | a.device_id device_id, CASE WHEN b.device_model IS NOT NULL THEN b.device_model ELSE '' END AS device_model,
      | CASE WHEN b.os_version IS NOT NULL THEN b.os_version ELSE '' END AS os_version, a.country country,
      | CASE WHEN b.city IS NOT NULL THEN b.city ELSE '' END AS city, a.event_name event_name, a.event_type event_type,
      | a.campaign_id campaign_id, CASE WHEN b.app_id IS NOT NULL THEN b.app_id ELSE array() END AS app_id
      |FROM
      | (
      |   SELECT device_id, MAX(country) country, COLLECT_SET(event_name) event_name, COLLECT_SET(getEventType(event_name)) event_type, COLLECT_SET(campaign_id) campaign_id
      |   FROM dwh.etl_tracking_adn_event_daily WHERE dt = '@date'
      |   GROUP BY device_id
      | ) a
      | LEFT OUTER JOIN
      | (
      |   SELECT device_id, MAX(device_model) device_model, MAX(os_version) os_version, MAX(city) city, COLLECT_SET(app_id) app_id
      |   FROM dwh.etl_tracking_adn_install_daily WHERE dt = '@date' GROUP BY device_id
      | ) b
      | ON a.device_id = b.device_id
    """.stripMargin

  val adn_tracking_merge_sql: String =
    """
      |SELECT
      | device_id, MAX(device_model) device_model, MAX(os_version) os_version, MAX(country) country, MAX(city) city,
      | COLLECT_SET(campaign_id) campaign_id, array() event_name, array() event_type, COLLECT_SET(app_id) app_id, 0 log_type
      | FROM dwh.etl_tracking_adn_install_daily WHERE dt = '@date'
      | GROUP BY device_id
      | UNION ALL
      |SELECT
      | device_id, device_model, os_version, country, city, campaign_id, event_name, event_type, app_id, 1 log_type
      | FROM event_join
    """.stripMargin

  val event_sql: String =
    """
      |SELECT
      | event_name, event_type
      |FROM dev.event_info
      | WHERE dt = '@date' AND event_name != '' AND event_type != ''
      |GROUP BY event_name, event_type
    """.stripMargin

  val tracking_install_join_event_sql: String =
    """
      |SELECT
      | a.device_id device_id, a.offer_id, a.id, a.event_name, a.event_type, a.country, b.city, b.device_model, b.os_version
      |FROM
      | (
      |   SELECT UPPER(device_id) device_id, COLLECT_SET(offer_id) offer_id, COLLECT_SET(id) id, COLLECT_SET(event_name) event_name,
      |     COLLECT_SET(event_type) event_type, MAX(country) country
      |   FROM dwh.etl_3s_event_daily WHERE dt = '@date' GROUP BY UPPER(device_id)
      | ) a
      | LEFT OUTER JOIN
      | (
      |   SELECT UPPER(device_id) device_id, COLLECT_SET(offer_id) offer_id, COLLECT_SET(id) id, COLLECT_SET(event_name) event_name,
      |     COLLECT_SET(event_type) event_type, MAX(country) country, MAX(city) city, MAX(device_model) device_model, MAX(os_version) os_version
      |   FROM dwh.etl_tracking_install_daily WHERE dt = '@date' GROUP BY UPPER(device_id)
      | ) b
      | ON a.device_id = b.device_id
    """.stripMargin
  val tracking_merge_sql: String =
    """
      |SELECT
      | UPPER(device_id) device_id, COLLECT_SET(offer_id) offer_id, COLLECT_SET(id) id, COLLECT_SET(event_name) event_name, COLLECT_SET(event_type) event_type,
      | MAX(country) country, MAX(city) city, MAX(device_model) device_model, MAX(os_version) os_version, 0 log_type
      | FROM dwh.etl_tracking_install_daily WHERE dt = '@date' GROUP BY UPPER(device_id)
      | UNION ALL
      |SELECT
      | device_id, offer_id, id, event_name, event_type, country, CASE WHEN city IS NOT NULL THEN city ELSE '' END AS city,
      | CASE WHEN device_model IS NOT NULL THEN device_model ELSE '' END AS device_model, CASE WHEN os_version IS NOT NULL THEN os_version ELSE '' END AS os_version, 1 log_type
      | FROM event_join
    """.stripMargin

  // Deduplicate: the upstream data is not clean.
  def processDistinctInstall(device_id: String, packages: String): util.ArrayList[DevTagEntity] = {
    val res = new util.ArrayList[DevTagEntity]()
    packages.split(",").foreach(pkgDate => {
      // Each element looks like "packageName|date"; keep the package name only.
      val rs = pkgDate.split("\\|")
      if (StringUtils.isNotBlank(rs(0))) {
        res.add(DevTagEntity(device_id, rs(0)))
      }
    })
    res
  }

  // Deduplicate: the upstream data is not clean.
  def processDistinctInterest(device_id: String, interests: String,
                              bMap: Broadcast[scala.collection.Map[String, (String, String)]]): util.ArrayList[DevTagEntity] = {
    val res = new util.ArrayList[DevTagEntity]()
    interests.split("#").foreach(interest => {
      val rs = interest.split(",")
      val tag_id = rs(0) + "-" + rs(1) + "-" + rs(2)
      if (bMap.value.contains(tag_id.toUpperCase)) {
        res.add(DevTagEntity(device_id, bMap.value(tag_id.toUpperCase)._2)) // second-level tag
        res.add(DevTagEntity(device_id, bMap.value(tag_id.toUpperCase)._1)) // first-level tag
      }
    })
    res
  }

  def processInstall(packages: String): util.ArrayList[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    val set = new util.HashSet[String]()
    packages.split(",").foreach(pkgDate => {
      val rs = pkgDate.split("\\|")
      if (StringUtils.isNotBlank(rs(0))) {
        set.add(rs(0))
      }
    })
    set.iterator().foreach(s => {
      res.add((s.toLowerCase, 1))
    })
    res
  }

  def processInterest(interests: String,
                      bMap: Broadcast[scala.collection.Map[String, (String, String)]]): util.ArrayList[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    val set = new util.HashSet[String]()
    interests.split("#").foreach(interest => {
      val rs = interest.split(",")
      val tag_id = rs(0) + "-" + rs(1) + "-" + rs(2)
      if (bMap.value.contains(tag_id.toUpperCase)) {
        set.add(bMap.value(tag_id.toUpperCase)._2) // second-level tag
        set.add(bMap.value(tag_id.toUpperCase)._1) // first-level tag
      }
    })
    set.iterator().foreach(s => {
      res.add((s, 1))
    })
    res
  }

  def distinctPartition(iter: Iterator[UserInfoEntity],
                        bMap: Broadcast[scala.collection.Map[String, (String, String)]],
                        category: String): Iterator[DevTagEntity] = {
    val res = new util.ArrayList[DevTagEntity]()
    while (iter.hasNext) {
      val ir = iter.next
      category match {
        case "package" =>
          // res.addAll(processInstall(ir.install))
          if (StringUtils.isNotBlank(ir.install)) {
            res.addAll(processDistinctInstall(ir.dev_id, ir.install))
          }
        case "interest" =>
          // res.addAll(processInterest(ir.interest, bMap)) // map tags to tag IDs
          if (StringUtils.isNotEmpty(ir.interest)) {
            res.addAll(processDistinctInterest(ir.dev_id, ir.interest, bMap))
          }
        case "country" =>
          if (StringUtils.isNotBlank(ir.country)) {
            res.add(DevTagEntity(ir.dev_id, ir.country.toUpperCase))
          }
        case "age" =>
          if (StringUtils.isNotBlank(ir.age)) {
            res.add(DevTagEntity(ir.dev_id, ir.age))
          }
        case "gender" =>
          if (StringUtils.isNotBlank(ir.gender)) {
            res.add(DevTagEntity(ir.dev_id, ir.gender))
          }
        case _ =>
      }
    }
    res.asScala.iterator
  }
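  // The helpers above assume two packed string formats from the user profile
  // table: install is "pkg|date" pairs joined by ',', interest is
  // "type,first,second" triples joined by '#'. A small self-contained
  // illustration (the package name and dates are made up, not real data);
  // duplicates collapse via the HashSet, so this returns one ("com.example.a", 1):
  def exampleProcessInstall(): util.ArrayList[(String, Int)] =
    processInstall("com.example.a|2019-03-01,com.example.a|2019-03-20")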
  def commonPartition(iter: Iterator[UserInfoEntity],
                      bMap: Broadcast[scala.collection.Map[String, (String, String)]],
                      category: String): Iterator[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    while (iter.hasNext) {
      val ir = iter.next
      category match {
        case "package" =>
          res.addAll(processInstall(ir.install))
        case "interest" =>
          res.addAll(processInterest(ir.interest, bMap)) // map tags to tag IDs
        case "country" =>
          if (StringUtils.isNotBlank(ir.country)) {
            res.add((ir.country.toUpperCase, 1))
          }
        case "age" =>
          if (StringUtils.isNotBlank(ir.age)) {
            res.add((ir.age, 1))
          }
        case "gender" =>
          if (StringUtils.isNotBlank(ir.gender)) {
            res.add((ir.gender, 1))
          }
        case _ =>
      }
    }
    res.asScala.iterator
  }

  def commonPartition(iter: Iterator[DevTagEntity]): Iterator[(String, Int)] = {
    val res = new util.ArrayList[(String, Int)]()
    while (iter.hasNext) {
      // Every DevTagEntity already carries its final tag, so all categories
      // collapse to the same (tag, 1) emission here.
      val ir = iter.next
      res.add((ir.tag, 1))
    }
    res.asScala.iterator
  }

  def filterByPackage(packages: String, pkgs: String): Boolean = {
    val set = pkgs.split(",").toSet
    var flag = false
    val itr = packages.split(",").iterator
    while (itr.hasNext && !flag) {
      val rs = itr.next.split("\\|")
      if (set.contains(rs(0))) {
        flag = true
      }
    }
    flag
  }

  def filterByCountry(country: String, countryCode: String): Boolean = {
    countryCode.split(",").toSet.contains(country)
  }

  def filterCommon(column: String, list: String): Boolean = {
    list.toLowerCase.split(",").toSet.contains(column.toLowerCase)
  }

  def parseJsonStringPackage(jsonString: String): PackageFilterEntity = {
    GsonUtil.fromJson(GsonUtil.String2JsonObject(jsonString), classOf[PackageFilterEntity])
  }

  def parseJsonStringAdv(jsonString: String): AdvFilterEntity = {
    GsonUtil.fromJson(GsonUtil.String2JsonObject(jsonString), classOf[AdvFilterEntity])
  }

  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  def processQuery(date: String, tag: String, jsonString: String, spark: SparkSession): (RDD[UserInfoEntity], Long, String, Integer) = {
    var jobId: String = ""
    var top: Integer = 20
    val baseDF = if (tag.equals("pkg")) {
      spark.udf.register("filterByPackage", Constant.filterByPackage _)
      spark.udf.register("filterByCountry", Constant.filterByCountry _)
      val packageFilterEntity: PackageFilterEntity = Constant.parseJsonStringPackage(jsonString)
      jobId = packageFilterEntity.jobId
      var end = ""
      var updateDate = ""
      if (date.compareTo(packageFilterEntity.end) < 0) {
        end = date
        val days = (sdf2.parse(packageFilterEntity.end).getTime - sdf2.parse(packageFilterEntity.start).getTime) / 1000 / 3600 / 24
        val update_date = DateUtil.getDayByString(end, "yyyyMMdd", -days.toInt)
        updateDate = sdf1.format(sdf2.parse(update_date))
      } else {
        end = packageFilterEntity.end
        updateDate = sdf1.format(sdf2.parse(packageFilterEntity.start))
      }
      var sql = Constant.filter_sql.replace("@date", end).replace("@update_date", updateDate)
      if (StringUtils.isNotBlank(packageFilterEntity.countryCode)) {
        sql = sql + s" AND filterByCountry(country,'${packageFilterEntity.countryCode}')"
      }
      if (StringUtils.isNotBlank(packageFilterEntity.packageList)) {
        sql = sql + s" AND filterByPackage(install,'${packageFilterEntity.packageList}')"
      }
      if (packageFilterEntity.top != null) {
        top = packageFilterEntity.top
      }
      val df = spark.sql(sql)
        .dropDuplicates(Seq("device_id"))
        .rdd
        .map(r => {
          UserInfoEntity(r.getAs("device_id"), r.getAs("install"), r.getAs("interest"), r.getAs("country"), r.getAs("age"), r.getAs("gender"))
        }).persist(StorageLevel.MEMORY_AND_DISK_SER)
      (df, df.count)
    } else {
      spark.udf.register("filterCommon", Constant.filterCommon _)
      val advFilterEntity: AdvFilterEntity = Constant.parseJsonStringAdv(jsonString)
      jobId = advFilterEntity.jobId
      val start = advFilterEntity.start

      /**
        * If no event information is supplied (or event info was pulled back but nothing was
        * selected), query only the install log for install results. If event information is
        * supplied, query the install log plus the event log, joining the two on uuid/offer_id.
        */
      var install_sql = adv_install_sql.replace("@start", start).replace("@end", advFilterEntity.end)
      var event_sql = adv_event_sql.replace("@start", start).replace("@end", advFilterEntity.end)
      if (StringUtils.isNotBlank(advFilterEntity.offerId)) {
        install_sql = install_sql + s" AND filterCommon(offer_id,'${advFilterEntity.offerId}')"
        event_sql = event_sql + s" AND filterCommon(offer_id,'${advFilterEntity.offerId}')"
      }
      if (StringUtils.isNotBlank(advFilterEntity.eventName)) {
        install_sql = install_sql + s" AND filterCommon(id,'${advFilterEntity.eventName}')"
        event_sql = event_sql + s" AND filterCommon(id,'${advFilterEntity.eventName}')"
      }
      if (StringUtils.isNotBlank(advFilterEntity.eventType)) {
        install_sql = install_sql + s" AND filterCommon(event_type,'${advFilterEntity.eventType}')"
        event_sql = event_sql + s" AND filterCommon(event_type,'${advFilterEntity.eventType}')"
      }
      val adv_df = if (StringUtils.isNotBlank(advFilterEntity.eventName) || StringUtils.isNotBlank(advFilterEntity.eventType)) {
        spark.sql(install_sql).union(spark.sql(event_sql)).dropDuplicates
      } else {
        spark.sql(install_sql).dropDuplicates
      }
      adv_df.createOrReplaceTempView("adv")
      var end = ""
      var updateDate = ""
      if (advFilterEntity.end.compareTo(date) <= 0 && advFilterEntity.end.compareTo(DateUtil.getDayByString(date, "yyyyMMdd", -3)) >= 0) {
        end = advFilterEntity.end
        updateDate = sdf1.format(sdf2.parse(advFilterEntity.start))
      } else {
        end = date
        val days = (sdf2.parse(advFilterEntity.end).getTime - sdf2.parse(advFilterEntity.start).getTime) / 1000 / 3600 / 24
        val update_date = DateUtil.getDayByString(end, "yyyyMMdd", -days.toInt)
        updateDate = sdf1.format(sdf2.parse(update_date))
      }
      val sql = Constant.filter_sql.replace("@date", end).replace("@update_date", updateDate)
      spark.sql(sql).createOrReplaceTempView("user_info")
      val df = spark.sql(adv_tag_sql)
        .dropDuplicates(Seq("device_id"))
        .rdd
        .map(r => {
          UserInfoEntity(r.getAs("device_id"), r.getAs("install"), r.getAs("interest"), r.getAs("country"), r.getAs("age"), r.getAs("gender"))
        }).persist(StorageLevel.MEMORY_AND_DISK_SER)
      (df, adv_df.count())
    }
    (baseDF._1, baseDF._2, jobId, top)
  }
  def processBase(baseDF: RDD[UserInfoEntity], count: Long, jobId: String, top: Integer, spark: SparkSession,
                  bMap: Broadcast[scala.collection.Map[String, (String, String)]]): Dataset[Row] = {
    val coalesce = 1
    val sc = spark.sparkContext
    // val all = baseDF.count()
    val allDF = sc.parallelize(Seq(Result(jobId, "all", "all", count.toInt)))

    import spark.implicits._

    // .mapPartitions(distinctPartition(_, bMap, "package"))
    val packageDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "package"))
      .reduceByKey(_ + _)
      .repartition(coalesce)
      .sortBy(_._2, ascending = false)
      .map(r => {
        Result(jobId, "package", r._1, r._2)
      })
    val packageRDD = sc.parallelize(packageDF.take(top))

    // .mapPartitions(distinctPartition(_, bMap, "interest"))
    val interestDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "interest"))
      .reduceByKey(_ + _)
      .repartition(coalesce)
      .sortByKey()
      .map(r => {
        Result(jobId, "interest", r._1, r._2)
      })

    // .mapPartitions(distinctPartition(_, bMap, "country"))
    val countryDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "country"))
      .reduceByKey(_ + _)
      .repartition(coalesce)
      .sortBy(_._2, ascending = false)
      .map(r => {
        Result(jobId, "country", r._1, r._2)
      })
    val countryRDD = sc.parallelize(countryDF.take(top))

    // .mapPartitions(distinctPartition(_, bMap, "age"))
    val ageDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "age"))
      .reduceByKey(_ + _)
      .repartition(coalesce)
      .map(r => {
        Result(jobId, "age", r._1, r._2)
      })

    // .mapPartitions(distinctPartition(_, bMap, "gender"))
    val genderDF = baseDF
      .mapPartitions(commonPartition(_, bMap, "gender"))
      .reduceByKey(_ + _)
      .repartition(coalesce)
      .map(r => {
        Result(jobId, "gender", r._1, r._2)
      })

    allDF.union(packageRDD).union(interestDF).union(countryRDD).union(ageDF).union(genderDF)
      .toDF
      .repartition(1)
  }

  def getGenderId(id: String): String = {
    id match {
      case "1" => "00001001"
      case "2" => "00001002"
      case _ => "00001999"
    }
  }

  def getAgeId(id: String): String = {
    id match {
      case "1" => "00002001"
      case "2" => "00002002"
      case "3" => "00002003"
      case "4" => "00002004"
      case "5" => "00002005"
      case _ => "00002999"
    }
  }

  def writeMySQL(df: Dataset[Row], table: String, saveMode: SaveMode): Unit = {
    val prop = new java.util.Properties
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("user", "apptag_rw")
    prop.setProperty("password", "7gyLEVtkER3u8c9")
    prop.setProperty("characterEncoding", "utf8")
    df.write.mode(saveMode).jdbc("jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/datatory", table, prop)
  }

  def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val url = PropertyUtil.getProperty("config.properties", "mysql.mob_adn.url")
    val username = PropertyUtil.getProperty("config.properties", "mysql.mob_adn.user")
    val password = PropertyUtil.getProperty("config.properties", "mysql.mob_adn.password")
    val properties = new Properties()
    properties.put("driver", "com.mysql.jdbc.Driver")
    properties.put("user", username)
    properties.put("password", password)
    properties.put("characterEncoding", "utf8")
    // Log the connection target without leaking the password.
    println(s"url ==>> jdbc:mysql://${url}:3306/${database}, user ==>> ${username}")
    spark.read.jdbc(url = s"jdbc:mysql://${url}:3306/${database}", table = table, properties = properties)
  }

  def check_deviceId(device_id: String): Boolean = {
    StringUtils.isNotBlank(device_id) &&
      (device_id.matches(didPtn) && !device_id.equals(allZero) || device_id.matches(imeiPtn) || device_id.matches(andriodIdPtn))
  }
}
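// Example wiring, as a hypothetical sketch only (UserInfoEntity, DevTagEntity,
// Result, PackageFilterEntity and AdvFilterEntity are case classes defined
// elsewhere in this package; json, spark and bMap are assumed to exist):
//
//   val (base, cnt, jobId, top) = Constant.processQuery("20190401", "pkg", json, spark)
//   val result = Constant.processBase(base, cnt, jobId, top, spark, bMap)
//   Constant.writeMySQL(result, "job_result", SaveMode.Append)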