package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpRequest extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("table", true, "table")
    options.addOption("business", true, "business")
    options.addOption("hh", true, "hh")
    options
  }

  //  Package names allowed for the current business; backs the check_package UDF.
  val package_name_set = new mutable.HashSet[String]()

  def check_package(package_name: String): Boolean = {
    package_name_set.contains(package_name)
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val business = commandLine.getOptionValue("business")
    val table = commandLine.getOptionValue("table")
    val hh = commandLine.getOptionValue("hh")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"RTDmpRequest.$date.$business")
    val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    val sc = spark.sparkContext
    try {
      //  Gzip-compress the job output at the Hadoop OutputFormat level.
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      spark.udf.register("check_package", check_package _)

      val auto_business = PropertyUtil.getProperty("config.properties", s"rtdmp.auto.business").split(",", -1).toSet[String]
      //  val noauto_business = PropertyUtil.getProperty("config.properties", s"rtdmp.noauto.business").split(",", -1).toSet[String]

      //  Pick the SQL template: "auto" businesses take every package, all other
      //  businesses are restricted to the packages configured for them.
      var device_sql = if (auto_business.contains(business)) {
        Constant.device_sql
          .replace("@dt", date)
          .replace("@business", business)
          .replace("@check_package", "")
          .replace("@table", table)
      } else if (business.equals("tencent")) {
        PropertyUtil.getProperty("config.properties", s"$business.package_name").split(",", -1).foreach(p => {
          package_name_set.add(p)
        })
        Constant.tencent_device_sql
          .replace("@dt", date)
          .replace("@check_package", "AND check_package(package_name)")
          .replace("@table", table)
      } else {
        PropertyUtil.getProperty("config.properties", s"$business.package_name").split(",", -1).foreach(p => {
          package_name_set.add(p)
        })
        Constant.device_sql
          .replace("@dt", date)
          .replace("@business", business)
          .replace("@check_package", "AND check_package(package_name)")
          .replace("@table", table)
      }

      //  Hourly runs filter on the hour partition; the "00" run covers the whole
      //  day and filters on update_date instead.
      if (!hh.equals("00")) {
        device_sql = device_sql.replace("@check_hr", s"AND hh = '$hh'")
          .replace("@check_update_date", "")
      } else {
        device_sql = device_sql.replace("@check_hr", "")
          .replace("@check_update_date", s"AND update_date = '$update_date'")
      }

      println(s"package_name_set.size -->> ${package_name_set.size}")

      //  Clear the output prefix before writing.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  Audience ids that have been stopped and must not be updated.
      val stopAudience = PropertyUtil.getProperty("config.properties", s"rtdmp.stop.audience").split(",", -1).toSet[String]
        .map(r => {
          Integer.valueOf(r)
        })

      //  Normalize device types and keep (device_id, device_type, platform, package_name).
      val rdd = spark.sql(device_sql).rdd.map(r => {
        val device_id = r.getAs[String]("device_id")
        var device_type = r.getAs[String]("device_type")
        val platform = r.getAs[String]("platform")
        val package_name = r.getAs[String]("package_name")
        device_type = {
          device_type match {
            case "imeimd5" => "imei_md5"
            case "gaidmd5" => "gaid_md5"
            case "oaidmd5" => "oaid_md5"
            case "idfamd5" => "idfa_md5"
            case _ => device_type
          }
        }
        (device_id, device_type, platform, package_name)
      }).repartition(coalesce)

      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      //  Write one output stream per package_name/device_type under the output prefix.
      rdd.map(t => {
        (new Text(s"$output/${t._4}/${t._2}"), new Text(t._1))
      }).saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

      val mapRdd = rdd.map(t => {
        (t._4, (t._2, t._3))
      }).distinct(10)
        .cache()

      //  Distinct (device_type, platform) pairs per package; a single pair lets the
      //  audience point at a concrete device-type path instead of a wildcard.
      val pkgTypeCount = mapRdd.map(r => {
        val deviceType = r._2._1
        val platform = r._2._2
        val packageName = {
          if (business.equals("btop") && !r._1.contains("_btop")) {
            r._1 + "_btop"
          } else {
            r._1
          }
        }
        (packageName, (deviceType, platform))
      }).countByKey()

      val portalMap = mapRdd
        .collectAsMap()

      //  Build request payloads: known audiences go into updateJsonArray,
      //  new audiences into uploadJsonArray.
      val uploadPkgSet = new mutable.HashSet[String]()
      val uploadJsonArray = new JSONArray()
      val updateJsonArray = new JSONArray()
      portalMap.foreach(m => {
        val jsonObject = new JSONObject()
        val package_name = {
          if (business.equals("btop") && !m._1.contains("_btop")) {
            m._1 + "_btop"
          } else {
            m._1
          }
        }
        //  Platform code: 2 for iOS, 1 otherwise; if the platform column is blank,
        //  fall back to the iOS package-name pattern.
        var platform = if (StringUtils.isBlank(m._2._2)) {
          0
        } else if (m._2._2.equals("ios")) {
          2
        } else {
          1
        }
        if (platform == 0) {
          platform = if (MobvistaConstant.iosPkgPtn.matcher(m._1).matches()) {
            2
          } else {
            1
          }
        }
        val match_device_type = Logic.match_device_type(m._2._1)
        val map = Logic.getAudienceInfo(business)
        if (map.contains(package_name)) {
          if (!stopAudience.contains(map(package_name))) {
            jsonObject.put("id", map(package_name))
            //  jsonObject.put("audience_name", package_name)
            if (pkgTypeCount(package_name) == 1) {
              jsonObject.put("s3_path", s"$output/${m._1}/${m._2._1}/")
            } else {
              jsonObject.put("s3_path", s"$output/${m._1}/*/")
            }
            jsonObject.put("status", 1)
            jsonObject.put("audience_data_status", 1)
            updateJsonArray.add(jsonObject)
          }
        } else {
          if (pkgTypeCount(package_name) == 1) {
            jsonObject.put("s3_path", s"$output/${m._1}/${m._2._1}/")
          } else {
            jsonObject.put("s3_path", s"$output/${m._1}/*/")
          }
          jsonObject.put("platform", platform)
          jsonObject.put("match_device_type", match_device_type)
          jsonObject.put("audience_type", 2)
          jsonObject.put("data_update_method", 1)
          jsonObject.put("audience_name", package_name)
          jsonObject.put("status", 1)
          jsonObject.put("audience_gender", 3)
          jsonObject.put("audience_count", 1)
          jsonObject.put("is_sync_dmpserver", 1)
          jsonObject.put("audience_data_status", 1)
          uploadPkgSet.add(package_name)
          uploadJsonArray.add(jsonObject)
        }
      })

      //  Register the new audiences with the RTDmp server.
      val uploadJsonObject = ServerUtil.upload(uploadJsonArray)
      if (uploadJsonObject.getInteger("code") == 200) {
        println("RTDmp Upload OK, AudienceId -->> " + uploadJsonObject.getJSONArray("data"))
      }

      //  Record audience info for the packages that were just uploaded.
      Logic.writeAudienceInfo(business, Logic.getAudienceMap(uploadPkgSet))

      //  Update existing audiences one at a time, retrying each request until
      //  the server returns code 200.
      for (i <- 0 until updateJsonArray.size()) {
        val updateObject = new JSONArray()
        updateObject.add(updateJsonArray.getJSONObject(i))
        var flag = true
        while (flag) {
          val updateJsonObject = ServerUtil.update(updateObject)
          if (updateJsonObject.getInteger("code") == 200) {
            println("RTDmp Update OK, updateJson -->> " + updateObject)
            flag = false
          }
        }
      }
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpRequest {
  def main(args: Array[String]): Unit = {
    new RTDmpRequest().run(args)
  }
}
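
/**
 * Illustrative launcher, not part of the original job: a minimal sketch of how
 * RTDmpRequest could be invoked, assuming the commons-cli style "-name value"
 * arguments defined in commandOptions(). Every value below (date, hour, table,
 * business key, output prefix, partition count) is a placeholder chosen for the
 * example, not a real configuration.
 */
object RTDmpRequestExample {
  def main(args: Array[String]): Unit = {
    RTDmpRequest.main(Array(
      "-date", "20200713",                                  //  source partition, yyyyMMdd
      "-hh", "00",                                          //  "00" runs the full-day branch
      "-table", "dwh.example_device_table",                 //  placeholder Hive table
      "-business", "example_business",                      //  placeholder business key
      "-output", "s3://mob-emr-test/example/rtdmp_request", //  placeholder output prefix (deleted before writing)
      "-coalesce", "100"                                    //  placeholder partition count
    ))
  }
}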