package mobvista.dmp.datasource.adn

import java.net.URI
import java.util

import com.google.common.base.Strings
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.adn.mapreduce.GetDevIdUtil
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
  * @package: mobvista.dmp.datasource.adn
  * @author: wangjf
  * @date: 2019-10-29
  * @time: 11:05
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class AdnRequestSdkHour extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("input_dict1", true, "input_dict1")
    options.addOption("input_dict2", true, "input_dict2")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("date", true, "date")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val imeiPtn = "^([0-9]{15,17})$"

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val input = commandLine.getOptionValue("input")
    val input_dict1 = commandLine.getOptionValue("input_dict1")
    val input_dict2 = commandLine.getOptionValue("input_dict2")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      // Clear the output path before writing.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      // Broadcast the appId -> packageName dictionary built from the two dict inputs.
      val bMap = sc.broadcast(sc.textFile(input_dict1).union(sc.textFile(input_dict2)).map(r => {
        val rs = r.split(",")
        (rs(0), rs(1))
      }).collectAsMap())

      val rdd = sc.textFile(input)

      // Emit "key;value" records, one per extracted device id:
      //   key   = deviceId \ deviceType \ platform
      //   value = appId \ model \ osVersion \ "A"
      val kv_rdd = rdd.map(l => {
        val res = new ArrayBuffer[String]()
        val array = MRUtils.SPLITTER.split(l)
        if (array.length >= 54) {
          val gaid = array(42)
          val idfa = array(43)
          val imei = array(35)
          val model = array(16)
          val osVersion = array(14)
          val androidId = array(37)
          val platform = array(13)
          val appId = array(4)
          var extSysid = ""
          if (array.length >= 118) extSysid = array(117)

          // split bug: pad the tail with a placeholder character so trailing
          // empty fields are not dropped
          val value = MRUtils.JOINER.join(appId, model, osVersion, "A")
          var key = ""

          // also extract the in-house sys id
          val sysIdType = GetDevIdUtil.getExtSysId(extSysid)
          if (StringUtils.isNotBlank(sysIdType)) {
            key = MRUtils.JOINER.join(sysIdType, platform.toLowerCase)
            res += key + ";" + value
          }
          val devIdType = GetDevIdUtil.getIdByIdAndPlatform(gaid, idfa, extSysid, platform)
          if (StringUtils.isNotBlank(devIdType)) {
            key = MRUtils.JOINER.join(devIdType, platform.toLowerCase)
            res += key + ";" + value
          }
          if (!imei.isEmpty && imei.matches(imeiPtn) && "android" == platform) {
            key = MRUtils.JOINER.join(imei, "imei", platform.toLowerCase)
            res += key + ";" + value
          }
        }
        res.iterator
      }).coalesce(coalesce.toInt)
      /*
        .map(r => {
          val cur = r.split(";")
          (cur(0), cur(1))
        })
      */

      import spark.implicits._
      val df = kv_rdd
        .flatMap(l => l)
        .mapPartitions(mapFun)
        // Group all values per device key.
        .combineByKey(
          (v: String) => Iterable(v),
          (c: Iterable[String], v: String) => c ++ Seq(v),
          (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
        ).filter(r => {
          val s = MRUtils.SPLITTER.split(r._1)
          s.length == 3 && StringUtils.isNotBlank(s(0)) && StringUtils.isNotBlank(s(1)) && StringUtils.isNotBlank(s(2))
        }).map(r => {
          val res = new ArrayBuffer[AdnRequestSdkClass]()
          val key = r._1
          val keys = MRUtils.SPLITTER.split(key)
          val device_id = keys(0)
          val device_type = keys(1)
          val platform = keys(2)
          var model = ""
          var osVersion = ""
          var pkgName = ""
          val values = r._2.iterator
          while (values.hasNext) {
            val vs = MRUtils.SPLITTER.split(values.next())
            if (StringUtils.isBlank(model) && vs.length >= 3) {
              model = vs(1)
            }
            if (StringUtils.isBlank(osVersion) && vs.length >= 3) {
              osVersion = vs(2)
            }
            // Map appId to its package name via the broadcast dictionary.
            pkgName = Strings.nullToEmpty(bMap.value.getOrElse(vs(0), ""))
            res += AdnRequestSdkClass(device_id, device_type, platform, vs(0), pkgName, model, osVersion)
          }
          res
        }).flatMap(r => r)
        .toDF

      df.dropDuplicates
        .coalesce(coalesce.toInt / 10)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  // Split each "key;value" record into a (key, value) pair.
  def mapFun(iter: Iterator[String]): Iterator[(String, String)] = {
    import scala.collection.JavaConverters._
    val res = new util.ArrayList[(String, String)]()
    while (iter.hasNext) {
      val cur = iter.next().split(";")
      res.add((cur(0), cur(1)))
    }
    res.asScala.iterator
  }
}

object AdnRequestSdkHour {
  def main(args: Array[String]): Unit = {
    new AdnRequestSdkHour().run(args)
  }
}