package mobvista.dmp.common

import java.net.URI
import java.util

import com.alibaba.fastjson.JSONObject
import com.google.gson.{JsonArray, JsonElement, JsonObject}
import mobvista.dmp.datasource.mpsdk.InstallInfo
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.joda.time.format.DateTimeFormat

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

/**
  * Generic job that merges a day's worth of install data into the cumulative
  * install list. Subclasses only need to implement [[processDailyData]] to
  * parse their specific daily input format.
  *
  * fengliang
  */
abstract class CommonInstallListV2 extends CommonSparkJob with Serializable {

  protected val FIELD_SPLIT = "\t"
  // Storage format of the daily input; subclasses may override before run().
  protected var DAILY_STORE_FORMAT = "ORC"

  /**
    * Entry point: merges the daily install records into yesterday's
    * cumulative install list and writes the result as gzipped text.
    *
    * @param args command-line arguments (date / input / output / oldInput /
    *             parallelism / coalesce)
    * @return 0 on success, 1 on invalid arguments
    */
  def run(args: Array[String]): Int = {
    var sc: SparkContext = null
    try {
      options.addOption("date", true, "[must] today")
      options.addOption("oldInput", true, "[must] yestoday install data path")
      val commandLine = commParser.parse(options, args)
      if (!checkMustOption(commandLine)) {
        printUsage(options)
        return 1
      } else {
        printOptions(commandLine)
      }

      val date = commandLine.getOptionValue("date")
      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val oldInput = commandLine.getOptionValue("oldInput")
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      val coalesce = commandLine.getOptionValue("coalesce").toInt
      // Install records older than 12 months are considered expired and dropped.
      val expireDate = DateTimeFormat.forPattern("yyyy-MM-dd")
        .parseDateTime(date).minusMonths(12).toString("yyyy-MM-dd")

      val spark = SparkSession
        .builder()
        .appName("MPSDKTotal")
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", parallelism)
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()

      // On task retry the output path may already exist and would make the
      // write fail, so delete it up front.
      FileSystem.get(new URI(s"s3://mob-emr-test"),
        spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
      // FileSystem.get(spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

      val dailyDF = spark.read.format(DAILY_STORE_FORMAT).load(input)
      // val dailyData = dailyDF.rdd.map(convertDayRow(_))
      // Parse today's rows, keep only valid device ids, and deduplicate the
      // install infos per key via a HashSet (relies on InstallInfo equality).
      val dailyRDD = dailyDF.rdd.mapPartitions(processDailyData(_, date))
        .filter(tuple => {
          val array = splitFun(tuple._1)
          array(0).matches(didPtn) && !allZero.equals(array(0)) || array(0).matches(imeiPtn)
        })
        .groupByKey()
        .map(tuple => {
          val set = new util.HashSet[InstallInfo]()
          tuple._2.foreach(info => {
            set.add(info)
          })
          (tuple._1, set)
        })

      sc = spark.sparkContext
      // Yesterday's cumulative list: key = "did SEP platform SEP something",
      // value = JSON array string of install records.
      val installRDD = sc.textFile(oldInput)
        .map(splitFun(_))
        .filter(array => (array(0).matches(didPtn) && !allZero.equals(array(0)) || array(0).matches(imeiPtn)))
        .map(splits => {
          (s"${splits(0)}$DATA_SPLIT${splits(1)}$DATA_SPLIT${splits(2)}", splits(3))
        })

      dailyRDD.cogroup(installRDD).map(
        tuple => {
          val key = tuple._1
          val valTuple = tuple._2
          val dailyIter = valTuple._1
          val totalIter = valTuple._2
          var tags = ""
          if (dailyIter.isEmpty && totalIter.nonEmpty) {
            // Only historical data: drop install records that have expired.
            val installArray = new JsonArray
            totalIter.foreach(str => {
              GsonUtil.String2JsonArray(str).foreach(element => {
                val installDate = getJsonValue(element, "date")
                // A blank date cannot be age-checked, so keep the record.
                if (StringUtils.isNotBlank(installDate)) {
                  if (installDate.compareTo(expireDate) >= 0) {
                    installArray.add(element)
                  }
                } else {
                  installArray.add(element)
                }
              })
            })
            tags = installArray.toString
          } else if (dailyIter.nonEmpty && totalIter.isEmpty) {
            // Only daily data: emit it directly (capped at 1000 entries),
            // clamping any future-dated install date to today.
            val jsonArray = new JsonArray
            dailyIter.foreach(installSet => {
              installSet.foreach(installInfo => {
                var installDate = installInfo.getDate()
                if (installDate.compareTo(date) > 0) {
                  installDate = date
                }
                if (jsonArray.size() < 1000) {
                  val json = new JsonObject
                  json.addProperty("date", installDate)
                  json.addProperty("package_name", installInfo.getPackage_name().replaceAll("[^0-9a-zA-Z\\.\\_]+", ""))
                  jsonArray.add(json)
                }
              })
            })
            tags = jsonArray.toString
          } else if (dailyIter.nonEmpty && totalIter.nonEmpty) {
            // Both sides present: merge by package name, keeping the most
            // recent non-expired install date.
            val map = new JSONObject()
            totalIter.foreach(str => {
              GsonUtil.String2JsonArray(str).foreach(element => {
                val installDate = getJsonValue(element, "date")
                val installPackage = getJsonValue(element, "package_name")
                // Keep only records that have not expired (blank dates kept).
                if (StringUtils.isNotBlank(installDate)) {
                  if (installDate.compareTo(expireDate) >= 0) {
                    map.put(installPackage, installDate)
                  }
                } else {
                  map.put(installPackage, installDate)
                }
              })
            })
            dailyIter.foreach(installSet => {
              installSet.foreach(installInfo => {
                var installDate = installInfo.getDate()
                if (installDate.compareTo(date) > 0) {
                  installDate = date
                }
                val packageName = installInfo.getPackage_name().replaceAll("[^0-9a-zA-Z\\.\\_]+", "")
                // BUG FIX: the original `map.get(packageName).toString` threw
                // a NullPointerException whenever the package appeared in the
                // daily data but not in the historical list (fastjson's get()
                // returns null for missing keys). getString() returns null
                // safely, and isNotBlank(null) is false.
                val tmpDate = map.getString(packageName)
                if (StringUtils.isNotBlank(tmpDate)) {
                  if (tmpDate.compareTo(installDate) < 0) {
                    map.put(packageName, installDate)
                  }
                } else {
                  map.put(packageName, installDate)
                }
              })
            })
            val jsonArray = new JsonArray
            map.entrySet().foreach(entry => {
              // Cap the merged list at 1000 entries, same as the daily-only case.
              if (jsonArray.size() < 1000) {
                val json = new JsonObject
                if (StringUtils.isNotEmpty(entry.getValue.toString)) {
                  json.addProperty("date", entry.getValue.toString)
                }
                json.addProperty("package_name", entry.getKey)
                jsonArray.add(json)
              }
            })
            tags = jsonArray.toString
          }
          s"${key}${DATA_SPLIT}${tags}"
        }).repartition(coalesce).saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
    0
  }

  /**
    * Extracts a string field from a JSON object element.
    *
    * @param element JSON element expected to be an object (may be null)
    * @param key     field name to read
    * @return the field's string value, or "" when the element is null/JSON-null
    *         or the key is absent
    */
  def getJsonValue(element: JsonElement, key: String): String = {
    if (element != null && !element.isJsonNull) {
      val value = element.getAsJsonObject.get(key)
      // BUG FIX: get(key) returns null for a missing key and the original
      // unconditional getAsString() threw a NullPointerException (or an
      // UnsupportedOperationException on a JSON null). Fall back to "" to
      // match this method's existing convention for null elements.
      if (value != null && !value.isJsonNull) {
        value.getAsString
      } else {
        ""
      }
    } else {
      ""
    }
  }

  /**
    * Converts a Row into an Array[String], reading every column as a string.
    *
    * @param row input row (all columns assumed to be string-typed)
    * @return the row's values in column order
    */
  def convertDayRow(row: Row): Array[String] = {
    val buffer = new ArrayBuffer[String]()
    for (i <- 0 until row.size) {
      buffer += row.getString(i)
    }
    buffer.toArray
  }

  /**
    * Parses one partition of the daily result data.
    *
    * @param array iterator over the partition's rows
    * @param date  processing date (yyyy-MM-dd)
    * @return (key, InstallInfo) pairs extracted from the rows
    */
  def processDailyData(array: Iterator[Row], date: String): Iterator[Tuple2[String, InstallInfo]]
}

// case class InstallListVO(key: String, package_name: String, date: String)