package mobvista.dmp.datasource.mpsdk

import java.net.{URI, URLDecoder}
import java.util

import mobvista.dmp.common.{CommonSparkJob, UrlParser}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.commons.net.util.Base64
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

/**
 * Parses MPSDK request logs into ORC.
 *
 *  1. All MPSDK data is treated as Android-platform data.
 *  2. Only log lines containing `lfq` are read, and only rows whose decoded `lfq`
 *     (package list) starts with `[` are written.
 *  3. `adid` carries the GAID and `mc` carries the Android ID, obfuscated by a character
 *     substitution over the Base64 alphabet followed by Base64 encoding (see [[decodeMC]]).
 *     When `adid` is not a UUID-shaped GAID, the Android ID is looked up in the
 *     gaid/android_id mapping data (latest `dmp_time` per Android ID wins) to recover a GAID.
 *     Rows for which no UUID-shaped GAID can be obtained are discarded.
 */
class ParseMPSDKDaily extends CommonSparkJob with Serializable {

  /** Field separator of the raw MPSDK log lines. */
  val dataSplit = " "
  /** Charset used for URL-decoding parameters and for the decoded `mc` / `lfq` bytes. */
  val encode = "UTF-8"
  /** Dummy scheme/host prepended so the raw request field can be parsed as a URL. */
  val httpPrefix = "http://test.com"

