package mobvista.dmp.datasource.packagelist

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.functions.{collect_set, udf}
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.net.URI
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._

/**
 * Spark job that merges campaign -> package records from two tab-separated text inputs and
 * writes one gzip-compressed line per campaign id: `campaignId<TAB>mergedValue`.
 *
 * Inputs (columns split on [[dataSplit]]):
 *  - `input_dmp_data_adn`: campaignId, packageName, platform ("1" -> "adr", anything else -> "ios"), appName.
 *    Each record is tagged with the marker "new" in place of a date.
 *  - `input_campaign_adn`: campaignId, platform ("ios"/"adr"), packageName, appName, updateDate.
 *
 * Values are built with `MRUtils.JOINER`; [[mergeValues]] splits them back on [[dataSplit]], so this
 * assumes the joiner uses the same tab separator (NOTE(review): confirm against MRUtils).
 */
class MergeCampaignList extends CommonSparkJob with Serializable {

  val dataSplit = "\t"

  private val ios = "ios"
  private val android = "adr"

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("input_dmp_data_adn", true, "[must] input_dmp_data_adn")
    options.addOption("input_campaign_adn", true, "[must] input_campaign_adn")
    options.addOption("output", true, "[must] output")
    options
  }

  /**
   * Parses the command line, then merges both inputs and writes the result to `output`.
   * Any existing data at `output` is deleted first.
   *
   * @return -1 when required options are missing, 0 after the job completes
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val coalesce = commandLine.getOptionValue("coalesce")
      val today = commandLine.getOptionValue("today")
      val inputDmpDataAdn = commandLine.getOptionValue("input_dmp_data_adn")
      val inputCampaignAdn = commandLine.getOptionValue("input_campaign_adn")
      val output = commandLine.getOptionValue("output")

      val spark = SparkSession.builder()
        .appName("MergeCampaignList")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        import spark.implicits._

        // Resolve the file system from the output path itself rather than a fixed bucket,
        // so outputs in other buckets/schemes do not fail with "Wrong FS".
        val outputPath = new Path(output)
        FileSystem.get(outputPath.toUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

        val mergeValueUdf = udf((values: mutable.WrappedArray[String]) => mergeValues(values, today))

        val dmpDf: DataFrame = spark.sparkContext.textFile(inputDmpDataAdn)
          .flatMap(parseDMPData)
          .toDF("campaignId", "value")
        val campaignDf: DataFrame = spark.sparkContext.textFile(inputCampaignAdn)
          .flatMap(parseCAMPAIGNData)
          .toDF("campaignId", "value")

        // No orderBy before grouping: groupBy reshuffles anyway, and collect_set order is unspecified.
        dmpDf.union(campaignDf)
          .groupBy("campaignId")
          .agg(mergeValueUdf(collect_set("value")))
          .coalesce(coalesce.toInt)
          .rdd
          .map(_.mkString(dataSplit))
          .saveAsTextFile(output, classOf[GzipCodec])
      } finally {
        spark.stop()
      }
      0
    }
  }

  /**
   * Picks the single value emitted for one campaign.
   *
   * Blank entries are ignored. If a remaining entry has "new" as its fourth field, the first such
   * entry encountered is returned with that field replaced by `today`. Otherwise the last non-blank
   * entry is returned unchanged. Because `collect_set` order is unspecified, which entry is "first"
   * or "last" is not deterministic across runs.
   *
   * @return the chosen value, or "" when `values` is null or has no non-blank entry
   */
  private def mergeValues(values: Iterable[String], today: String): String =
    Option(values).fold("") { vs =>
      val nonBlank = vs.filter(v => StringUtils.isNotBlank(v))
      nonBlank.iterator
        .map(_.split(dataSplit, -1))
        // Entries with fewer than four fields cannot carry the marker; skip them instead of throwing.
        .find(fields => fields.length > 3 && fields(3) == "new")
        .map(fields => MRUtils.JOINER.join(fields(0), fields(1), fields(2), today))
        .getOrElse(nonBlank.lastOption.getOrElse(""))
    }

  /**
   * Parses one DMP record (campaignId, packageName, platform, appName).
   *
   * @return a single record tagged "new", or an empty array when the line has fewer than four
   *         fields or the package name is rejected by [[getPackageName]]
   */
  def parseDMPData(record: String): Array[MergeCampaignId] = {
    val splits: Array[String] = StringUtils.splitPreserveAllTokens(record, dataSplit, -1)
    if (splits == null || splits.length < 4) {
      Array.empty[MergeCampaignId]
    } else {
      val campaignId = splits(0)
      val platform = if (splits(2) == "1") android else ios
      val appName = splits(3)
      val packageName = getPackageName(splits(1), platform)
      if (StringUtils.isEmpty(packageName)) Array.empty[MergeCampaignId]
      else Array(MergeCampaignId(campaignId, MRUtils.JOINER.join(platform, packageName, appName, "new")))
    }
  }

  /**
   * Parses one campaign record (campaignId, platform, packageName, appName, updateDate).
   *
   * @return a single record carrying `updateDate`, or an empty array when the line has fewer than
   *         five fields, the platform is not "ios"/"adr", or the package name is rejected
   */
  def parseCAMPAIGNData(record: String): Array[MergeCampaignId] = {
    val splits: Array[String] = StringUtils.splitPreserveAllTokens(record, dataSplit, -1)
    // updateDate is read from index 4, so five fields are required; previously a 4-field line
    // passed a `>= 4` check and then threw ArrayIndexOutOfBoundsException.
    if (splits == null || splits.length < 5) {
      Array.empty[MergeCampaignId]
    } else {
      val campaignId = splits(0)
      val platform = splits(1)
      val appName = splits(3)
      val updateDate = splits(4)
      if (platform != ios && platform != android) {
        Array.empty[MergeCampaignId]
      } else {
        val packageName = getPackageName(splits(2), platform)
        if (StringUtils.isEmpty(packageName)) Array.empty[MergeCampaignId]
        else Array(MergeCampaignId(campaignId, MRUtils.JOINER.join(platform, packageName, appName, updateDate)))
      }
    }
  }

  /**
   * Validates a package name against the `MobvistaConstant` patterns.
   *
   * For platform "adr", a name matching `adrPkgPtn` is truncated at the first '&'. Otherwise,
   * a name matching `iosPkgPtn` or `idIosPkgPtn` is returned unchanged; this check is applied
   * regardless of platform.
   *
   * @return the accepted package name, or "" if no pattern matches
   */
  def getPackageName(packageName: String, platform: String): String =
    if (android == platform && MobvistaConstant.adrPkgPtn.matcher(packageName).matches) {
      packageName.split("&", -1)(0)
    } else if (MobvistaConstant.iosPkgPtn.matcher(packageName).matches ||
      MobvistaConstant.idIosPkgPtn.matcher(packageName).matches) {
      packageName
    } else {
      ""
    }
}

object MergeCampaignList {
  def main(args: Array[String]): Unit = {
    new MergeCampaignList().run(args)
  }
}

/** One (campaign id, merged value) row before aggregation. */
case class MergeCampaignId(campaignId: String, value: String)