// Source: InstallListLogic.scala (5.03 KB), committed by wang-jinfeng
package mobvista.dmp.common

import mobvista.dmp.util.DateUtil
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Merges one day's processed data into the full install-list data set.
  *
  * This class supplies the business-specific daily query; the surrounding job flow comes from
  * [[CommonInstallListOrc]].
  */
class InstallListLogic extends CommonInstallListOrc {

  /**
    * Builds the DataFrame holding the daily result for one business line.
    *
    * Registers the UDFs needed for the selected business, fills the placeholders of the
    * matching SQL template from `MobvistaConstant` and executes the query.
    *
    * @param business business identifier selecting the SQL template (e.g. "3s", "adn_sdk", "dsp_req")
    * @param date     processing day formatted as yyyyMMdd
    * @param spark    Spark session used to register the UDFs and run the query
    * @return the result of the business-specific query
    * @throws IllegalArgumentException if `business` is not one of the supported identifiers
    */
  def processDailyData(business: String, date: String, spark: SparkSession): DataFrame = {
    // The same day as `date`, re-rendered as yyyy-MM-dd.
    val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    // Fills the two placeholders shared by every template. `dt` defaults to the compact
    // yyyyMMdd day; a few businesses substitute the dashed form for "@dt" instead.
    def fill(template: String, dt: String = date): String =
      template.replace("@dt", dt).replace("@update_date", dateTime)

    // adn_sdk and adn_sdk_v2 share one template and differ only in the "@version" value.
    def adnSdk(version: String): String =
      MobvistaConstant.adn_sdk_sql.replace("@dt", date).replace("@version", version)
        .replace("@update_date", dateTime)

    val sql = business match {
      case "3s" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        fill(MobvistaConstant.tracking_3s_sql)
      case "adn_install" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.adn_install_sql)
      case "adn_request_other" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.adn_request_other_sql)
      case "adn_request_sdk" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
        fill(MobvistaConstant.adn_reuqest_sdk_sql)
      case "adn_request_unmatch" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
        fill(MobvistaConstant.adn_reuqest_sdk_unmatch_sql)
      case "adn_sdk" =>
        spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
        adnSdk("0")
      case "adn_sdk_v2" =>
        spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
        adnSdk("1")
      case "ali" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.ali_sql)
      case "allpb" =>
        // The V2 implementation is registered under the plain "toJsonBySplit" name.
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplitV2 _)
        fill(MobvistaConstant.allpb_sql)
      case "bytedance" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        // "@dt" receives the dashed yyyy-MM-dd day for this template.
        fill(MobvistaConstant.bytedance_sql, dt = dateTime)
      case "clever" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.clever_sql)
      case "dsp_req" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
        // "@dt" receives the dashed yyyy-MM-dd day for this template.
        // NOTE(review): "dsp_req_unmatch" below uses the compact yyyyMMdd day — confirm the
        // difference is intentional.
        fill(MobvistaConstant.dsp_req_sql, dt = dateTime)
      case "dsp_req_unmatch" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
        fill(MobvistaConstant.dsp_req_unmatch_sql)
      case "facebook" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.facebook_sql)
      case "ga" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.ga_sql)
      case "joypacios" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        // "@dt" receives the dashed yyyy-MM-dd day for this template.
        fill(MobvistaConstant.joypacios_sql, dt = dateTime)
      case "mp" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.mp_sql)
      case "iqiyi_api" =>
        spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)
        fill(MobvistaConstant.iqiyi_sql)
      case "other" =>
        // The upstream stopped delivering data, so no real query is run any more; a single
        // invalid row is fabricated so that it gets filtered out.
        // NOTE(review): `check_device` is not registered in this method — presumably it is
        // registered elsewhere before this runs; confirm.
        """
          |SELECT * FROM (SELECT '' device_id, '' device_type, '' platform, '' country, '' ext_data, '' install_list) t WHERE check_device(device_id)
          |""".stripMargin
      case unsupported =>
        // Fail with a descriptive error instead of an opaque scala.MatchError.
        throw new IllegalArgumentException(s"Unsupported business: $unsupported")
    }

    spark.sql(sql)
  }
}

/**
  * Command-line entry point for the install-list job.
  */
object InstallListLogic {

  /**
    * Creates an [[InstallListLogic]] instance and hands the command-line arguments to its
    * `run` method.
    *
    * @param args command-line arguments, forwarded unchanged
    */
  def main(args: Array[String]): Unit = {
    val job = new InstallListLogic()
    job.run(args)
  }
}