package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceInfoPre
import mobvista.dmp.util.{DateUtil, MD5Util}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.net.URI
import java.util
import scala.collection.JavaConversions._
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainPre extends CommonSparkJob with Serializable {
  private val logger = LoggerFactory.getLogger(classOf[RTDmpMainPre])

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("data_utime", true, "data_utime")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val time = commandLine.getOptionValue("time").replace(".", " ")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val data_utime = commandLine.getOptionValue("data_utime").replace(".", " ")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainPre.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      var mergeRDD = sc.emptyRDD[(String, Int)]

      // By default, process the previous hour's data
      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")

      val audience_date_utime_start = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 25200

      val map: util.Map[Integer, (JSONArray, Integer, Integer, JSONObject)] =
        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)

      map.foreach(t => {
        val audienceId = Integer2int(t._1)
        val audienceOp = t._2._2
        val dmap = new mutable.HashMap[String, String]()
        t._2._1.foreach(json => {
          val jsonObject = json.asInstanceOf[JSONObject]
          if (jsonObject.containsKey("s3_path") && StringUtils.isNotBlank(jsonObject.getString("s3_path"))) {
            // (s3_path, update_time)
            dmap.put(jsonObject.getString("s3_path"), jsonObject.getString("update_time"))
          }
        })
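        // At this point dmap holds one entry per audience-package version, keyed
        // by its S3 path, e.g. (illustrative values only, not real data):
        //   "s3://bucket/audience/123/v2/*" -> "2020-07-13 11:00:00"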
        // Check whether any s3_path carries the current update_date, to filter out stale entries
        // if (dmap.values.contains(time)) {

        /**
         * When audienceOp == 1 and dmap.size >= 2, compute the set difference:
         * devices that appeared in the previous partition but not in this one are
         * emitted with -1 * audienceId so that downstream jobs can delete them
         * (the diff step itself is currently disabled, see the commented-out block
         * below); otherwise merge with the previous audience package.
         */
        val updateRDD = if (audienceOp == 1 && dmap.size >= 2) {
          // Sort by update_time descending and keep the newest package
          // (re-enabling the diff below would also need the second-newest, i.e. take(2))
          val list = dmap.toList.sortWith(_._2 > _._2).take(1)
          val pathUri = new URI(list.get(0)._1)
          val newAudience = if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
            .exists(new Path(pathUri.toString.replace("*", "")))) {
            sc.textFile(list.get(0)._1).repartition(100)
              .filter(r => {
                r.length <= 64
              })
              .map(r => {
                // The first entry is the newest audience package
                val device_id = if (r.matches(MobvistaConstant.md5Ptn)) {
                  r
                } else {
                  MD5Util.getMD5Str(r)
                }
                (device_id, audienceId)
              })
          } else {
            sc.emptyRDD[(String, Int)]
          }
          /*
          val oldAudience = sc.textFile(list.get(1)._1).repartition(100).map(r => {
            // The second entry is the older audience package, i.e. the previous version
            val device_id = if (r.matches(MobvistaConstant.md5Ptn)) {
              r
            } else {
              MD5Util.getMD5Str(r)
            }
            (device_id, (audienceId, list.get(1)._2))
          })
          oldAudience.subtractByKey(newAudience).map(t => {
            // Negate audienceId for the difference set so downstream jobs delete it
            val device_id = if (t._1.matches(MobvistaConstant.md5Ptn)) {
              t._1
            } else {
              MD5Util.getMD5Str(t._1)
            }
            (device_id, ((-1) * audienceId, t._2._2)) // (devId, ((-1) * audienceId, update_date))
          }).union(newAudience) // Merge with the newest audience package
          */
          newAudience
        } else if (dmap.size == 1) {
          val audData = dmap.toList.sortWith(_._2 > _._2).take(1)
          val pathUri = new URI(audData.get(0)._1)
          if (audData.nonEmpty && FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
            .exists(new Path(audData.get(0)._1.replace("*", "")))) {
            sc.textFile(audData.get(0)._1)
              .filter(r => {
                r.length <= 64
              })
              .map(r => {
                // Load the newest audience package
                val device_id = if (r.matches(MobvistaConstant.md5Ptn)) {
                  r
                } else {
                  MD5Util.getMD5Str(r)
                }
                (device_id, audienceId)
              })
          } else {
            // If the path does not exist, fall back to an empty RDD
            sc.emptyRDD[(String, Int)]
          }
        } else {
          sc.emptyRDD[(String, Int)]
        }
        // Merge all audience packages
        mergeRDD = mergeRDD.union(updateRDD)
      })

      val df = mergeRDD.repartition(1000).groupByKey().map(r => {
        val devId = r._1
        val set = new mutable.HashSet[Int]()
        // Collect the distinct audienceIds for this device
        r._2.foreach(t => {
          set.add(t)
        })
        (devId, set)
      })

      import spark.implicits._

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      df.map(r => {
        AudienceInfoPre(r._1, r._2.mkString(","))
      }).repartition(coalesce.toInt)
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMainPre {
  def main(args: Array[String]): Unit = {
    new RTDmpMainPre().run(args)
  }
}
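// A sketch of how this job might be submitted. The class name and the four
// option names come from commandOptions() above; the jar name, master settings,
// and option values below are assumptions for illustration only:
//
//   spark-submit \
//     --class mobvista.dmp.datasource.rtdmp.RTDmpMainPre \
//     --master yarn --deploy-mode cluster \
//     dmp.jar \
//     -time 2020-07-13.11 \
//     -output s3://mob-emr-test/dmp/rtdmp_pre \
//     -coalesce 100 \
//     -data_utime 2020-07-13.10
//
// Note that "time" and "data_utime" are passed with '.' in place of a space
// ("yyyy-MM-dd.HH"); run() converts the '.' back to a space before parsing.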