// InstallListLogic.scala
package mobvista.dmp.common

import mobvista.dmp.util.DateUtil
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Merges daily data into the full install-list dataset.
  *
  * This class only supplies the per-business daily query; everything else it uses
  * (including the `run` driver) is inherited from [[CommonInstallListOrc]].
  */
class InstallListLogic extends CommonInstallListOrc {

  /**
    * Builds and executes the SQL that extracts one day's data for a business line.
    *
    * For the selected business, the UDFs associated with it are registered on the
    * session, and the `@dt` / `@update_date` placeholders (plus `@version` for the
    * adn_sdk variants) of its SQL template are filled in before the query is run.
    *
    * @param business key selecting the SQL template (e.g. "3s", "adn_sdk", "other")
    * @param date     processing date formatted as yyyyMMdd
    * @param spark    session used to register UDFs and run the query
    * @return the DataFrame produced by the business-specific query
    * @throws IllegalArgumentException if `business` is not one of the supported keys
    */
  def processDailyData(business: String, date: String, spark: SparkSession): DataFrame = {
    // Same day in yyyy-MM-dd form: always used for @update_date, and for @dt by a few businesses.
    val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    // Fills the two placeholders shared by every template; `dt` is passed explicitly
    // because not all businesses use the same date format for @dt.
    def render(template: String, dt: String): String =
      template.replace("@dt", dt).replace("@update_date", dateTime)

    val sql = business match {
      case "3s" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        render(MobvistaConstant.tracking_3s_sql, date)
      case "adn_install" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.adn_install_sql, date)
      case "adn_request_other" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.adn_request_other_sql, date)
      case "adn_request_sdk" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
        render(MobvistaConstant.adn_reuqest_sdk_sql, date)
      case "adn_request_unmatch" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
        render(MobvistaConstant.adn_reuqest_sdk_unmatch_sql, date)
      case "adn_sdk" =>
        spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
        // Shares its template with "adn_sdk_v2"; only the @version value differs.
        render(MobvistaConstant.adn_sdk_sql, date).replace("@version", "0")
      case "adn_sdk_v2" =>
        spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
        render(MobvistaConstant.adn_sdk_sql, date).replace("@version", "1")
      case "ali" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.ali_sql, date)
      case "allpb" =>
        // Registers the V2 implementation under the shared "toJsonBySplit" name.
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplitV2 _)
        render(MobvistaConstant.allpb_sql, date)
      case "bytedance" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        // NOTE(review): @dt gets the dashed date here (also in "dsp_req" and "joypacios"),
        // unlike the other businesses - presumably the source partition format; confirm.
        render(MobvistaConstant.bytedance_sql, dateTime)
      case "clever" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.clever_sql, date)
      case "dsp_req" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
        // NOTE(review): dashed date for @dt, whereas "dsp_req_unmatch" uses yyyyMMdd - confirm.
        render(MobvistaConstant.dsp_req_sql, dateTime)
      case "dsp_req_unmatch" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
        render(MobvistaConstant.dsp_req_unmatch_sql, date)
      case "facebook" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.facebook_sql, date)
      case "ga" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.ga_sql, date)
      case "joypacios" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        // NOTE(review): dashed date for @dt, see the "bytedance" case - confirm.
        render(MobvistaConstant.joypacios_sql, dateTime)
      case "mp" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.mp_sql, date)
      case "iqiyi_api" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        render(MobvistaConstant.iqiyi_sql, date)
      case "other" =>
        // Upstream has stopped delivering data, so no real query is run; a deliberately
        // invalid row is fabricated so that it gets filtered out.
        // NOTE(review): check_device is not registered in this method - presumably it is
        // registered by the parent class before this is called; confirm.
        """
          |SELECT * FROM (SELECT '' device_id, '' device_type, '' platform, '' country, '' ext_data, '' install_list) t WHERE check_device(device_id)
          |""".stripMargin
      case unsupported =>
        // Fail with a descriptive error rather than an opaque scala.MatchError.
        throw new IllegalArgumentException(s"Unsupported business: $unsupported")
    }

    spark.sql(sql)
  }
}

/**
  * Command-line entry point for the install-list job.
  */
object InstallListLogic {

  /**
    * Creates an [[InstallListLogic]] and hands the raw command-line arguments to its
    * inherited `run` method.
    *
    * @param args command-line arguments, passed through unchanged
    */
  def main(args: Array[String]): Unit = {
    val job = new InstallListLogic()
    job.run(args)
  }
}