package mobvista.dmp.datasource.tracking_3s

import java.net.URI

import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.prd.datasource.util.MRUtils
import org.apache.commons.cli.{BasicParser, HelpFormatter, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that joins a daily 3S tracking install log with a tab-separated campaign file
 * and writes the recognised device ids as gzip-compressed text to `output`.
 *
 * The 3S input is deliberately not filtered for tab-containing lines, to stay consistent with
 * mobvista.dmp.datasource.tracking.mapreduce.TrackingInstallDailyMR (per the original note).
 */
object TrackingInstallDaily {

  /** Upper-case IDFA layout: 8-4-4-4-12 hexadecimal digits. */
  val idfaRegex = "^[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}$"

  /** Options without which the job cannot run (all are marked "[must]" in [[buildOptions]]). */
  private val RequiredOptions =
    Seq("campaign_input_path", "input_path_3s", "output", "date", "coalesce")

  /**
   * Entry point.
   *
   * Options: `campaign_input_path`, `input_path_3s`, `output`, `date`, `coalesce` (an Int).
   *
   * @throws IllegalArgumentException if a required option is missing
   */
  def main(args: Array[String]): Unit = {
    val options = buildOptions()
    val commandLine = new BasicParser().parse(options, args)

    // Fail fast with a readable message instead of an NPE from getOptionValue(...).toInt.
    val missing = RequiredOptions.filterNot(opt => commandLine.hasOption(opt))
    if (missing.nonEmpty) {
      new HelpFormatter().printHelp("TrackingInstallDaily", options)
      throw new IllegalArgumentException(s"missing required options: ${missing.mkString(", ")}")
    }

    val campaignInputPath = commandLine.getOptionValue("campaign_input_path")
    val input3sPath = commandLine.getOptionValue("input_path_3s")
    val output = commandLine.getOptionValue("output")
    val datetime = commandLine.getOptionValue("date")
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"TrackingInstallDaily")
    val sc = spark.sparkContext

    try {
      // Clear previous output; done inside `try` so the context is stopped even if this fails.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      // Campaign rows: exactly three tab-separated columns with a non-empty third column.
      // Column 0 is the lookup key (matched against column 1 of the 3S log), column 1 the
      // campaign platform and column 2 the package name.
      val campaigns: collection.Map[String, (String, String)] = sc.textFile(campaignInputPath)
        .filter { line =>
          line.contains("\t") && {
            val cols: Array[String] = MRUtils.SPLITTER.split(line, -1)
            cols.length == 3 && !StringUtils.isEmpty(cols(2))
          }
        }
        .map { line =>
          val cols = StringUtils.splitPreserveAllTokens(line, "\t", -1)
          (cols(0), (cols(1), cols(2)))
        }
        .collectAsMap()
      val campaignsBC: Broadcast[collection.Map[String, (String, String)]] = sc.broadcast(campaigns)

      val deviceRecords: RDD[String] = sc.textFile(input3sPath)
        .flatMap(line => parseflatMapData(line, campaignsBC, datetime))

      deviceRecords
        .map(record => parseMapData(record, campaignsBC))
        .coalesce(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts device-id records from one comma-separated 3S install line.
   *
   * Lines with fewer than 29 columns, or whose column 1 is not a key of `bmap`, yield nothing.
   * Otherwise each record is joined with `MRUtils.JOINER`, where `COUNTRY` is column 3
   * upper-cased (presumably a country code — not verifiable from here):
   *  - `key, id, COUNTRY, date` for the first of column 10 / column 28 that matches
   *    `CommonMapReduce.didPtn` and is not `CommonMapReduce.allZero`;
   *  - `key, id, COUNTRY, date` when column 10 matches `CommonMapReduce.andriodIdPtn`;
   *  - `key, "imei", imei, COUNTRY, date` when column 11 matches `CommonMapReduce.imeiPtn`.
   *
   * @param line raw 3S log line
   * @param bmap broadcast campaign map (key -> (platform, packageName))
   * @param date date string appended to every record
   * @return zero or more joined records
   */
  def parseflatMapData(line: String, bmap: Broadcast[collection.Map[String, (String, String)]], date: String): Array[String] = {
    val fields: Array[String] = line.split(",", -1)
    // Check the width before touching any column: a line without a comma used to throw
    // ArrayIndexOutOfBoundsException on fields(1) and fail the whole task.
    if (fields.length < 29 || !bmap.value.contains(fields(1))) {
      Array.empty[String]
    } else {
      val key = fields(1)
      val country = fields(3).toUpperCase
      val primaryId = fields(10)
      val fallbackId = fields(28)
      val imei = fields(11)

      def isDeviceId(id: String): Boolean =
        id.matches(CommonMapReduce.didPtn) && !id.equals(CommonMapReduce.allZero)

      val records = new ArrayBuffer[String]()
      if (isDeviceId(primaryId)) {
        records += MRUtils.JOINER.join(key, primaryId, country, date)
      } else if (isDeviceId(fallbackId)) {
        records += MRUtils.JOINER.join(key, fallbackId, country, date)
      }
      if (!primaryId.isEmpty && primaryId.matches(CommonMapReduce.andriodIdPtn)) {
        records += MRUtils.JOINER.join(key, primaryId, country, date)
      }
      if (!imei.isEmpty && imei.matches(CommonMapReduce.imeiPtn)) {
        records += MRUtils.JOINER.join(key, "imei", imei, country, date)
      }
      records.toArray
    }
  }

  /**
   * Converts a record produced by [[parseflatMapData]] into the output layout
   * `deviceId, deviceType, platform, packageName, date, COUNTRY` (joined with `MRUtils.JOINER`).
   *
   * Five-field records are IMEI records and are emitted as type "imei" on platform "android".
   * For the others the device id has `"` and `[` characters removed. The platform comes from
   * [[getPlatform]], and an "ios" package name has a leading "id" stripped. The device type
   * becomes "androidid" whenever the id matches `CommonMapReduce.andriodIdPtn`.
   *
   * @param line record emitted by [[parseflatMapData]]; its key must be present in `bmap`
   * @param bmap broadcast campaign map (key -> (platform, packageName))
   * @throws NoSuchElementException if the record's key is not in `bmap`
   */
  def parseMapData(line: String, bmap: Broadcast[collection.Map[String, (String, String)]]): String = {
    val fields: Array[String] = MRUtils.SPLITTER.split(line, -1)
    val (campaignPlatform, campaignPackage) = bmap.value(fields(0))
    if (fields.length == 5) {
      // key, "imei", imei, country, date
      MRUtils.JOINER.join(fields(2), "imei", "android", campaignPackage, fields(4), fields(3))
    } else {
      // key, deviceId, country, date
      val deviceId = fields(1).replaceAll("\"", "").replaceAll("\\[", "")
      val platform = getPlatform(campaignPlatform, deviceId)
      // App Store ids look like "id123456"; only strip the prefix so bundle ids containing
      // "id" elsewhere (e.g. "com.kidoz.app") are not mangled as String.replace did.
      val packageName = if (platform == "ios") campaignPackage.stripPrefix("id") else campaignPackage
      val deviceType =
        if (deviceId.matches(CommonMapReduce.andriodIdPtn)) "androidid" else getDeviceType(platform)
      MRUtils.JOINER.join(deviceId, deviceType, platform, packageName, fields(3), fields(2))
    }
  }

  /**
   * Normalises the campaign platform to "ios" or "android".
   *
   * "ios"/"android" in any letter case are returned in lower case, so that the case-sensitive
   * checks downstream ([[getDeviceType]], the "ios" package-name handling) see them. Any other
   * value is replaced by a guess from the device id: "ios" if it matches [[idfaRegex]],
   * otherwise "android".
   */
  private def getPlatform(platform: String, deviceId: String): String =
    if ("ios".equalsIgnoreCase(platform)) "ios"
    else if ("android".equalsIgnoreCase(platform)) "android"
    else if (deviceId.matches(idfaRegex)) "ios"
    else "android"

  /** Maps a platform to its advertising-id type; unrecognised platforms give "unknown". */
  private def getDeviceType(platform: String): String = platform match {
    case "ios" => "idfa"
    case "android" | "adr" => "gaid"
    case _ => "unknown"
  }

  /** Declares the command-line options understood by [[main]]. */
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("campaign_input_path", true, "[must] campaign input path")
    options.addOption("input_path_3s", true, "[must] 3s input path")
    options.addOption("output", true, "[must] output path")
    options.addOption("date", true, "[must] date")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}