package mobvista.dmp.test

import java.util

import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

/**
 * Spark job that rewrites `\001`-delimited report lines read from `input` and writes them to
 * `output` (which is deleted first).
 *
 *  - Column 4 (interest), when it holds a JSON array, is flattened into a comma-separated list of
 *    distinct `first->second` pairs taken from each element's `tag` array (keys "1" and "2").
 *  - Column 5 (install list), when it holds a JSON array, is reduced to a comma-separated list of
 *    distinct `package_name` values, filtered by the device type in column 1.
 *  - All other columns are written back unchanged, joined with `DATA_SPLIT`.
 *
 * Required options: `input`, `output`, `parallelism`, `coalesce`.
 */
class ParseReport extends CommonSparkJob with Serializable {

  /** Separator used to join the flattened interest tags / package names. */
  val split = ","

  /** Package names kept for `idfa` devices must be all digits (presumably App Store ids). */
  val iosPkgRegex = "^[0-9]+$"

  /**
   * Parses the command line and runs the job.
   *
   * @return 0 on success, 1 when required options are missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      // Stop here: continuing would fail with an NPE while reading the missing options below.
      printUsage(options)
      1
    } else {
      printOptions(commandLine)
      execute(
        input = commandLine.getOptionValue("input"),
        output = commandLine.getOptionValue("output"),
        parallelism = commandLine.getOptionValue("parallelism").toInt,
        coalesce = commandLine.getOptionValue("coalesce").toInt)
    }
  }

  /**
   * Creates the Spark session, replaces `output` with the rewritten `input`, and always stops
   * Spark afterwards (including when clearing the old output fails).
   *
   * @param coalesce target partition count for the output; values <= 0 leave partitioning as is
   * @return 0 on success (failures propagate as exceptions)
   */
  private def execute(input: String, output: String, parallelism: Int, coalesce: Int): Int = {
    // NOTE(review): the app name looks copied from another job; kept unchanged in case
    // something keys on it — rename if nothing does.
    val spark = SparkSession
      .builder()
      .appName("IdMappingJob")
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", s"$parallelism")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      // Resolve the file system from the output path itself rather than from the hard-coded
      // s3://mob-emr-test bucket, so an output located elsewhere can also be cleared.
      val outputPath = new Path(output)
      val fs: FileSystem = outputPath.getFileSystem(sc.hadoopConfiguration)
      fs.delete(outputPath, true)

      val rewritten = sc.textFile(input).map(line => rewriteRecord(splitFun(line, "\001")))
      // The `coalesce` option used to be parsed but never applied.
      val toWrite = if (coalesce > 0) rewritten.coalesce(coalesce) else rewritten
      toWrite.saveAsTextFile(output)
      0
    } finally {
      spark.stop()
    }
  }

  /**
   * Rewrites the JSON columns of one split record in place and re-joins it with `DATA_SPLIT`.
   * Columns past the end of a short record are left alone instead of raising
   * ArrayIndexOutOfBoundsException.
   */
  private def rewriteRecord(fields: Array[String]): String = {
    if (fields.length > 4 && isJsonArrayText(fields(4))) {
      fields(4) = flattenInterest(fields(4))
    }
    if (fields.length > 5 && isJsonArrayText(fields(5))) {
      fields(5) = filterInstallList(fields(1), fields(5))
    }
    fields.mkString(DATA_SPLIT)
  }

  /** True when the column is non-null, non-empty and starts with `[`. */
  private def isJsonArrayText(value: String): Boolean =
    StringUtils.isNotEmpty(value) && value.startsWith("[")

  /**
   * Collects distinct `first->second` tag pairs from an interest JSON array.
   * Missing or JSON-null keys "1"/"2" become "". Non-object elements and elements without a
   * `tag` array are skipped.
   */
  private def flattenInterest(json: String): String = {
    val tags = new util.HashSet[String]()
    GsonUtil.String2JsonArray(json).asScala.filter(_.isJsonObject).foreach { element =>
      Option(element.getAsJsonObject.get("tag")).filter(_.isJsonArray).foreach { tagField =>
        tagField.getAsJsonArray.asScala.filter(_.isJsonObject).foreach { tagElement =>
          val tagObj = tagElement.getAsJsonObject

          def text(key: String): String = {
            val value = tagObj.get(key)
            if (value == null || value.isJsonNull) "" else value.getAsString
          }

          tags.add(s"${text("1")}->${text("2")}")
        }
      }
    }
    // java.util.HashSet is kept so the joined order matches the previous output.
    tags.asScala.mkString(split)
  }

  /**
   * Collects distinct package names from an install-list JSON array.
   * `gaid` devices keep every name, `idfa` devices keep only names matching [[iosPkgRegex]], and
   * any other device type yields an empty list. Elements without a usable `package_name` are
   * skipped.
   */
  private def filterInstallList(deviceType: String, json: String): String = {
    val keepAll = "gaid".equals(deviceType)
    val numericOnly = "idfa".equals(deviceType)
    val packages = new util.HashSet[String]()
    GsonUtil.String2JsonArray(json).asScala.filter(_.isJsonObject).foreach { element =>
      Option(element.getAsJsonObject.get("package_name"))
        .filterNot(_.isJsonNull)
        .map(_.getAsString)
        .filter(name => keepAll || (numericOnly && name.matches(iosPkgRegex)))
        .foreach(name => packages.add(name))
    }
    packages.asScala.mkString(split)
  }
}

object ParseReport {

  /** Entry point; exits with the job's status code when it is non-zero (e.g. missing options). */
  def main(args: Array[String]): Unit = {
    val exitCode = new ParseReport().run(args)
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}