package mobvista.dmp.common

import mobvista.dmp.util.DateUtil
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Merges daily data into the full install-list data.
 */
class InstallListLogic extends CommonInstallListOrc {

  /**
   * Parses the daily processing result data: picks the SQL template for the given
   * business, registers the UDFs that case needs, fills in the placeholders and
   * executes the statement.
   *
   * @param business key selecting the SQL template and the UDFs to register; a key that
   *                 matches none of the cases below raises a `scala.MatchError`
   * @param date     day to process, parsed with the `yyyyMMdd` pattern
   * @param spark    session used to register the UDFs and to run the query
   * @return the DataFrame returned by `spark.sql` for the rendered statement
   */
  def processDailyData(business: String, date: String, spark: SparkSession): DataFrame = {
    // The same day re-formatted as yyyy-MM-dd; always substituted for @update_date.
    val dashedDate = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    // Fills @dt with the given partition value, then @update_date with the dashed date.
    def render(template: String, dt: String): String =
      template
        .replace("@dt", dt)
        .replace("@update_date", dashedDate)

    // adn_sdk_sql additionally carries an @version placeholder, filled between the two dates.
    def renderSdk(version: String): String =
      MobvistaConstant.adn_sdk_sql
        .replace("@dt", date)
        .replace("@version", version)
        .replace("@update_date", dashedDate)

    // Most cases need only this one UDF.
    def registerPkgJson() =
      spark.udf.register("toPkgJson", MobvistaConstant.toPkgJson _)

    val statement: String = business match {
      case "3s" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        render(MobvistaConstant.tracking_3s_sql, date)

      case "adn_install" =>
        registerPkgJson()
        render(MobvistaConstant.adn_install_sql, date)

      case "adn_request_other" =>
        registerPkgJson()
        render(MobvistaConstant.adn_request_other_sql, date)

      case "adn_request_sdk" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
        render(MobvistaConstant.adn_reuqest_sdk_sql, date)

      case "adn_request_unmatch" =>
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplit _)
        spark.udf.register("parseMExtData", MobvistaConstant.parseMExtData _)
        render(MobvistaConstant.adn_reuqest_sdk_unmatch_sql, date)

      case "adn_sdk" =>
        spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
        renderSdk("0")

      case "adn_sdk_v2" =>
        spark.udf.register("toJsonString", MobvistaConstant.toJsonString _)
        renderSdk("1")

      case "ali" =>
        registerPkgJson()
        render(MobvistaConstant.ali_sql, date)

      case "allpb" =>
        // Registered under the name "toJsonBySplit" but backed by the V2 implementation.
        spark.udf.register("toJsonBySplit", MobvistaConstant.toJsonBySplitV2 _)
        render(MobvistaConstant.allpb_sql, date)

      case "bytedance" =>
        registerPkgJson()
        // This template receives the dashed date for @dt as well.
        render(MobvistaConstant.bytedance_sql, dashedDate)

      case "clever" =>
        registerPkgJson()
        render(MobvistaConstant.clever_sql, date)

      case "dsp_req" =>
        registerPkgJson()
        spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
        // This template receives the dashed date for @dt as well.
        render(MobvistaConstant.dsp_req_sql, dashedDate)

      case "dsp_req_unmatch" =>
        registerPkgJson()
        spark.udf.register("parseDSPExtData", MobvistaConstant.parseDSPExtData _)
        render(MobvistaConstant.dsp_req_unmatch_sql, date)

      case "facebook" =>
        registerPkgJson()
        render(MobvistaConstant.facebook_sql, date)

      case "ga" =>
        registerPkgJson()
        render(MobvistaConstant.ga_sql, date)

      case "joypacios" =>
        registerPkgJson()
        // This template receives the dashed date for @dt as well.
        render(MobvistaConstant.joypacios_sql, dashedDate)

      case "mp" =>
        registerPkgJson()
        render(MobvistaConstant.mp_sql, date)

      case "iqiyi_api" =>
        registerPkgJson()
        render(MobvistaConstant.iqiyi_sql, date)

      case "other" =>
        // Upstream has stopped delivering data, so no real query is run any more;
        // fabricate invalid data so that it gets filtered out.
        // NOTE(review): check_device is not registered in this method — presumably it is
        // registered elsewhere before this runs; confirm.
        """
          |SELECT * FROM (SELECT '' device_id, '' device_type, '' platform, '' country, '' ext_data, '' install_list) t WHERE check_device(device_id)
          |""".stripMargin
    }

    spark.sql(statement)
  }
}

/**
 * Entry point: creates an [[InstallListLogic]] and passes the command-line arguments to
 * its `run` method.
 */
object InstallListLogic {
  def main(args: Array[String]): Unit = {
    val job = new InstallListLogic()
    job.run(args)
  }
}