package mobvista.dmp.common

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.MobvistaConstant.deviceTypeSet
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.joda.time.format.DateTimeFormat

import java.net.URI
import java.util
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
 * Generic job that merges one day's data into the accumulated install list and
 * writes the result as ORC.
 *
 * Subclasses only have to implement [[processDailyData]], which supplies the
 * daily rows; the merge with the previous day's install list and the output
 * are handled here.
 *
 * fengliang
 */
abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {

  /**
   * Runs the merge.
   *
   * Reads the `date`, `business`, `coalesce` and `output` options, builds the
   * daily RDD via [[processDailyData]], full-outer-joins it with the install
   * list of the previous day (unless `business` is `dsp_req_unmatch`, in which
   * case only the daily rows are written) and stores the rows whose device
   * type is in `deviceTypeSet` as zlib-compressed ORC under `output`.
   *
   * @param args command-line arguments
   * @return 1 when a required option is missing, 0 otherwise
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val output = commandLine.getOptionValue("output")

    val dateTime = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
    // Substituted for @expireDate in the install-list SQL: 12 months before the run date.
    val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(dateTime).minusMonths(12).toString("yyyy-MM-dd")

    val spark = MobvistaConstant.createSparkSession(s"DmpInstallList.$business.$date")

    val v2_flag = business == "adn_sdk_v2"

    // Upper bound on the number of packages kept per device after merging.
    val maxPackages = 1000

    // The normalisers are local function values (not methods of this class) so that the
    // Spark closures below capture only these small functions and never `this`.
    val normalizeDeviceType: String => String = deviceType =>
      if (deviceType.equalsIgnoreCase("android_id") || deviceType.equalsIgnoreCase("androidid")) {
        "androidid"
      } else {
        deviceType
      }

    val normalizePlatform: String => String = platform =>
      if (platform.equalsIgnoreCase("android") || platform.equalsIgnoreCase("android2") || platform.equalsIgnoreCase("adr")) {
        "android"
      } else if (platform.equalsIgnoreCase("ios") || platform.equalsIgnoreCase("ios2")) {
        "ios"
      } else {
        platform
      }

    try {
      spark.udf.register("check_device", MobvistaConstant.checkDeviceId _)
      spark.udf.register("filter_pkg", MobvistaConstant.filter_pkg _)
      spark.udf.register("get_filter_pkg", MobvistaConstant.get_filter_pkg _)

      val sc = spark.sparkContext

      // Work around failed writes on task retry when the output path already exists.
      // The file system is resolved from the output path itself, so the delete always
      // targets the same location the job writes to, whatever bucket/scheme it uses.
      val outputPath = new Path(output)
      val outputUri: URI = outputPath.toUri
      FileSystem.get(outputUri, sc.hadoopConfiguration).delete(outputPath, true)

      // (device_id, device_type, platform) -> (install_list, ext_data, country)
      val dailyRDD = processDailyData(business, date, spark).rdd.map(row => {
        val device_id = row.getAs("device_id").toString
        val device_type = if (row.getAs("device_type") != null && StringUtils.isNotBlank(row.getAs("device_type"))) {
          row.getAs("device_type").toString
        } else {
          ""
        }
        val platform = if (row.getAs("platform") != null && StringUtils.isNotBlank(row.getAs("platform"))) {
          row.getAs("platform").toString
        } else {
          ""
        }
        // A null country is treated like a non-matching one (empty string), which is
        // how the historical install-list rows below are handled as well.
        val country = Option(row.getAs[String]("country"))
          .map(_.toUpperCase)
          .filter(_.matches(MobvistaConstant.countryPtn))
          .getOrElse("")
        val ext_data = row.getAs[String]("ext_data")
        val install_list = row.getAs("install_list").toString
        ((device_id, device_type, platform), (install_list, ext_data, country))
      })

      val last_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -1), "yyyyMMdd")

      val install_list_sql = MobvistaConstant.dmp_install_list_sql.replace("@dt", last_date)
        .replace("@business", business).replace("@expireDate", expireDate)

      // (device_id, device_type, platform) -> (install_list, ext_data, update_date, country)
      val installRDD = spark.sql(install_list_sql)
        .rdd
        .map(row => {
          val device_id = row.getAs("device_id").toString
          val device_type = row.getAs("device_type").toString
          val platform = row.getAs("platform").toString
          val country = if (row.getAs("country") != null && StringUtils.isNotBlank(row.getAs[String]("country"))
            && row.getAs[String]("country").toUpperCase.matches(MobvistaConstant.countryPtn)) {
            row.getAs("country").toString.toUpperCase
          } else {
            ""
          }
          val install_list = row.getAs("install_list").toString
          val ext_data = row.getAs("ext_data").toString
          val update_date = row.getAs("update_date").toString
          ((device_id, device_type, platform), (install_list, ext_data, update_date, country))
        })

      import spark.implicits._

      val mergedRDD = if (business.equalsIgnoreCase("dsp_req_unmatch")) {
        // This business is not merged with history: the daily rows are written as they are.
        dailyRDD.map { case (dailyKey, (pkgs, extData, country)) =>
          DmpInstallList(dailyKey._1, normalizeDeviceType(dailyKey._2), normalizePlatform(dailyKey._3),
            country, pkgs, extData, dateTime)
        }
      } else {
        dailyRDD.fullOuterJoin(installRDD)
          .map { case (key, (dailyOpt, totalOpt)) =>
            val (pkgs, ext_data, country, updateDate) = (dailyOpt, totalOpt) match {
              case (None, Some(total)) =>
                // Only historical data: keep it, rewriting "region" as a de-duplicated set.
                val historyExt = JSON.parseObject(total._2)
                val regionList = if (historyExt.containsKey("region")) {
                  JSON.parseArray(historyExt.getString("region"), classOf[String])
                } else {
                  new util.ArrayList[String]()
                }
                historyExt.put("region", new util.HashSet[String](regionList))
                (total._1, historyExt.toJSONString, total._4, total._3)

              case (Some(daily), None) =>
                // Only daily data: a device seen for the first time.
                (daily._1, daily._2, daily._3, dateTime)

              case (Some(daily), Some(total)) =>
                // Packages from the daily data take priority over historical ones.
                val installJson = JSON.parseObject(daily._1).asInstanceOf[java.util.Map[String, String]]
                // Historical packages that are not present in the daily data.
                val installMap = JSON.parseObject(total._1).asInstanceOf[java.util.Map[String, String]].asScala
                  .retain((k, _) => !installJson.containsKey(k))

                if (v2_flag) {
                  // adn reporting only; other business lines are not affected by this logic.
                  // A daily entry for a package supersedes the historical entries of the same
                  // package in its other states (plain name, ".notinstall", ".delete").
                  installJson.keySet().asScala.foreach { pkgKey =>
                    if (StringUtils.isNotBlank(pkgKey)) {
                      val superseded =
                        if (pkgKey.endsWith(".notinstall")) {
                          val base = pkgKey.replace(".notinstall", "")
                          Seq(base, base + ".delete")
                        } else if (pkgKey.endsWith(".delete")) {
                          val base = pkgKey.replace(".delete", "")
                          Seq(base, base + ".notinstall")
                        } else {
                          Seq(pkgKey + ".notinstall", pkgKey + ".delete")
                        }
                      superseded.foreach(name => installMap.remove(name))
                    }
                  }
                }

                if (installMap.size + installJson.size() > maxPackages) {
                  // Too many packages: fill up with historical entries in descending order of
                  // their value, so that the most recently active ones are kept.
                  installMap.toList.sortWith(_._2 > _._2).foreach { case (pkg, value) =>
                    if (installJson.size() < maxPackages) {
                      installJson.put(pkg, value)
                    }
                  }
                } else {
                  installJson.putAll(installMap.asJava)
                }

                val historyExt = JSON.parseObject(total._2)
                val dailyExt = JSON.parseObject(daily._2)
                if (dailyExt.containsKey("dev_tag") && dailyExt.getInteger("dev_tag") == 1) {
                  historyExt.put("dev_tag", dailyExt.getInteger("dev_tag"))
                }
                // "strategy" and "region" are merged as de-duplicated unions of the historical
                // and the daily values; history is left untouched when the daily data lacks the field.
                Seq("strategy", "region").foreach { field =>
                  val values = if (historyExt.containsKey(field)) {
                    JSON.parseArray(historyExt.getString(field), classOf[String])
                  } else {
                    new util.ArrayList[String]()
                  }
                  if (dailyExt.containsKey(field)) {
                    values.addAll(JSON.parseArray(dailyExt.getString(field), classOf[String]))
                    historyExt.put(field, new util.HashSet[String](values))
                  }
                }

                val mergedCountry = if (StringUtils.isNotBlank(daily._3)) daily._3 else total._4
                (installJson.toString, historyExt.toJSONString, mergedCountry, dateTime)

              case (None, None) =>
                // Not produced by a full outer join; kept so the match is exhaustive.
                ("", "", "", "")
            }

            DmpInstallList(key._1, normalizeDeviceType(key._2), normalizePlatform(key._3),
              country, pkgs, ext_data, updateDate)
          }
      }

      mergedRDD.filter(r => deviceTypeSet.contains(r.device_type))
        .toDF
        .repartition(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Declares the command-line options of the job: `date`, `business`,
   * `coalesce` and `output`, each taking one argument.
   *
   * @return the option definitions
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("business", true, "[must] business")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output path")
    options
  }

  /**
   * Produces the processed data of one day.
   *
   * The rows are read by name in `run` and must provide the columns
   * `device_id`, `device_type`, `platform`, `country`, `ext_data` and
   * `install_list`.
   *
   * @param business business (partition) name
   * @param date     date, as passed on the command line
   * @param spark    the active Spark session
   * @return the daily data
   */
  def processDailyData(business: String, date: String, spark: SparkSession): DataFrame
}