package mobvista.dmp.datasource.fmp

import java.text.SimpleDateFormat

import com.google.gson.{JsonArray, JsonObject}
import mobvista.dmp.datasource.dm.Constant.{allZero, didPtn}
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._

/**
  * @package: mobvista.dmp.datasource.fmp
  * @author: wangjf
  * @date: 2019-06-10
  * @time: 16:57
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
object Constant {
  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  def parseJsonStringJob(jsonString: String): JobProfileEntity = {
    GsonUtil.fromJson(GsonUtil.String2JsonObject(jsonString), classOf[JobProfileEntity])
  }

  def parseDimension(json: JsonObject): DimensionEntity = {
    GsonUtil.fromJson(json, classOf[DimensionEntity])
  }

  def getDF(spark: SparkSession, jobProfileEntity: JobProfileEntity): (DataFrame, Int) = {
    import spark.implicits._

    // Resolve the latest partition of dwh.dm_user_info.
    var user_sql =
      """
        |SHOW PARTITIONS dwh.dm_user_info
      """.stripMargin
    var partDF = spark.sql(user_sql)
    var date = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)

    user_sql =
      s"""
         |SELECT UPPER(device_id) device_id, frequency FROM dwh.dm_user_info WHERE dt = '$date'
      """.stripMargin

    // Only keep devices updated within the last `last_req_day` days.
    if (jobProfileEntity.last_req_day != null) {
      val updateDate = DateUtil.getDayByString(date, "yyyyMMdd", -jobProfileEntity.last_req_day)
      val update_date = sdf1.format(sdf2.parse(updateDate))
      user_sql = user_sql + s" AND update_date >= '$update_date'"
    }

    // Optional behavior filter, rendered as a SQL IN list.
    if (StringUtils.isNotBlank(jobProfileEntity.behavior) && GsonUtil.String2JsonArray(jobProfileEntity.behavior).size() > 0) {
      var behavior = ""
      GsonUtil.String2JsonArray(jobProfileEntity.behavior).foreach(j => {
        behavior += "'" + j.getAsString.toLowerCase + "',"
      })
      behavior = "(" + behavior.substring(0, behavior.length - 1) + ")"
      user_sql = user_sql + s" AND LOWER(behavior) IN $behavior"
    }

    // Resolve the second-latest partition of dwh.dm_active_tag.
    var active_sql =
      """
        |SHOW PARTITIONS dwh.dm_active_tag
      """.stripMargin
    partDF = spark.sql(active_sql)
    date = partDF.orderBy(partDF("partition").desc).take(2)(1).getString(0).split("/")(0).split("=")(1)
    val part = partDF.orderBy(partDF("partition").desc).take(2)(1).getString(0).split("/")(1).split("=")(1)
    /*
    if (!part.equals("month")) {
      date = DateUtil.getDayByString(date, "yyyyMMdd", -1)
    }
    */

    active_sql =
      s"""
         |SELECT UPPER(device_id) device_id, tags, part FROM dwh.dm_active_tag WHERE dt = '$date'
      """.stripMargin

    // Optional country filter, applied to both queries.
    if (StringUtils.isNotBlank(jobProfileEntity.country) && GsonUtil.String2JsonArray(jobProfileEntity.country).size() > 0) {
      var country = ""
      GsonUtil.String2JsonArray(jobProfileEntity.country).foreach(j => {
        country += "'" + j.getAsString.toUpperCase + "',"
      })
      country = "(" + country.substring(0, country.length - 1) + ")"
      user_sql = user_sql + s" AND UPPER(country) IN $country"
      active_sql = active_sql + s" AND UPPER(country_code) IN $country"
    }
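    // A hypothetical rendering of user_sql at this point (values are illustrative only),
    // assuming dt = '20190610', last_req_day = 7, behavior = ["install"], country = ["US","GB"]:
    //   SELECT UPPER(device_id) device_id, frequency FROM dwh.dm_user_info WHERE dt = '20190610'
    //     AND update_date >= '2019-06-03' AND LOWER(behavior) IN ('install')
    //     AND UPPER(country) IN ('US','GB')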
    // Optional platform filter.
    if (StringUtils.isNotBlank(jobProfileEntity.platform) && GsonUtil.String2JsonArray(jobProfileEntity.platform).size() > 0) {
      var platform = ""
      // var platform_ids = ""
      GsonUtil.String2JsonArray(jobProfileEntity.platform).foreach(j => {
        platform += "'" + j.getAsString.toUpperCase + "',"
        // platform_ids += "'" + (if (j.getAsString.toUpperCase.equals("IOS")) "0" else "1") + "',"
      })
      platform = "(" + platform.substring(0, platform.length - 1) + ")"
      // platform_ids = "(" + platform_ids.substring(0, platform_ids.length - 1) + ")"
      user_sql = user_sql + s" AND UPPER(platform) IN $platform"
      // active_sql = active_sql + s" AND UPPER(platform) IN $platform_ids"
    }

    // Optional package filter: map package names to ids via dwh.package_mapping,
    // then keep devices whose install list hits at least one of the ids.
    if (StringUtils.isNotBlank(jobProfileEntity.package_name) && GsonUtil.String2JsonArray(jobProfileEntity.package_name).size() > 0) {
      var package_sql =
        """
          |SHOW PARTITIONS dwh.package_mapping
        """.stripMargin
      partDF = spark.sql(package_sql)
      date = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)
      package_sql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '$date'
        """.stripMargin
      val bMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
        (r.getAs("package_name").toString.toUpperCase, r.getAs("id").toString)
      }).collectAsMap())
      spark.udf.register("filterByPackage", filterByPackage _)
      var package_name = ""
      GsonUtil.String2JsonArray(jobProfileEntity.package_name).foreach(j => {
        package_name += bMap.value.getOrElse(j.getAsString.toUpperCase, "-1") + ","
      })
      package_name = package_name.substring(0, package_name.length - 1)
      user_sql = user_sql + s" AND filterByPackage(CONCAT_WS(',',install),'$package_name')"
    }

    // Drop malformed or all-zero device ids from both queries.
    spark.udf.register("check_deviceId", check_deviceId _)
    user_sql = user_sql + " AND @check_deviceId"
    println("user_sql ==>> " + user_sql)
    val user_df = spark.sql(user_sql.replace("@check_deviceId", "check_deviceId(device_id)"))
    // .rdd.map(r => { DeviceFrequencyEntity(r.getAs("device_id"), r.getAs("frequency")) })
    // .persist(StorageLevel.MEMORY_AND_DISK_SER)

    active_sql = active_sql + " AND @check_deviceId"
    println("active_sql ==>> " + active_sql)
    val active_df = spark.sql(active_sql.replace("@check_deviceId", "check_deviceId(device_id)"))
    // .rdd.map(r => { DeviceActiveEntity(r.getAs("device_id"), r.getAs("tags"), r.getAs("part")) })

    val df = if (StringUtils.isNotBlank(jobProfileEntity.dimension) && GsonUtil.String2JsonArray(jobProfileEntity.dimension).size() > 0) {
      // Map new second-level tag ids to the old tag ids.
      val code_sql: String =
        """
          |SELECT new_second_id, tag_id FROM
          | dwh.dm_old2new_tag
        """.stripMargin
      val bMap = spark.sparkContext.broadcast(spark.sql(code_sql).rdd.map(r => {
        (r.getAs("new_second_id").toString, r.getAs("tag_id").toString)
      }).collectAsMap())
      // All dimension entries are assumed to share the active dimension of the first entry.
      val dimensionEntity = parseDimension(GsonUtil.String2JsonArray(jobProfileEntity.dimension).get(0).getAsJsonArray.get(0).getAsJsonObject)
      val part = dimensionEntity.active_dimension.toLowerCase
      val tmp_df = active_df.toDF.filter(s"part = '$part'").select("device_id", "tags")
      tmp_df.createOrReplaceTempView("active")
      user_df.toDF.createOrReplaceTempView("user")
      println("user_df.count ==>> " + user_df.count())
      println("tmp_df.count ==>> " + tmp_df.count())
      val join_sql =
        """
          |SELECT a.device_id device_id, a.frequency frequency, b.tags tags
          |  FROM user a JOIN active b
          |    ON a.device_id = b.device_id
        """.stripMargin
      val join_df = spark.sql(join_sql).rdd.map(r => {
        DeviceCntCount(r.getAs("device_id"), r.getAs("frequency"), r.getAs("tags"))
      }).persist(StorageLevel.MEMORY_ONLY_SER)
      println("join_df.count ==>> " + join_df.count())
      val rdd = dimensionForeach(join_df, GsonUtil.String2JsonArray(jobProfileEntity.dimension), bMap, spark).dropDuplicates
      rdd
    } else {
      val user_rdd = user_df.map(r => {
        r.getAs("device_id").toString
      }).toDF
      user_rdd.dropDuplicates
    }
    df.cache

    // Cap the audience at 10,000,000 devices.
    val limit: Int = if (jobProfileEntity.limit != null && jobProfileEntity.limit <= 10000000) {
      jobProfileEntity.limit
    } else {
      10000000
    }
    (df.limit(limit), df.count.toInt)
  }
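  // A hypothetical job profile accepted by getDF. Field names follow the usages above;
  // all values are illustrative only, and the array-valued fields are JSON strings
  // (they are parsed with GsonUtil.String2JsonArray):
  // {
  //   "last_req_day": 7,
  //   "behavior": "[\"install\"]",
  //   "country": "[\"US\"]",
  //   "platform": "[\"ANDROID\"]",
  //   "package_name": "[\"com.example.app\"]",
  //   "dimension": "[[{\"interest\":\"101\",\"install_cnt\":\"1-5\",\"active_count\":\"3\",\"active_dimension\":\"week\"}]]",
  //   "limit": 1000000
  // }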
  // UDF: a valid device id matches didPtn and is not the all-zero id.
  def check_deviceId(device_id: String): Boolean = {
    device_id.matches(didPtn) && !device_id.equals(allZero)
  }

  // UDF: true if any id in the comma-separated `packages` list appears in `pkgs`.
  def filterByPackage(packages: String, pkgs: String): Boolean = {
    val set = pkgs.split(",").toSet
    var flag = false
    val itr = packages.split(",").iterator
    // val itr = packages.iterator
    while (itr.hasNext && !flag) {
      val rs = itr.next
      if (set.contains(rs)) {
        flag = true
      }
    }
    flag
  }

  // Dimension evaluation, optimized for the case where all entries share one active dimension:
  // entries inside an inner array are OR-ed (union), the inner arrays are AND-ed (intersect).
  def dimensionForeach(df: RDD[DeviceCntCount], jsonArray: JsonArray, bMap: Broadcast[scala.collection.Map[String, String]], spark: SparkSession): DataFrame = {
    import spark.implicits._
    var outer_df = spark.emptyDataFrame
    jsonArray.foreach(json => {
      var inter_df = spark.emptyDataFrame
      json.getAsJsonArray.foreach(j => {
        val dimensionEntity = parseDimension(j.getAsJsonObject)
        // val interest = bMap.value.getOrElse(dimensionEntity.interest, "0")
        val install_cnt = dimensionEntity.install_cnt
        val active_count = dimensionEntity.active_count
        inter_df = if (inter_df.rdd.isEmpty()) {
          parseCnt_Count(df, dimensionEntity.interest, bMap, install_cnt, active_count).toDF
        } else {
          inter_df.union(parseCnt_Count(df, dimensionEntity.interest, bMap, install_cnt, active_count).toDF)
        }
      })
      outer_df = if (outer_df.rdd.isEmpty()) {
        inter_df
      } else {
        outer_df.intersect(inter_df)
      }
    })
    outer_df
  }

  // Dimension computation over the separate user / active RDDs; same OR / AND semantics.
  def dimensionForeach(user_df: RDD[DeviceFrequencyEntity], active_df: RDD[DeviceActiveEntity], jsonArray: JsonArray, bMap: Broadcast[scala.collection.Map[String, String]], spark: SparkSession): DataFrame = {
    import spark.implicits._
    var outer_df = spark.emptyDataFrame
    jsonArray.foreach(json => {
      var inter_df = spark.emptyDataFrame
      json.getAsJsonArray.foreach(j => {
        val dimensionEntity = parseDimension(j.getAsJsonObject)
        val interest = bMap.value.getOrElse(dimensionEntity.interest, "0")
        val install_cnt = dimensionEntity.install_cnt
        val active_count = dimensionEntity.active_count
        val active_dimension = dimensionEntity.active_dimension
        inter_df = if (inter_df.rdd.isEmpty()) {
          parseInterestCnt(user_df, interest, install_cnt).toDF
            .intersect(parseInterestActive(active_df, dimensionEntity.interest, active_count, active_dimension).toDF)
        } else {
          inter_df.union(parseInterestCnt(user_df, interest, install_cnt).toDF
            .intersect(parseInterestActive(active_df, dimensionEntity.interest, active_count, active_dimension).toDF))
        }
      })
      outer_df = if (outer_df.rdd.isEmpty()) {
        inter_df
      } else {
        outer_df.intersect(inter_df)
      }
    })
    outer_df
  }
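  // A minimal sketch of the "m" / "m-n" range convention shared by parseCnt_Count,
  // parseInterestCnt and parseInterestActive below: "m" alone means cnt >= m, while
  // "m-n" means m <= cnt <= n. These helpers are hypothetical (not wired into the
  // methods below) and only illustrate the parsing and comparison logic.
  private def parseRange(spec: String): (Int, Int) = {
    val parts = spec.split("-")
    // Open-ended ranges are encoded with -1 as the upper bound, matching the code below.
    if (parts.length == 2) (parts(0).toInt, parts(1).toInt) else (parts(0).toInt, -1)
  }

  private def inRange(cnt: Int, range: (Int, Int)): Boolean =
    if (range._2 == -1) cnt >= range._1 else cnt >= range._1 && cnt <= range._2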
  // install_cnt (install frequency) and active_count (active days) checked together
  // on the pre-joined records.
  def parseCnt_Count(df: RDD[DeviceCntCount], interest: String, bMap: Broadcast[scala.collection.Map[String, String]], install_cnt: String, active_count: String): RDD[String] = {
    val cnter = if (install_cnt.split("-").size == 2) {
      (Integer.parseInt(install_cnt.split("-")(0)), Integer.parseInt(install_cnt.split("-")(1)))
    } else {
      (Integer.parseInt(install_cnt.split("-")(0)), -1)
    }
    val active_cnter = if (active_count.split("-").size == 2) {
      (Integer.parseInt(active_count.split("-")(0)), Integer.parseInt(active_count.split("-")(1)))
    } else {
      (Integer.parseInt(active_count.split("-")(0)), -1)
    }
    // val query_tag = bMap.value.getOrElse(interest, "0")
    val result = df.filter(d => {
      // Install frequency: the interest tag must appear with a count in range.
      var freq_flag = false
      if (d.frequency != null) {
        for (i <- d.frequency.indices if !freq_flag) {
          val tag = d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("tag").toString
          val cnt = Integer.parseInt(d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("cnt").toString)
          freq_flag = tag.equals(interest) && (if (cnter._2 == -1) cnt >= cnter._1 else cnt >= cnter._1 && cnt <= cnter._2)
        }
      }
      // Active days: the interest tag must appear with an active count in range.
      var active_flag = false
      val tags = GsonUtil.String2JsonArray(d.tags)
      for (tag <- tags if !active_flag) {
        val active = GsonUtil.fromJson(tag.getAsJsonObject, classOf[ActiveTagEntity])
        val tag_id = active.tag_id
        val cnt = active.cnt
        active_flag = tag_id.equals(interest) && (if (active_cnter._2 == -1) cnt >= active_cnter._1 else cnt >= active_cnter._1 && cnt <= active_cnter._2)
      }
      freq_flag && active_flag
    }).map(r => {
      r.deviceId
    })
    result
  }

  // install_cnt: install frequency filter.
  def parseInterestCnt(df: RDD[DeviceFrequencyEntity], interest: String, install_cnt: String): RDD[String] = {
    val cnter = if (install_cnt.split("-").size == 2) {
      (Integer.parseInt(install_cnt.split("-")(0)), Integer.parseInt(install_cnt.split("-")(1)))
    } else {
      (Integer.parseInt(install_cnt.split("-")(0)), -1)
    }
    val result = df.filter(d => {
      var flag = false
      if (d.frequency != null) {
        for (i <- d.frequency.indices if !flag) {
          val tag = d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("tag").toString
          val cnt = Integer.parseInt(d.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("cnt").toString)
          flag = tag.equals(interest) && (if (cnter._2 == -1) cnt >= cnter._1 else cnt >= cnter._1 && cnt <= cnter._2)
        }
      }
      flag
    }).map(r => {
      r.device_id
    })
    result
  }

  // active_count: active-days filter, restricted to the requested active dimension.
  def parseInterestActive(df: RDD[DeviceActiveEntity], interest: String, active_count: String, active_dimension: String): RDD[String] = {
    val cnter = if (active_count.split("-").size == 2) {
      (Integer.parseInt(active_count.split("-")(0)), Integer.parseInt(active_count.split("-")(1)))
    } else {
      (Integer.parseInt(active_count.split("-")(0)), -1)
    }
    val result = df.filter(d => {
      d.part.equals(active_dimension)
    }).filter(d => {
      var flag = false
      val tags = GsonUtil.String2JsonArray(d.tags)
      for (tag <- tags if !flag) {
        val active = GsonUtil.fromJson(tag.getAsJsonObject, classOf[ActiveTagEntity])
        val tag_id = active.tag_id
        val cnt = active.cnt
        flag = tag_id.equals(interest) && (if (cnter._2 == -1) cnt >= cnter._1 else cnt >= cnter._1 && cnt <= cnter._2)
      }
      flag
    }).map(r => {
      r.device_id
    })
    result
  }
}
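// Hypothetical spot checks for the filterByPackage UDF above (ids are illustrative):
//   Constant.filterByPackage("12,34,56", "34,99")  // true  -- install list hits id 34
//   Constant.filterByPackage("12,56", "34,99")     // false -- no overlap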