package mobvista.dmp.datasource.clever

import java.net.{URI, URLDecoder}
import java.util

import mobvista.dmp.common.{CommonSparkJob, UrlParser}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.commons.net.util.Base64
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

/**
  * Spark job that extracts "clever" package lists from request log lines.
  *
  * Pipeline (see [[run]]):
  *  1. read text records from `input` and keep those containing `/openapi/clever?p=`;
  *  2. take the 7th space-separated token as the request URL and read its `p` query parameter;
  *  3. decode `p` (character substitution via `pDict`, then Base64) into a query string;
  *  4. keep records that carry a non-empty `gaid` and a `clever` value that decodes
  *     (substitution via `cleverDict`, then Base64) to something shaped like a JSON array;
  *  5. emit one [[CleverVO]] per array element, de-duplicate, and write ORC to `output`.
  */
class ParseCleverDaily extends CommonSparkJob with Serializable {

  override val DATA_SPLIT = " "
  val FIELD_SPLIT = "\t"
  val urlParser = new UrlParser

  // `word` is the plain alphabet and `secret` the substituted one. Both stay mutable members
  // (as in the original) and end up holding the second pair, the one used for `cleverDict`.
  var word = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=="
  var secret = "uVUoXc3pCnDFvb8lNJj9ZHEia7QYrfSmROkGKA0ehIdtzB64Mq2gP5syTL1wWx+/=="

  /** Substitution table (secret char -> plain char) applied to the `p` parameter. */
  val pDict: util.HashMap[Character, Character] = buildDict(secret, word)

  word = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=="
  secret = "vSoajc7dRzpWifGyNxZnV5k+DHLYhJ46lt0U3QrgEuq8sw/XMeBAT2Fb9P1OIKmC=="

  /** Substitution table (secret char -> plain char) applied to the `clever` parameter. */
  val cleverDict: util.HashMap[Character, Character] = buildDict(secret, word)

  /** Builds a positional mapping `cipher(i) -> plain(i)` for every index of `plain`. */
  private def buildDict(cipher: String, plain: String): util.HashMap[Character, Character] = {
    val dict = new util.HashMap[Character, Character]
    for (i <- 0 until plain.length) {
      dict.put(cipher.charAt(i), plain.charAt(i))
    }
    dict
  }

  /**
    * Parses the command line, runs the extraction pipeline and writes the result as ORC.
    *
    * Reads the options `input`, `output`, `parallelism` and `coalesce`. Anything already
    * present at `output` is deleted before the job runs.
    *
    * @param args raw command-line arguments
    * @return 1 when a mandatory option is missing, 0 otherwise
    */
  def run(args: Array[String]): Int = {
    var sc: SparkContext = null
    try {
      val commandLine = commParser.parse(options, args)
      // Options are printed whether or not validation succeeds (check first, then print).
      val optionsOk = checkMustOption(commandLine)
      printOptions(commandLine)
      if (!optionsOk) {
        return 1
      }

      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      val outputPartitions = commandLine.getOptionValue("coalesce").toInt

      val spark = SparkSession.builder()
        .appName("ParseCleverDaily")
        .config("spark.rdd.compress", "true")
        .config("spark.default.parallelism", s"$parallelism")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()

      import spark.implicits._

      sc = spark.sparkContext

      // Remove any previous output so the ORC write below does not fail on an existing path.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      sc.textFile(input)
        .filter(record => record.contains("/openapi/clever?p="))
        .map(record => {
          // Extract the value of the `p` query parameter from the request URL (7th token).
          val splits = StringUtils.splitPreserveAllTokens(record, DATA_SPLIT, -1)
          if (splits.length > 6) {
            urlParser.evaluate(s"$HTTPPREFIX/${splits(6)}", QUERY, "p")
          } else {
            // Too few fields: yield null so the emptiness filter below drops the record
            // instead of failing the whole task with an index-out-of-bounds error.
            null
          }
        })
        .filter(StringUtils.isNotEmpty(_))
        .map(pValue => {
          // Decode the `p` value (substitution + Base64) and splice it into a URL so the
          // individual parameters can be read with the URL parser.
          val pDecode = decodeParams(pValue, pDict)
          s"$HTTPPREFIX/test?${pDecode}"
        })
        .filter(isValidCleverUrl(_))
        .flatMap(parseClever(_))
        .distinct()
        .coalesce(outputPartitions, shuffle = true)
        .toDF()
        .write
        .format("ORC")
        .option("orc.compress", "zlib")
        .save(output)
    } finally {
      if (sc != null) {
        sc.stop()
      }
    }
    0
  }

