package mobvista.dmp.datasource.adn_sdk

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.apptag.Constant
import mobvista.dmp.datasource.datatory.ConstantV2
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, _}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
import scala.util.control.NonFatal

/**
 * Author: Liu Kai, 2018-01-18 15:20.
 *
 * Loads raw adn_sdk logs into the etl_adn_sdk_daily table.
 */
object AdnSdkDaily extends Serializable {

  /** Accepted device_id format: 8-4-4-4-12 hexadecimal groups. */
  val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
  /** All-zero device_id; rejected even though it matches didPtn. */
  val allZero = "00000000-0000-0000-0000-000000000000"
  /** Accepted IMEI format: exactly 15 digits. */
  val imeiPtn = "^([0-9]{15})$"
  /** Accepted Android ID format: exactly 16 alphanumeric characters. */
  val andriodIdPtn = "^[a-zA-Z0-9]{16}$"

  /** Campaign id -> package name endpoint used when spark.app.campaign_pkg_url is not set. */
  private val DefaultCampaignPkgUrl =
    "http://setting.rayjump.com/pkg?app_id=24282&sign=489942276c3cf759447d81719c104b95"

  /** sdk_version threshold (string comparison) from which install labels rename the package. */
  private val LabelMinSdkVersion = "MAL_14"

  /** Schema of the per-device rows built by toInputRows. */
  private val InputSchema = StructType(Array(
    StructField("device_id", StringType),
    StructField("device_type", StringType),
    StructField("platform", StringType),
    StructField("data", StringType),
    StructField("model", StringType),
    StructField("country", StringType),
    StructField("sdk_version", StringType),
    StructField("brand", StringType),
    StructField("rid_n", StringType),
    StructField("hb", StringType)
  ))

  /** Schema of the rows built by parseCalData and parseKey2000047Data. */
  private val InstallSchema = StructType(Array(
    StructField("device_id", StringType),
    StructField("device_type", StringType),
    StructField("platform", StringType),
    StructField("model", StringType),
    StructField("country", StringType),
    StructField("sdk_version", StringType),
    StructField("brand", StringType),
    StructField("install_list", StringType),
    StructField("version", IntegerType)
  ))