  /**
   * Job entry point.
   *
   * @param args command-line arguments, see [[buildOptions]]
   * @return 0 on success, 1 when a mandatory option is missing
   */
  def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      1
    } else {
      printOptions(commandLine)

      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val mappingPath = commandLine.getOptionValue("mappingPath")
      // Both options are declared optional; when absent keep Spark's defaults
      // instead of failing on null.toInt.
      val parallelism = Option(commandLine.getOptionValue("parallelism")).map(_.toInt)
      val coalesce = Option(commandLine.getOptionValue("coalesce")).map(_.toInt)

      // Substitution table used to undo the `mc` / `lfq` obfuscation.
      val dict = buildDict()

      val spark = SparkSession.builder()
        .appName("ParseMPSDK")
        .config("spark.rdd.compress", "true")
        .config("spark.speculation", "true")
        .config("spark.speculation.quantile", "0.9")
        .config("spark.speculation.multiplier", "1")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .getOrCreate()

      try {
        // "parallelism" is documented as the shuffle parallelism; apply it to the
        // SQL shuffles (window, join, union) below.
        parallelism.foreach(p => spark.conf.set("spark.sql.shuffle.partitions", p.toString))
        registerDailyView(spark, input, dict)
        registerMappingView(spark, mappingPath)
        writeOutput(spark, output, coalesce)
      } finally {
        spark.stop()
      }
      0
    }
  }

  /** Parses the raw MPSDK logs at `input` and registers them as temp view `tmp_sdk_daily`. */
  private def registerDailyView(spark: SparkSession, input: String,
                                dict: util.Map[Character, Character]): Unit = {
    import spark.implicits._
    spark.sparkContext.textFile(input)
      .filter(_.contains("lfq"))
      .map(splitFun(_, dataSplit))
      // Field 6 is the request path/query string; skip truncated lines instead of failing the job.
      .filter(_.length > 6)
      .map(fields => parseData(fields(6), dict))
      .map(v => MpSDKVO(v(0), v(1), v(2), v(3), v(4), v(5), v(6)))
      .toDF()
      .createOrReplaceTempView("tmp_sdk_daily")
  }

  /**
   * Registers the gaid/android_id mapping records at `mappingPath` as temp view `tmp_mapping`
   * with columns (gaid, android_id, dmp_time). Records without a valid gaid or android_id are
   * skipped instead of failing the job.
   */
  private def registerMappingView(spark: SparkSession, mappingPath: String): Unit = {
    import spark.implicits._
    spark.sparkContext.textFile(mappingPath)
      .flatMap(record => parseMappingRecord(record).toList)
      .toDF("gaid", "android_id", "dmp_time")
      .createOrReplaceTempView("tmp_mapping")
  }

  /** Resolves GAIDs via the mapping and writes the result to `output` as zlib-compressed ORC. */
  private def writeOutput(spark: SparkSession, output: String, coalesce: Option[Int]): Unit = {
    spark.sql(ParseMPSDKDaily.LatestMappingSql).createOrReplaceTempView("tmp_gaid_aid_mapping")

    val outputPath = new Path(output)
    // Resolve the file system from the output path itself rather than a hard-coded bucket,
    // so output locations outside s3://mob-emr-test are cleared on the right file system.
    FileSystem.get(outputPath.toUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

    val result = spark.sql(ParseMPSDKDaily.OutputSql)
    coalesce.fold(result)(n => result.repartition(n))
      .write
      .mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc(output)
  }

  /**
   * Builds the decode table mapping each character of the cipher alphabet to the character
   * at the same position in the standard Base64 alphabet (including the `=` padding).
   *
   * @return cipher character -> Base64 character
   */
  def buildDict(): util.HashMap[Character, Character] = {
    val word = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
    val secret = "vSoajc7dRzpWifGyNxZnV5k+DHLYhJ46lt0U3QrgEuq8sw/XMeBAT2Fb9P1OIKmC="
    val dict = new util.HashMap[Character, Character]
    secret.zip(word).foreach { case (cipherChar, base64Char) =>
      dict.put(cipherChar, base64Char)
    }
    dict
  }

  /**
   * Decodes an obfuscated value (`mc` or `lfq`): every character is mapped through `dict`
   * back to the standard Base64 alphabet, then the result is Base64-decoded and read as
   * [[encode]] (UTF-8).
   *
   * @param mc   obfuscated value
   * @param dict decode table produced by [[buildDict]]
   * @return the decoded string
   * @throws IllegalArgumentException if `dict` is null or `mc` contains a character not in `dict`
   */
  def decodeMC(mc: String, dict: util.Map[Character, Character]): String = {
    if (dict == null) {
      throw new IllegalArgumentException(s"dict = null, mc = $mc")
    }
    val base64 = mc.map { c =>
      val plain = dict.get(c)
      if (plain == null) {
        throw new IllegalArgumentException(s"character '$c' is not in the decode dictionary, mc = $mc")
      }
      plain.charValue()
    }
    new String(Base64.decodeBase64(base64), encode)
  }

  /**
   * Checks that a mapping record has a usable gaid and android_id.
   *
   * @param record one JSON line of the mapping data
   * @return true when both fields are present, non-empty and not all zeros
   */
  def filterMappingData(record: String): Boolean = {
    val json = GsonUtil.String2JsonObject(record)
    val gaidElement = json.get("gaid")
    val adrIdElement = json.get("android_id")
    gaidElement != null && adrIdElement != null &&
      isValidId(gaidElement.getAsString) && isValidId(adrIdElement.getAsString)
  }

  /**
   * Parses one mapping JSON line into (gaid, android_id, dmp_time), applying the same
   * validity rules as [[filterMappingData]] while parsing the JSON only once.
   * A missing `dmp_time` becomes null.
   */
  private def parseMappingRecord(record: String): Option[(String, String, String)] = {
    val json = GsonUtil.String2JsonObject(record)
    def field(key: String): Option[String] = Option(json.get(key)).map(_.getAsString)
    for {
      gaid <- field("gaid").filter(isValidId)
      adrId <- field("android_id").filter(isValidId)
    } yield (gaid, adrId, field("dmp_time").orNull)
  }

  /** An id is usable when it is non-empty and not made only of '0' characters. */
  private def isValidId(id: String): Boolean =
    StringUtils.isNotEmpty(id) && !id.matches("^0+$")

  /**
   * Extracts the MPSDK URL parameters from one request field.
   *
   * Values containing `%` are URL-decoded; `mc` and `lfq` are additionally passed through
   * [[decodeMC]]. A missing parameter, or one that fails to decode, yields "".
   *
   * @param record request path/query string of the log line
   * @param dict   decode table produced by [[buildDict]]
   * @return seven values in the order adid, mc, appid, brand, model, lfq, app_pname
   */
  def parseData(record: String, dict: util.Map[Character, Character]): Array[String] = {
    val urlParser = new UrlParser
    val url = s"$httpPrefix/$record"
    ParseMPSDKDaily.UrlParams.map { param =>
      val rawValue = urlParser.evaluate(url, QUERY, param)
      if (StringUtils.isEmpty(rawValue)) {
        ""
      } else {
        try {
          val value = if (rawValue.contains("%")) URLDecoder.decode(rawValue, encode) else rawValue
          if (ParseMPSDKDaily.ObfuscatedParams.contains(param)) decodeMC(value, dict) else value
        } catch {
          // Best effort: a malformed value blanks this field only, not the whole record.
          case NonFatal(_) => ""
        }
      }
    }
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input path")
    options.addOption("mappingPath", true, "[must] gaid and androidId mapping data path")
    options.addOption("output", true, "[must] output path")
    options.addOption("parallelism", true, "parallelism of shuffle operation")
    options.addOption("coalesce", true, "number of output files")
    options
  }
}

