package mobvista.dmp.datasource.adn_request_other

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

/**
  * Daily ETL over the "adn request other" log.
  *
  * Reads text lines from `input`, splits each line with `splitFun`, keeps the rows accepted by
  * [[filterData]], emits one (device_id, device_type, platform, package_name) row per usable
  * device identifier, de-duplicates the rows and writes them to `output` as ORC.
  */
class EtlAdnRequestOtherDaily extends CommonSparkJob with Serializable {

  /**
    * Runs the job.
    *
    * @param args command-line arguments; the values of the `input` and `output` options are used
    * @return 1 when the mandatory options are missing, 0 otherwise
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      1
    } else {
      printOptions(commandLine)

      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")

      val spark = SparkSession
        .builder()
        .appName("EtlAdnRequestOtherDaily")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()

      // Required for the RDD-to-DataFrame conversion (`toDF`) below.
      import spark.implicits._

      val sc = spark.sparkContext
      try {
        // sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](input)
        sc.textFile(input)
          .map(splitFun)
          .filter(filterData)
          .flatMap(deviceRows)
          .toDF("device_id", "device_type", "platform", "package_name")
          .createOrReplaceTempView("t_daily")

        // Grouping by every selected column de-duplicates the rows.
        val sql =
          """
            |select t.device_id, t.device_type, t.platform, t.package_name
            |from t_daily t
            |where length(t.package_name) >= 4
            |group by t.device_id, t.device_type, t.platform, t.package_name
          """.stripMargin

        spark.sql(sql)
          .write
          .option("orc.compress", "zlib")
          .orc(output)
      } finally {
        spark.stop()
      }
      0
    }
  }

  /**
    * Expands one already-filtered record into its output rows.
    *
    * For the "android" platform one row is produced for each non-blank identifier among
    * gaid, imei and android id; for any other platform a single idfa row is produced.
    *
    * @param array split columns of a record that passed [[filterData]]
    * @return tuples of (device_id, device_type, platform, package_name)
    */
  private def deviceRows(array: Array[String]): Seq[(String, String, String, String)] = {
    val platform = array(4)
    val imei = array(12)
    val androidId = array(14)
    val gaid = array(15)
    val idfa = array(16)
    val extPackageName = array(19)

    if ("android".equals(platform)) {
      Seq((gaid, "gaid"), (imei, "imei"), (androidId, "androidid")).collect {
        case (deviceId, deviceType) if StringUtils.isNotBlank(deviceId) =>
          (deviceId, deviceType, platform, extPackageName)
      }
    } else {
      Seq((idfa, "idfa", platform, extPackageName))
    }
  }

  /**
    * Decides whether a split record is kept.
    *
    * @param array split columns of one input line
    * @return true when the record has more than 20 columns, at least one of gaid/idfa/imei is
    *         non-blank, the identifiers pass the platform-specific checks ("android" or "ios"
    *         only) and the package name is non-blank; false otherwise
    */
  def filterData(array: Array[String]): Boolean = {
    // NOTE(review): the highest index read is 19, yet rows with exactly 20 columns are
    // rejected here — confirm this is intended.
    if (array.length <= 20) {
      false
    } else {
      val platform = array(4)
      val imei = array(12)
      val gaid = array(15)
      val idfa = array(16)
      val extPackageName = array(19)

      // Drop records where gaid, idfa and imei are all blank.
      val hasAnyId =
        StringUtils.isNotBlank(gaid) || StringUtils.isNotBlank(idfa) || StringUtils.isNotBlank(imei)

      // Drop records whose identifiers are malformed or whose platform is unknown.
      // Declared as a `def` so it is only evaluated when `hasAnyId` holds.
      // NOTE(review): android rows must pass both the gaid and the imei check; whether a blank
      // imei passes depends on `imeiPtn`, which is defined outside this file — confirm.
      def idsValid: Boolean = platform match {
        case "android" => gaid.matches(didPtn) && !gaid.equals(allZero) && imei.matches(imeiPtn)
        case "ios" => idfa.matches(didPtn) && !idfa.equals(allZero)
        case _ => false
      }

      // Finally drop records with a blank package name.
      hasAnyId && idsValid && StringUtils.isNotBlank(extPackageName)
    }
  }
}

object EtlAdnRequestOtherDaily {

  /**
    * Entry point: runs the job and exits the JVM with the job's status when it is non-zero, so
    * that a failed run (for example missing options) is visible to the caller.
    *
    * @param args command-line arguments forwarded to the job
    */
  def main(args: Array[String]): Unit = {
    val exitCode = new EtlAdnRequestOtherDaily().run(args)
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}