  /**
   * Job entry point. Parameters are read from the Spark conf:
   *  - spark.app.loadTime: processing day as yyyyMMdd (Hive partition filters and install dates)
   *  - spark.app.input_path: raw log text input
   *  - spark.app.output_path: ORC output directory; deleted, then overwritten
   *  - spark.app.coalesce: number of partitions the input is coalesced to
   *  - spark.app.campaign_pkg_url (optional): campaign id -> package name endpoint
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AdnSdkDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Everything after session creation sits inside try so the session is always stopped,
    // including when a required conf key is missing.
    try {
      val loadTime = spark.conf.get("spark.app.loadTime")
      val input_path = spark.conf.get("spark.app.input_path")
      val output_path = spark.conf.get("spark.app.output_path")
      val coalesce = Integer.parseInt(spark.conf.get("spark.app.coalesce"))
      val sc = spark.sparkContext

      val campaignIdPackageNameMapBC: Broadcast[collection.Map[String, String]] =
        sc.broadcast(loadCampaignPackageMap(spark.conf.get("spark.app.campaign_pkg_url", DefaultCampaignPkgUrl)))

      // Streams each partition through flatMap instead of buffering it whole.
      val filter_rdd: RDD[Row] = sc.textFile(input_path)
        .coalesce(coalesce)
        .filter(line => !MobvistaConstant.String2JSONObject(line).isEmpty)
        .flatMap(line => toInputRows(line))
      filter_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      val rdd_2000047 = filter_rdd.filter(row => dataOf(row).contains("key=2000047"))
      spark.createDataFrame(rdd_2000047, InputSchema)
        .filter("device_id!=''")
        .createOrReplaceTempView("adn_sdk_input_2000047")

      import spark.implicits._

      val sql01 =
        s"""select requestid,campaignid from dwh.ods_adn_trackingnew_request
           | LATERAL VIEW explode(split(extra2,',')) test_tmp_table AS campaignid
           | where concat(yyyy,mm,dd)='${loadTime}' and requestid!='' and campaignid!=''
           | """.stripMargin
      val Requestid_Campaignid_df = spark.sql(sql01).select($"requestid".cast("string"), $"campaignid".cast("string"))
      Requestid_Campaignid_df.createOrReplaceTempView("Requestid_Campaignid")

      // Campaign -> package data used to be read from the MySQL table campaign_list over JDBC. The last
      // task of that read never finished (suspected data skew). The Hive table dwh.ods_adn_campaign_list
      // carries the same data, so it is used instead.
      val sql02 =
        s"""
           |select id,package_name from dwh.ods_adn_campaign_list
        """.stripMargin
      val campaign_list_df = spark.sql(sql02).select($"id".cast("string"), $"package_name".cast("string"))
      campaign_list_df.createOrReplaceTempView("Campaignid_Packagename")

      // hb='0' rows resolve their package via request id -> campaign id -> campaign list;
      // hb='1' rows resolve it from the header-bidding bid log (ext_dsp JSON).
      val Key2000047_rdd: RDD[Row] = spark.sql(
        s"""
           |select t1.device_id,t1.device_type,t1.platform,t1.data,t1.model,t1.country,t1.sdk_version,t1.brand,t3.id,t3.package_name from (select * from adn_sdk_input_2000047 where hb='0') t1
           |join Requestid_Campaignid t2 on t1.rid_n=t2.requestid
           |join Campaignid_Packagename t3 on t2.campaignid=t3.id
           |union
           |select t1.device_id,t1.device_type,t1.platform,t1.data,t1.model,t1.country,t1.sdk_version,t1.brand,t2.campaign_id,t2.package_name from (select * from adn_sdk_input_2000047 where hb='1') t1
           |join (select get_json_object(ext_dsp, '$$.bd') package_name,bidid rid_n,get_json_object(ext_dsp, '$$.cid') campaign_id from dwh.ods_adn_hb_v1_bid where concat(yyyy,mm,dd)='${loadTime}') t2 on t1.rid_n=t2.rid_n
           |""".stripMargin
      ).rdd.map(contents => parseKey2000047Data(contents, loadTime))

      val cal_rdd = filter_rdd
        .filter { row =>
          val data = dataOf(row)
          data.contains("key=2000079") || (data.contains("key=2000041") && data.contains("iex=1"))
        }
        .map(contents => parseCalData(contents, loadTime, campaignIdPackageNameMapBC))

      spark.createDataFrame(cal_rdd.union(Key2000047_rdd), InstallSchema)
        .filter("device_id!=''")
        .createOrReplaceTempView("adn_sdk_tab")

      spark.udf.register("merge", merge _)
      // Deduplicate: one row per (device_id, device_type, platform, version) with merged install lists.
      val install_re = spark.sql("select device_id,device_type,platform,max(model) model,max(brand) brand,max(country) country,max(sdk_version) sdk_version,merge(collect_set(install_list)) install_list, version from adn_sdk_tab group by device_id,device_type,platform,version")

      // Resolve the FileSystem from the output path itself so any bucket/scheme is deleted correctly.
      val outputDir = new Path(output_path)
      outputDir.getFileSystem(sc.hadoopConfiguration).delete(outputDir, true)
      install_re.repartition(100).write.format("orc").option("orc.compress", "zlib").mode("overwrite").save(output_path)

      filter_rdd.unpersist()
    } finally {
      spark.stop()
    }
  }

  /**
   * Fetches the campaign id -> package name map through AdnSdkTools and exposes it as a Scala map.
   * Fails fast when nothing is returned. Otherwise a null would only surface later as an NPE on executors.
   */
  private def loadCampaignPackageMap(url: String): collection.Map[String, String] = {
    import scala.collection.JavaConverters._
    val javaMap: java.util.Map[String, String] = AdnSdkTools.getCampaignidPackagenameMap(url)
    require(javaMap != null, "campaign id -> package name map could not be loaded")
    javaMap.asScala
  }

  /**
   * Expands one raw log line into up to three rows shaped like InputSchema:
   *  - one keyed by IMEI (when it matches imeiPtn);
   *  - one keyed by Android ID (when it matches andriodIdPtn);
   *  - one keyed by device_id (when it matches didPtn and is not allZero).
   */
  private def toInputRows(line: String): Seq[Row] = {
    val etl_json = AdnSdkTools.getEtlJsonV2(line)
    val device_id = etl_json.getString("device_id")
    val device_type = etl_json.get("device_type")
    val platform = etl_json.get("platform")
    val data = etl_json.get("data")
    val model = etl_json.get("model")
    val brand = etl_json.get("brand")
    val country = etl_json.get("country")
    val sdk_version = etl_json.get("sdk_version")
    val imei = etl_json.getString("imei")
    val android_id = etl_json.getString("android_id")
    val rid_n = etl_json.getString("rid_n")
    val hb = etl_json.getString("hb")

    val rows = new ArrayBuffer[Row](3)
    if (StringUtils.isNotBlank(imei) && imei.matches(imeiPtn)) {
      rows += Row(imei, "imei", "android", data, model, country, sdk_version, brand, rid_n, hb)
    }
    if (StringUtils.isNotBlank(android_id) && android_id.matches(andriodIdPtn)) {
      rows += Row(android_id, "androidid", "android", data, model, country, sdk_version, brand, rid_n, hb)
    }
    if (StringUtils.isNotBlank(device_id) && device_id.matches(didPtn) && !allZero.equals(device_id)) {
      rows += Row(device_id, device_type, platform, data, model, country, sdk_version, brand, rid_n, hb)
    }
    rows
  }

