package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.dm.Constant.DmInterestTagV2
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI
import scala.collection.JavaConverters._
import scala.collection.mutable

/**
  * Tags each device's install list with app interest tags and writes the
  * result as ORC.
  *
  * @package: mobvista.dmp.datasource.dm
  * @author: wangjf
  * @date: 2018/12/13
  * @time: 6:42 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DmpDeviceInterest extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val coalesce = commandLine.getOptionValue("coalesce").toInt
    val output = commandLine.getOptionValue("output")
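
    //  date: the yyyyMMdd partition to process; coalesce: number of output partitions;
    //  output: destination path (cleared before writing).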

    val spark = SparkSession.builder()
      .appName(name = s"DmpDeviceInterest.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.broadcast.compress", "true")
      .config("spark.sql.autoBroadcastJoinThreshold", "524288000")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
    try {

      import spark.implicits._
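      //  Flatten the app interest tags and expose them as the t_app_two_tags temp view.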
      val app_two_tags_sql = InterestTagConstant.app_tag_except_sql.replace("@date", date)
      spark.sql(app_two_tags_sql)
        .flatMap(InterestTagConstant.flatInterest)
        .toDF()
        .createOrReplaceTempView("t_app_two_tags")

      spark.udf.register("str2Json", str2Json _)
      spark.udf.register("mergeInstallList", mergeInstallList _)

      val new_tag_sql = InterestTagConstant.tag_sql_v2

      //  Broadcast lookup: lowercase package_name, with iOS "id123..." ids normalized
      //  to their numeric part, mapped to its JSON array of tags.
      val newMap = sc.broadcast(spark.sql(new_tag_sql).rdd.map(r => {
        var packageName = r.getAs("package_name").toString.toLowerCase
        if (packageName.matches("^id\\d+$")) {
          packageName = packageName.stripPrefix("id")
        }
        (packageName, r.getAs("tags").toString)
      }).collectAsMap())

      val rdd = spark.sql(Constant.install_sql.replace("@date", date)).rdd
        .map(row => {
          val device_id = row.getAs("device_id").toString
          val device_type = row.getAs("device_type").toString
          //  Anything that is not explicitly "ios" is treated as android.
          val platform = if ("ios".equals(row.getAs[String]("platform"))) {
            "ios"
          } else {
            "android"
          }
          val ext_data = row.getAs[String]("ext_data")
          val update_date = row.getAs[String]("update_date")
          //  Keep the raw install_list JSON so it can be written out unchanged.
          val install_str = row.getAs("install_list").toString
          val install_list = JSON.parse(install_str).asInstanceOf[java.util.Map[String, String]].asScala
          val jsonArray = new JSONArray()
          install_list.foreach(install => {
            //  Normalize iOS ids of the form "id123456" to their numeric part.
            val package_name = if (install._1.matches("^id\\d+$")) install._1.stripPrefix("id") else install._1
            val install_date = install._2
            val json = new JSONObject()
            json.put("package_name", package_name)
            json.put("date", install_date)
            if (newMap.value.contains(package_name.toLowerCase)) {
              json.put("tag_new", JSON.parseArray(newMap.value(package_name.toLowerCase)))
            }
            //  Keep only packages that matched a tag and are not placeholder package names.
            if (json.containsKey("tag_new") && !"0000000000".equals(package_name) && !"com.nonepkg.nonepkg".equals(package_name)) {
              jsonArray.add(json)
            }
          })
          DmInterestTagV2(device_id = device_id, device_type = device_type, platform = platform, install = install_str,
            tags = jsonArray.toString, ext_data = ext_data, update_date = update_date)
        })
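
      //  Illustrative shape of the `tags` column built above (package name and tag values are hypothetical):
      //  [{"package_name":"com.example.app","date":"2018-12-01","tag_new":[{"id":"101","1":"Games"}]}]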

      rdd.repartition(coalesce).toDF()
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)

    } finally {
      //  spark.stop() also shuts down the underlying SparkContext.
      spark.stop()
    }
    0
  }

  /**
    * Builds one tag as a JSON string with fields "id", "1" and, when the
    * second-level tag is non-blank, "2".
    */
  def str2Json(tag_id: String, first_tag: String, second_tag: String): String = {
    val jsonObject = new JSONObject()
    jsonObject.put("id", tag_id)
    jsonObject.put("1", first_tag)
    if (StringUtils.isNotBlank(second_tag)) {
      jsonObject.put("2", second_tag)
    }
    jsonObject.toString
  }

  /**
    * Merges several install-list JSON strings into one object, keeping the
    * latest (lexicographically greatest) date per package.
    */
  def mergeInstallList(installList: mutable.WrappedArray[String]): String = {
    val installJSONObject = new JSONObject()
    installList.iterator.foreach(install => {
      val installMap = JSON.parse(install).asInstanceOf[java.util.Map[String, String]].asScala
      installMap.foreach(packageInfo => {
        if (installJSONObject.containsKey(packageInfo._1)) {
          if (installJSONObject.getString(packageInfo._1).compareTo(packageInfo._2) < 0) {
            installJSONObject.put(packageInfo._1, packageInfo._2)
          }
        } else {
          installJSONObject.put(packageInfo._1, packageInfo._2)
        }
      })
    })
    installJSONObject.toJSONString
  }
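
  //  Example (illustrative): merging {"com.a":"2018-12-01"} and {"com.a":"2018-12-10"}
  //  yields {"com.a":"2018-12-10"}; the latest install date wins per package.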

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options
  }
}

object DmpDeviceInterest {
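  /*
   * Example launch (jar name and output path are illustrative):
   * spark-submit --class mobvista.dmp.datasource.dm.DmpDeviceInterest dmp.jar \
   *   -date 20181213 -coalesce 1000 -output s3://mob-emr-test/dmp/dm_interest_tag/20181213
   */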
  def main(args: Array[String]): Unit = {
    new DmpDeviceInterest().run(args)
  }
}