  /**
    * Record filter used by [[run]]:
    *  1. drops records whose `gaid` or `clever` parameter is null/empty;
    *  2. drops records whose decoded `clever` value is not shaped like a JSON array
    *     (does not start with `[` and end with `]` after trimming), or fails to decode.
    */
  private def isValidCleverUrl(url: String): Boolean = {
    val gaid = urlParser.evaluate(url, QUERY, "gaid")
    val clever = urlParser.evaluate(url, QUERY, "clever")
    if (StringUtils.isNotEmpty(gaid) && StringUtils.isNotEmpty(clever)) {
      try {
        val cleverDecode = decodeParams(URLDecoder.decode(clever, ENCODING), cleverDict).trim
        cleverDecode.startsWith("[") && cleverDecode.endsWith("]")
      } catch {
        case e: Exception =>
          e.printStackTrace()
          false
      }
    } else {
      false
    }
  }

  /**
    * Expands one decoded request URL into one [[CleverVO]] per element of its `clever` array.
    *
    * Each element's `p` field (with all whitespace removed) becomes the package name. If
    * parsing fails part-way, the failure is logged and the elements collected so far are
    * returned.
    *
    * @param url URL whose query string carries `gaid`, `model`, `brand` and `clever`
    * @return the extracted records; possibly empty
    */
  def parseClever(url: String): Array[CleverVO] = {
    val buffer = new ArrayBuffer[CleverVO]()
    val gaid = urlParser.evaluate(url, QUERY, "gaid")
    val model = urlParser.evaluate(url, QUERY, "model")
    val brand = urlParser.evaluate(url, QUERY, "brand")
    val clever = urlParser.evaluate(url, QUERY, "clever")
    val cleverDecode = decodeParams(URLDecoder.decode(clever, ENCODING), cleverDict)
    try {
      val jsonArray = GsonUtil.String2JsonArray(cleverDecode)
      jsonArray.foreach(element => {
        val jsonObject = element.getAsJsonObject
        val packageName = jsonObject.get("p").getAsString.replaceAll("[\\s\r\n]+", "")
        buffer += CleverVO(gaid, "gaid", "android", model, brand, packageName)
      })
    } catch {
      case e: Exception =>
        println("sdfsdfsdfs= " + cleverDecode)
        println("url= " + url)
        e.printStackTrace()
    }
    buffer.toArray
  }

  /**
    * Reverses the character substitution described by `dict` and Base64-decodes the result.
    *
    * Characters without an entry in `dict` are left unchanged. The decoded bytes are turned
    * into a string using the job's `ENCODING` rather than the JVM default charset, so the
    * result does not depend on the configuration of the machine running the task.
    *
    * @param str  substituted Base64 text
    * @param dict mapping from substituted character to plain Base64 character
    * @return the decoded text
    */
  def decodeParams(str: String, dict: util.Map[Character, Character]): String = {
    val array = str.toCharArray
    for (i <- array.indices) {
      val mapped = dict.get(Character.valueOf(array(i)))
      // `get` yields null for characters outside the alphabet; keep those as they are.
      if (mapped != null) {
        array(i) = mapped.charValue()
      }
    }
    new String(Base64.decodeBase64(String.valueOf(array)), ENCODING)
  }
}

/**
  * One (device, package) observation extracted from a clever request.
  *
  * NOTE(review): equality and hashing deliberately look at `device_id` and `device_type`
  * only, so `distinct()` in `ParseCleverDaily.run` keeps a single row per device regardless
  * of `package_name`. Confirm this is the intended de-duplication key.
  */
case class CleverVO(device_id: String, device_type: String, platform: String, model: String, brand: String, package_name: String) {

  /** Hash over `device_id` and `device_type`; a null field contributes 0. */
  override def hashCode(): Int = {
    util.Objects.hashCode(this.device_id) + util.Objects.hashCode(this.device_type)
  }

  /** Two records are equal when their `device_id` and `device_type` match. */
  override def equals(obj: scala.Any): Boolean = obj match {
    case other: CleverVO =>
      other.device_id == this.device_id && other.device_type == this.device_type
    case _ => false
  }
}

object ParseCleverDaily {
  /** Entry point: runs the job with the given command-line arguments. */
  def main(args: Array[String]): Unit = {
    new ParseCleverDaily().run(args)
  }
}