  /** The `data` column (index 3) of an input row, or "" when it is null, so substring checks cannot NPE. */
  private def dataOf(row: Row): String = Option(row.getString(3)).getOrElse("")

  /**
   * Spark UDF registered as `merge`: concatenates several JSON-array strings into a single JSON array string.
   * Entries that are null, empty or not parseable as JSON arrays contribute nothing.
   */
  def merge(installList: mutable.WrappedArray[String]): String = {
    val installJSONArray = new JSONArray()
    installList.foreach(install => installJSONArray.fluentAddAll(String2JSONArray(install)))
    installJSONArray.toJSONString
  }

  /** Parses `str` as a JSON array; returns a new empty JSONArray for null/empty input, parse failures or empty arrays. */
  def String2JSONArray(str: String): JSONArray =
    if (StringUtils.isEmpty(str)) {
      new JSONArray()
    } else {
      try {
        val element = JSON.parseArray(str)
        if (element != null && !element.isEmpty) element else new JSONArray()
      } catch {
        case NonFatal(_) => new JSONArray()
      }
    }

  /**
   * Builds an install row (InstallSchema) from the `cal=` parameter embedded in the row's `data` field.
   *
   * With key=2000079, each cal entry's `campaignid` is mapped to a package name through the broadcast map.
   * Entries without a campaign id, or with an unmapped one, are skipped. The row gets version 1
   * (adn_sdk_v2 partition).
   *
   * Otherwise, entries with iex == 1 are kept, using their own `packageName`, and the row gets version 0
   * (default adn_sdk partition).
   *
   * A malformed entry is skipped on its own. A malformed cal payload yields an empty install list.
   *
   * @param row                 input row shaped like InputSchema
   * @param log_day             processing day as yyyyMMdd
   * @param stringToStringMapBC broadcast campaign id -> package name map
   */
  def parseCalData(row: Row, log_day: String, stringToStringMapBC: Broadcast[collection.Map[String, String]]): Row = {
    val data = row.getString(3)
    val sdk_version = row.getString(6)
    val package_array = new JSONArray()
    var version = 0
    try {
      val calStart = if (data == null) -1 else data.indexOf("cal=")
      if (calStart > -1) {
        val cal = data.substring(calStart + 4).split("&")(0)
        val receive_day = toReceiveDay(log_day)
        val install_array = JSON.parseArray(AdnSdkTools.evaluate(cal))
        if (install_array != null) {
          if (data.contains("key=2000079")) {
            // Decided before iterating so a bad entry cannot demote the row to the default partition.
            version = 1
            val campaignPkgs = stringToStringMapBC.value
            forEachEntry(install_array) { entry =>
              campaignInstall(entry, log_day, receive_day, sdk_version, campaignPkgs)
                .foreach(json => package_array.add(json))
            }
          } else {
            forEachEntry(install_array) { entry =>
              iexInstall(entry, log_day, receive_day, sdk_version).foreach(json => package_array.add(json))
            }
          }
        }
      }
    } catch {
      case _: com.alibaba.fastjson.JSONException => // unparseable cal payload: emit the row with what was collected
      case NonFatal(e) => e.printStackTrace()
    }
    Row(row(0), row(1), row(2), row(4), row(5), row(6), row(7), package_array.toString, version)
  }

  /** Calls `body` on every element of `entries` as a JSONObject; a failure on one element skips only that element. */
  private def forEachEntry(entries: JSONArray)(body: JSONObject => Unit): Unit =
    (0 until entries.size()).foreach { i =>
      try {
        Option(entries.getJSONObject(i)).foreach(body)
      } catch {
        case _: com.alibaba.fastjson.JSONException => // malformed entry: skip it
        case NonFatal(e) => e.printStackTrace()
      }
    }

