package mobvista.dmp.datasource.dsp

import java.net.URI
import java.util
import java.util.regex.Pattern

import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.mutable

/**
  * Spark job that reads DSP request logs (ORC, read with [[dspSchema]]), keeps the rows
  * accepted by [[filterData]], converts them to `DspReqVO` records and writes one
  * aggregated ORC row per `deviceId` to the `output` path.
  *
  * Command line options: `input`, `output`, `detailOutPath`, `coalesce`.
  */
class DspEtlHour extends CommonSparkJob with Serializable {

  // Positional indexes into the rows read with `dspSchema` (column name in parentheses).
  private val IDFA = 37         // idfa
  private val GAID = 34         // googleadid
  private val PKG_NAME = 20     // appid
  private val PLATFORM = 29     // os
  private val UPDATE_TIME = 0   // time
  private val IP = 26           // deviceip
  private val MAKER = 27        // make
  private val MODEL = 28        // model
  private val OS_VERSION = 30   // osv
  private val COUNTRY_CODE = 33 // countrycode
  private val BIRTHDAY = 39     // yob
  private val GENDER = 40       // gender
  private val EXT_ID = 15       // ext5
  private val JSON_MSG = 6      // body

  private val iosPkgPtn = Pattern.compile("^\\d+$")
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")
  // iOS package written as "id" followed by digits, e.g. "id123456789".
  private val iosIdPkgPtn = Pattern.compile("^id\\d+$")

  private val mapper = new ObjectMapper()

  // Broadcast of lower-cased package_name -> id, filled in `run`.
  // NOTE(review): nothing in this file reads it any more (the lookup in parseMapData is commented out).
  private var packageMap: Broadcast[scala.collection.Map[String, Int]] = null

  /** Options parsed by [[run]]: `detailOutPath`, `input`, `output` and `coalesce`. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("detailOutPath", true, "detailOutPath")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
    * Runs the job: loads the latest `dwh.package_mapping` partition, deletes the existing
    * `output` and `detailOutPath` paths, parses and filters the ORC input and writes the
    * per-device aggregate to `output`.
    *
    * @param args command line arguments, parsed with [[commandOptions]]
    * @return 0 when the job completes; failures propagate as exceptions
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val detailOutput = commandLine.getOptionValue("detailOutPath")

    //  .appName("DspEtlDaily")
    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Pick the most recent partition of dwh.package_mapping (partition strings sort descending).
      val partitionSql =
        """
          |SHOW PARTITIONS dwh.package_mapping
        """.stripMargin
      val partDF = spark.sql(partitionSql)
      val package_dt = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)

      val packageSql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${package_dt}'
        """.stripMargin
      packageMap = spark.sparkContext.broadcast(spark.sql(packageSql).rdd.map(r => {
        (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
      }).collectAsMap())

      //  val regionSet = Set("cn", "tokyo", "virginia")
      //  regionSet.foreach(region => {

      // Remove results of a previous run before writing.
      val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      fs.delete(new Path(output), true)
      fs.delete(new Path(detailOutput), true)

      import spark.implicits._

      //  On cluster, schema must be specified manually.
      val df = spark.read.schema(dspSchema).orc(input)
        .filter(_.size >= 52)
        .filter(filterData _)
        .rdd
        .map(parseMapData)
        .toDF
      //  .persist(StorageLevel.MEMORY_AND_DISK_SER)

      /*
      Disabled detail output; re-enabling it requires reading the `coalesce` option again.

      val mds_df = df.select("deviceId", "deviceType", "platform", "time", "ip", "geoInfo", "longitude", "latitude")
        .toDF("device_id", "device_type", "platform", "req_time", "ip", "geo", "longitude", "latitude")

      mds_df.coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(detailOutput)
      */