/**
 * One parsed MPSDK record: `device_id` comes from `adid`, `android_id` from decoded `mc`,
 * and `package_list` from decoded `lfq`.
 */
case class MpSDKVO(device_id: String, android_id: String, appid: String, brand: String, model: String,
                   package_list: String, app_pname: String)

object ParseMPSDKDaily {

  /** URL parameters extracted by `parseData`, in [[MpSDKVO]] field order. */
  private val UrlParams: Array[String] = Array("adid", "mc", "appid", "brand", "model", "lfq", "app_pname")

  /** Parameters whose values are obfuscated and must go through `decodeMC`. */
  private val ObfuscatedParams: Set[String] = Set("mc", "lfq")

  /** Latest gaid per android_id from `tmp_mapping` (by `dmp_time` descending). */
  private val LatestMappingSql: String =
    """
      |select t.gaid, t.android_id
      |from (
      |  select t.*, row_number() over(partition by t.android_id order by t.dmp_time desc) as rk
      |  from tmp_mapping t
      |  where t.gaid <> '0' and t.android_id <> '0'
      |) t
      |where t.rk = 1
    """.stripMargin

  /**
   * Rows whose device_id is not UUID-shaped take the gaid mapped from their android_id.
   * UUID-shaped rows are kept as-is. Only rows with a UUID-shaped device_id and a package
   * list starting with '[' are emitted.
   */
  private val OutputSql: String =
    """
      |select *
      |from (
      |  select a.gaid as device_id, b.android_id, b.appid, b.brand, b.model, b.package_list, b.app_pname
      |  from tmp_gaid_aid_mapping a
      |  join (
      |    select t.device_id, t.android_id, t.appid, t.brand, t.model, t.package_list, t.app_pname
      |    from tmp_sdk_daily t
      |    where t.device_id not rlike '^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$'
      |      and t.android_id is not null and t.android_id <> ''
      |      and t.package_list like '[%'
      |  ) b on a.android_id=b.android_id
      |
      |  union all
      |
      |  select t.device_id, t.android_id, t.appid, t.brand, t.model, t.package_list, t.app_pname
      |  from tmp_sdk_daily t
      |  where t.device_id rlike '^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$'
      |    and t.package_list like '[%'
      |) t
      |where t.device_id rlike '^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$'
    """.stripMargin

  def main(args: Array[String]): Unit = {
    val exitCode = new ParseMPSDKDaily().run(args)
    // Surface usage errors to the scheduler instead of silently exiting with 0.
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}