  /** key=2000079 entry: resolves the package through the campaign map; None when the campaign id is missing or unmapped. */
  private def campaignInstall(entry: JSONObject, log_day: String, receive_day: String, sdk_version: String,
                              campaignPkgs: collection.Map[String, String]): Option[JSONObject] =
    for {
      campaignid <- Option(entry.get("campaignid")).map(_.toString)
      package_name <- campaignPkgs.get(campaignid)
    } yield {
      val label = stringField(entry, "label")
      installJson(installDate(entry, log_day, receive_day), labeledPackageName(package_name, label, sdk_version),
        label, campaignid, stringField(entry, "vc"))
    }

  /** Non-2000079 entry: kept only when iex == 1 and a packageName is present. */
  private def iexInstall(entry: JSONObject, log_day: String, receive_day: String, sdk_version: String): Option[JSONObject] =
    Option(entry.get("packageName")).map(_.toString).flatMap { package_name =>
      val iex: Int = Option(entry.getInteger("iex")).fold(0)(_.intValue)
      if (iex == 1) {
        val label = stringField(entry, "label")
        Some(installJson(installDate(entry, log_day, receive_day), labeledPackageName(package_name, label, sdk_version),
          label, stringField(entry, "campaignid"), stringField(entry, "vc")))
      } else {
        None
      }
    }

  /**
   * Builds the install row for one key=2000047 join result: t1.device_id..t1.brand at indices 0-7,
   * campaign id at index 8, package name at index 9. These events carry no install label, and the row
   * is always version 1 (adn_sdk_v2 partition).
   */
  def parseKey2000047Data(row: Row, log_day: String): Row = {
    val sdk_version = row.getString(6)
    val campaignid = row.getString(8)
    val package_name = row.getString(9)
    val package_array = new JSONArray()
    try {
      val label = ""
      package_array.add(installJson(toReceiveDay(log_day), labeledPackageName(package_name, label, sdk_version),
        label, campaignid, ""))
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }
    Row(row(0), row(1), row(2), row(4), row(5), row(6), row(7), package_array.toString, 1)
  }

  /** Formats a yyyyMMdd day as yyyy-MM-dd (throws StringIndexOutOfBoundsException when shorter than 8 chars). */
  private def toReceiveDay(log_day: String): String =
    s"${log_day.substring(0, 4)}-${log_day.substring(4, 6)}-${log_day.substring(6, 8)}"

  /**
   * Install date for a cal entry: its `ts` (epoch millis), formatted yyyy-MM-dd in the JVM default time zone.
   * Falls back to `receive_day` when `ts` is missing, has at most 4 characters, or its year is more than
   * two years before the log year.
   */
  private def installDate(entry: JSONObject, log_day: String, receive_day: String): String = {
    val tsText = entry.getString("ts")
    if (tsText != null && tsText.length > 4) {
      val ts: Long = entry.getLong("ts")
      // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
      val ins_day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
      val insYear = Integer.parseInt(ins_day.substring(0, 4))
      val logYear = Integer.parseInt(log_day.substring(0, 4))
      if (logYear - insYear > 2) receive_day else ins_day
    } else {
      receive_day
    }
  }

  /**
   * Appends ".notinstall" (label "ni") or ".delete" (label "if"), case-insensitively, when the label is
   * non-blank and sdk_version >= LabelMinSdkVersion.
   * NOTE(review): the comparison is lexicographic, so e.g. "MAL_9.x" also passes; confirm whether a numeric
   * version comparison was intended.
   */
  private def labeledPackageName(package_name: String, label: String, sdk_version: String): String =
    if (StringUtils.isNotBlank(label) && sdk_version != null && sdk_version >= LabelMinSdkVersion) {
      if ("ni".equalsIgnoreCase(label)) package_name + ".notinstall"
      else if ("if".equalsIgnoreCase(label)) package_name + ".delete"
      else package_name
    } else {
      package_name
    }

  /** Builds the JSON object stored for one install entry. */
  private def installJson(date: String, package_name: String, label: String, campaignid: String, vc: String): JSONObject = {
    val install_json = new JSONObject()
    install_json.put("date", date)
    install_json.put("package_name", package_name)
    install_json.put("label", label)
    install_json.put("campaignid", campaignid)
    install_json.put("vc", vc)
    install_json
  }

  /** `entry.get(key).toString`, or "" when the key is absent or null. */
  private def stringField(entry: JSONObject, key: String): String =
    Option(entry.get(key)).map(_.toString).getOrElse("")
}

case class campaignId_packageName(campaignId: String, packageName: String)