      // Merges several JSON-array strings into one de-duplicated JSON-array string.
      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        val segmentSet: util.Set[String] = new util.HashSet[String]()
        // parseMapData emits "" when a request carries no segment; skip those instead of
        // handing an empty string to the JSON parser.
        for (segmentArray <- segmentArrays if StringUtils.isNotBlank(segmentArray)) {
          val jsonArray = GsonUtil.String2JsonArray(segmentArray)
          for (json <- jsonArray) {
            segmentSet.add(json.toString)
          }
        }
        "[" + segmentSet.mkString(",") + "]"
      })

      val agg_df = df.groupBy("deviceId")
        .agg(first("deviceType"),
          first("platform"),
          first("country"),
          first("ip"),
          first("gender"),
          first("birthday"),
          first("maker"),
          first("model"),
          first("osVersion"),
          collect_set("packageName"),
          collect_set("androidId"),
          max("time"),
          mergeUdf(collect_set("segment")).alias("segment_ids")
        ).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker", "model",
        "os_version", "package_list", "androidids", "datetime", "segment_ids")

      agg_df.write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
    * Returns `packageName` without its leading "id" when the platform is iOS and the name
    * has the form `id<digits>`; any other value (including null) is returned unchanged.
    */
  private def stripIosIdPrefix(platform: String, packageName: String): String =
    if ("ios".equalsIgnoreCase(platform) && packageName != null && iosIdPkgPtn.matcher(packageName).matches) {
      packageName.substring(2)
    } else {
      packageName
    }

  /**
    * Converts one raw DSP row into a `DspReqVO`.
    *
    * The device id is the IDFA for platform "ios" and the GAID for "android"; geo and
    * segment values are taken from the JSON request body when it starts with "{".
    * Expects a row already accepted by [[filterData]] (the ext id must have more than
    * five comma-separated parts).
    *
    * @param row row read with [[dspSchema]]
    * @return the parsed record
    */
  def parseMapData(row: Row): DspReqVO = {
    val idfa = row.getString(IDFA)
    val gaid = row.getString(GAID)
    val platform = row.getString(PLATFORM)
    val time = row.getString(UPDATE_TIME)
    val ip = row.getString(IP)
    val maker = row.getString(MAKER)
    val model = row.getString(MODEL)
    val osVersion = row.getString(OS_VERSION)
    val country = row.getString(COUNTRY_CODE)
    val birthday = row.getString(BIRTHDAY)
    val gender = row.getString(GENDER)
    val exitId = row.getString(EXT_ID)
    val jsonMsg = row.getString(JSON_MSG)

    // The previous pattern "^id\\\\d+$" required a literal backslash and therefore never
    // matched, so "id123..." package names were emitted with their prefix.
    val packageName = stripIosIdPrefix(platform, row.getString(PKG_NAME))

    //  val packageId = packageMap.value.getOrElse(packageName.toLowerCase(), 0)
    //  val packageId = packageName

    val (deviceId, deviceType) = platform match {
      case "ios" => (idfa, "idfa")
      case "android" => (gaid, "gaid")
      case _ => ("", "")
    }

    val androidId = splitFun(exitId, ",")(5)

    var geoInfo = ""
    var longitude = ""
    var latitude = ""
    var segment = ""

    // Process jsonMsg and extract the geo attributes.
    if (jsonMsg != null && jsonMsg.startsWith("{")) {
      val json = GsonUtil.String2JsonObject(jsonMsg)
      val element = json.get("device")
      if (element != null && !element.isJsonNull) {
        val geoElement = element.getAsJsonObject.get("geo")
        if (geoElement != null && !geoElement.isJsonNull) {
          geoInfo = geoElement.toString
          val geoJson = geoElement.getAsJsonObject
          val lonElement = geoJson.get("lon")
          if (lonElement != null && !lonElement.isJsonNull) {
            longitude = lonElement.toString
          }
          val latElement = geoJson.get("lat")
          if (latElement != null && !latElement.isJsonNull) {
            latitude = latElement.toString
          }
        }
      }

      // Extract the segment information; the last array-shaped "segment" wins.
      val userElement = json.get("user")
      if (userElement != null && !userElement.isJsonNull) {
        val dataElement = userElement.getAsJsonObject.get("data")
        if (dataElement != null && !dataElement.isJsonNull) {
          dataElement.getAsJsonArray.foreach(dataEle => {
            val segElement = dataEle.getAsJsonObject.get("segment")
            if (segElement != null && !segElement.isJsonNull) {
              val segText = segElement.toString
              if (segText.startsWith("[") && segText.endsWith("]")) {
                segment = segText
              }
            }
          })
        }
      }
    }

    DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
      packageName, androidId, time, geoInfo, longitude, latitude, segment)
  }

  /**
    * Partition-wise variant of [[parseMapData]] that emits records with an empty package
    * name and an empty segment.
    *
    * Rows are converted lazily as the returned iterator is consumed, instead of being
    * buffered in a list first.
    *
    * @param rows rows read with [[dspSchema]]
    * @return one `DspReqVO` per input row
    */
  def parseData(rows: Iterator[Row]): Iterator[DspReqVO] = rows.map { row =>
    val idfa = row.getString(IDFA)
    val gaid = row.getString(GAID)
    val platform = row.getString(PLATFORM)
    val time = row.getString(UPDATE_TIME)
    val ip = row.getString(IP)
    val maker = row.getString(MAKER)
    val model = row.getString(MODEL)
    val osVersion = row.getString(OS_VERSION)
    val country = row.getString(COUNTRY_CODE)
    val birthday = row.getString(BIRTHDAY)
    val gender = row.getString(GENDER)
    val exitId = row.getString(EXT_ID)
    val jsonMsg = row.getString(JSON_MSG)

    val (deviceId, deviceType) = platform match {
      case "ios" => (idfa, "idfa")
      case "android" => (gaid, "gaid")
      case _ => ("", "")
    }

    val androidId = splitFun(exitId, ",")(5)

    var geoInfo = ""
    var longitude = ""
    var latitude = ""

    // Process jsonMsg and extract the geo attributes.
    if (jsonMsg != null && jsonMsg.startsWith("{")) {
      val json = GsonUtil.String2JsonObject(jsonMsg)
      val element = json.get("device")
      if (element != null && !element.isJsonNull) {
        val geoElement = element.getAsJsonObject.get("geo")
        if (geoElement != null && !geoElement.isJsonNull) {
          geoInfo = geoElement.toString
          val geoJson = geoElement.getAsJsonObject
          val lonElement = geoJson.get("lon")
          if (lonElement != null && !lonElement.isJsonNull) {
            longitude = lonElement.toString
          }
          val latElement = geoJson.get("lat")
          if (latElement != null && !latElement.isJsonNull) {
            latitude = latElement.toString
          }
        }
      }

      // Extract the segment information.
      val userElement = json.get("user")
      if (userElement != null && !userElement.isJsonNull) {
        val dataElement = userElement.getAsJsonObject.get("data")
        if (dataElement != null && !dataElement.isJsonNull) {
          dataElement.getAsJsonArray.foreach(dataEle => {
            val segElement = dataEle.getAsJsonObject.get("segment")
            if (segElement != null && !segElement.isJsonNull) {
              // NOTE(review): the converted value is not used; the call is kept so that
              // whatever it does on malformed segments stays unchanged.
              GsonUtil.fromJson(segElement, classOf[SegmentVO])
              //  segment = segElement.toString
              //  segmentMap.put(segmentVO.getId, segmentVO)
            }
          })
        }
      }
    }

    DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
      "", androidId, time, geoInfo, longitude, latitude, "")
  }

  /** Options declared for the job: `input`, `output` and `detailOutPath`. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("detailOutPath", true, "[must] detailOutPath")
    //  options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  /**
    * Decides whether a raw row is kept.
    *
    * A row is kept when the platform is "ios" or "android", the matching device id
    * (IDFA / GAID) is non-blank, matches `didPtn` and is not `allZero`, the package name
    * is acceptable for the platform, and the ext id has more than five comma-separated
    * parts. For iOS the package must be `id<digits>` or `<digits>` with more than eight
    * characters once "id" is removed. Rows with a null package name are rejected.
    *
    * @param row row read with [[dspSchema]]
    * @return true when the row should be processed
    */
  def filterData(row: Row): Boolean = {
    val idfa = row.getString(IDFA)
    val gaid = row.getString(GAID)
    val packageName = row.getString(PKG_NAME)
    val platform = row.getString(PLATFORM)
    val extId = row.getString(EXT_ID)

    def usableDeviceId(id: String): Boolean =
      StringUtils.isNotBlank(id) && id.matches(didPtn) && !allZero.equals(id)

    def usableIosPackage(pkg: String): Boolean =
      (iosIdPkgPtn.matcher(pkg).matches || iosPkgPtn.matcher(pkg).matches) && {
        val appId = pkg.replace("id", "")
        checkPkgName("ios", appId) && appId.length > 8
      }

    val hasDevice = platform match {
      case "ios" => usableDeviceId(idfa) && packageName != null && usableIosPackage(packageName)
      case "android" => usableDeviceId(gaid) && packageName != null && checkPkgName("android", packageName)
      case _ => false
    }

    // parseMapData / parseData read element 5 of the split ext id.
    hasDevice && splitFun(extId, ",").length > 5
  }

  /**
    * Checks a package name against the pattern(s) of the given platform.
    *
    * @param platform "ios" or "android"; anything else yields false
    * @param pkg      package name to check
    */
  private def checkPkgName(platform: String, pkg: String): Boolean = platform match {
    case "ios" => iosPkgPtn.matcher(pkg).matches || adrPkgPtn.matcher(pkg).matches
    case "android" => adrPkgPtn.matcher(pkg).matches
    case _ => false
  }

  /** Schema with the string columns device_id, tag, label, business and device_type. */
  def schema_age_gender: StructType = {
    val columns = Seq("device_id", "tag", "label", "business", "device_type")
    StructType(columns.map(name => StructField(name, StringType)))
  }

  /**
    * Schema of the DSP request log: 52 string columns. The positional constants at the
    * top of this class index into this column order.
    */
  def dspSchema: StructType = {
    val columns = Seq(
      "time", "xforwardip", "ip", "exchanges", "elapsed", "url", "body", "requestid",
      "bid", "price", "describe", "ext1", "ext2", "ext3", "ext4", "ext5",
      "auctiontype", "bidreqid", "impid", "publisherid", "appid", "appname", "posid", "category",
      "intl", "imagesize", "deviceip", "make", "model", "os", "osv", "devicetype",
      "cncttype", "countrycode", "googleadid", "imeishal", "androididmd5", "idfa", "keywords", "yob",
      "gender", "ext6", "ext7", "ext8", "ext9", "ext10", "campaignid", "cinstallprice",
      "cappname", "cpackagename", "cadvertiserid", "ccreativeid")
    StructType(columns.map(name => StructField(name, StringType)))
  }
}

/** Entry point that runs [[DspEtlHour]] with the given command line arguments. */
object DspEtlHour {
  def main(args: Array[String]): Unit = {
    new DspEtlHour().run(args)
  }
}