package mobvista.dmp.datasource.dsp

import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.dmp.util.{MD5Util, MRUtils}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

/**
 * Hourly ETL over `adn_dsp.log_adn_dsp_request_orc_hour` for a single `yyyymmddhh` partition.
 *
 * Writes two outputs:
 *  - `output`: zlib-compressed ORC, aggregated by the group-by columns of `etl_sql`, with
 *    `#`-joined package names / regions and the latest request `time` as `datetime`;
 *  - `outputmds`: gzip text, one `MRUtils.JOINER`-joined line per device id
 *    (idfa/gaid, androidid, imei, imeimd5), sorted by device id and type.
 */
class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {

  /** Command-line options parsed by [[run]]. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("yyyymmddhh", true, "yyyymmddhh")
    options.addOption("output", true, "output")
    options.addOption("outputmds", true, "outputmds")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the job.
   *
   * Options: `-yyyymmddhh` hour partition to read, `-output` ORC output path,
   * `-outputmds` gzip text output path, `-coalesce` number of output partitions.
   * Both output paths are deleted before anything is written.
   *
   * @return 0 when the job completes; failures propagate as exceptions.
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val yyyymmddhh = commandLine.getOptionValue("yyyymmddhh")
    val output = commandLine.getOptionValue("output")
    val outputmds = commandLine.getOptionValue("outputmds")
    // Parse once, before any output is deleted, so a bad value fails fast.
    val partitions = coalesce.toInt

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Both output paths are resolved against the s3://mob-emr-test file system.
      val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      fs.delete(new Path(output), true)
      fs.delete(new Path(outputmds), true)

      import spark.implicits._

      val sql =
        s"""
           |select idfa,googleadid gaid,
           | os platform,countrycode country,
           | deviceip ip,gender,yob birthday,
           | make,model,osv osversion,ext6 ua,
           | appid packagename,ext3,exchanges,ext5 exitid,`time`,
           | body json_msg,rg region
           | from adn_dsp.log_adn_dsp_request_orc_hour where concat(yr,mt,dt,hh)='${yyyymmddhh}'
        """.stripMargin

      val df = spark.sql(sql).filter(filterData _)
        .rdd
        .map(parseMapData).toDF()
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      df.select("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model",
          "osVersion", "ua", "packageName", "exitId", "time", "geoInfo", "longitude", "latitude", "segment",
          "dealerid", "exchanges", "region")
        .toDF("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model",
          "osversion", "ua", "packagename", "exitid", "time", "geoinfo", "longitude", "latitude", "segment",
          "dealerid", "exchanges", "region")
        .createOrReplaceTempView("dsp_org_etl_hours")

      val etl_sql =
        """
          |select idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,ua,concat_ws('#',collect_set(packagename)) packagename,exitid,max(`time`) datetime,segment,dealerid,exchanges,concat_ws('#',collect_set(region)) region
          |from dsp_org_etl_hours
          |group by idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,ua,exitid,segment,dealerid,exchanges
        """.stripMargin

      spark.sql(etl_sql).repartition(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      df.select("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model",
          "osVersion", "packageName", "exitId", "time", "geoInfo", "longitude", "latitude", "segment", "region")
        .toDF("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model",
          "osversion", "packagename", "exitid", "time", "geoinfo", "longitude", "latitude", "segment", "region")
        .createOrReplaceTempView("dsp_org_mds_hours")

      val mds_sql =
        """
          |select idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,
          | concat_ws('#',collect_set(packagename)) packagename,exitid,`time`,geoinfo,longitude,
          | latitude,segment,concat_ws('#',collect_set(region)) region
          |from dsp_org_mds_hours
          |group by idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,exitid,`time`,geoinfo,longitude,
          | latitude,segment
        """.stripMargin

      val midData = spark.sql(mds_sql).rdd.flatMap(parseMapDataMDS)
        .toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
          "model", "os_version", "package_list", "androidids", "datetime", "geoInfo", "longitude", "latitude",
          "segment")

      midData.orderBy("device_id", "device_type").rdd.map(line => {
        MRUtils.JOINER.join(
          line.getAs[String]("device_id"),
          line.getAs[String]("device_type"),
          line.getAs[String]("platform"),
          line.getAs[String]("datetime"),
          line.getAs[String]("ip"),
          line.getAs[String]("geoInfo"),
          line.getAs[String]("longitude"),
          line.getAs[String]("latitude")
        )
      }).coalesce(partitions).saveAsTextFile(outputmds, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Maps one raw request row (columns as aliased by the query in [[run]]) to a [[DspReqVODemo]].
   *
   * `dealerid` is read from the `ext3` JSON (`dealids`, else `ruleids`) only for the exchanges listed
   * in `DspOrgLogEtlHoursDemo.DealExchanges` (case-insensitive); otherwise it is "".
   * `geoInfo`, `longitude`, `latitude` and `segment` come from the `json_msg` body. A null or non-object
   * body, or an error while parsing it, stops that extraction and leaves the remaining fields "".
   */
  def parseMapData(row: Row): DspReqVODemo = {
    val idfa = row.getAs[String]("idfa")
    val gaid = row.getAs[String]("gaid")
    val rawPackageName = row.getAs[String]("packagename")
    val packageName = if (StringUtils.isNotBlank(rawPackageName)) rawPackageName else ""
    val platform = row.getAs[String]("platform")
    val time = row.getAs[String]("time")
    val ip = row.getAs[String]("ip")
    val maker = row.getAs[String]("make")
    val model = row.getAs[String]("model")
    val osVersion = row.getAs[String]("osversion")
    val ua = row.getAs[String]("ua")
    val country = row.getAs[String]("country")
    val birthday = row.getAs[String]("birthday")
    val gender = row.getAs[String]("gender")
    val exitId = row.getAs[String]("exitid")
    val exchanges = row.getAs[String]("exchanges")
    val region = row.getAs[String]("region")

    val dealerid =
      if (DspOrgLogEtlHoursDemo.DealExchanges.exists(_.equalsIgnoreCase(exchanges))) {
        extractDealerId(row.getAs[String]("ext3"))
      } else {
        ""
      }

    val (geoInfo, longitude, latitude, segment) = extractGeoAndSegment(row.getAs[String]("json_msg"))

    DspReqVODemo(idfa, gaid, platform, country, ip, gender, birthday, maker, model, osVersion, ua,
      packageName, exitId, time, geoInfo, longitude, latitude, segment, dealerid, exchanges, region)
  }

  /**
   * Reads the dealer id from an `ext3` JSON object: `dealids` if present and non-null, else `ruleids`.
   * Returns "" when `ext3` is blank, does not start with `{`, has neither key, or fails to parse
   * (the error is printed and otherwise ignored).
   */
  private def extractDealerId(ext3: String): String =
    if (StringUtils.isNotBlank(ext3) && ext3.startsWith("{")) {
      try {
        val ext3Json = GsonUtil.String2JsonObject(ext3)
        val dealids = ext3Json.get("dealids")
        if (dealids != null && !dealids.isJsonNull) {
          dealids.getAsString
        } else {
          val ruleids = ext3Json.get("ruleids")
          if (ruleids != null && !ruleids.isJsonNull) ruleids.getAsString else ""
        }
      } catch {
        case NonFatal(e) =>
          e.printStackTrace()
          ""
      }
    } else {
      ""
    }

  /**
   * Extracts `(geoInfo, longitude, latitude, segment)` from the request body JSON.
   *
   * `geoInfo` is the JSON text of `device.geo`; `longitude`/`latitude` are the JSON text of
   * `device.geo.lon`/`device.geo.lat`; `segment` is the JSON text of the last `user.data[*].segment`
   * that starts with `[` and ends with `]`. Absent values stay "". A parse error is printed and
   * whatever was extracted before it is kept.
   */
  private def extractGeoAndSegment(jsonMsg: String): (String, String, String, String) = {
    var geoInfo = ""
    var longitude = ""
    var latitude = ""
    var segment = ""

    // Nothing in the source query excludes a null body, so guard before calling startsWith.
    if (jsonMsg != null && jsonMsg.startsWith("{")) {
      try {
        val json = GsonUtil.String2JsonObject(jsonMsg)

        // Geo attributes from device.geo.
        val device = json.get("device")
        if (device != null && !device.isJsonNull) {
          val geo = device.getAsJsonObject.get("geo")
          if (geo != null && !geo.isJsonNull) {
            geoInfo = geo.toString
            val geoJson = geo.getAsJsonObject
            val lon = geoJson.get("lon")
            if (lon != null && !lon.isJsonNull) {
              longitude = lon.toString
            }
            val lat = geoJson.get("lat")
            if (lat != null && !lat.isJsonNull) {
              latitude = lat.toString
            }
          }
        }

        // Segment information from user.data[*].segment; the last matching entry wins.
        val user = json.get("user")
        if (user != null && !user.isJsonNull) {
          val data = user.getAsJsonObject.get("data")
          if (data != null && !data.isJsonNull) {
            data.getAsJsonArray.foreach { entry =>
              val seg = entry.getAsJsonObject.get("segment")
              if (seg != null && !seg.isJsonNull) {
                val segText = seg.toString
                if (segText.startsWith("[") && segText.endsWith("]")) {
                  segment = segText
                }
              }
            }
          }
        }
      } catch {
        case NonFatal(e) => e.printStackTrace()
      }
    }

    (geoInfo, longitude, latitude, segment)
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("detailOutPath", true, "[must] detailOutPath")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  /**
   * Keeps a raw request row only when its platform is `ios` or `android` and it carries at least one
   * usable device id: an IDFA (ios) or GAID (android) that is non-blank, matches `didPtn` and is not
   * `allZero`; or an android id, IMEI or IMEI-MD5 from `exitid` (the IMEI fields only when country is `CN`).
   */
  def filterData(row: Row): Boolean = {
    val platform = row.getAs[String]("platform")
    val isIos = "ios".equals(platform)
    val isAndroid = "android".equals(platform)

    if (!isIos && !isAndroid) {
      false
    } else {
      def isUsableAdId(id: String): Boolean =
        StringUtils.isNotBlank(id) && id.matches(didPtn) && !allZero.equals(id)

      val hasAdId =
        (isIos && isUsableAdId(row.getAs[String]("idfa"))) ||
          (isAndroid && isUsableAdId(row.getAs[String]("gaid")))

      hasAdId || {
        val country = row.getAs[String]("country")
        val (androidId, imei, imeimd5) =
          parseExitIds(row.getAs[String]("exitid"), imeiAllowed = "CN".equalsIgnoreCase(country))
        androidId.nonEmpty || imei.nonEmpty || imeimd5.nonEmpty
      }
    }
  }

  /**
   * Reads `(androidId, imei, imeimd5)` from fields 7, 4 and 5 of the comma-separated `exitid` value.
   * Each is "" unless it is non-blank and matches its `CommonMapReduce` pattern; the IMEI fields are
   * also "" unless `imeiAllowed`. A blank `exitid` or one with fewer than 8 fields yields all "".
   */
  private def parseExitIds(exitId: String, imeiAllowed: Boolean): (String, String, String) =
    if (StringUtils.isBlank(exitId)) {
      ("", "", "")
    } else {
      val devIds = splitFun(exitId, ",")
      if (devIds.length < 8) {
        ("", "", "")
      } else {
        def pick(index: Int, pattern: String, allowed: Boolean): String = {
          val candidate = devIds(index)
          if (allowed && StringUtils.isNotBlank(candidate) && candidate.matches(pattern)) candidate else ""
        }
        (pick(7, CommonMapReduce.andriodIdPtn, allowed = true),
          pick(4, CommonMapReduce.imeiPtn, imeiAllowed),
          pick(5, CommonMapReduce.imeiMd5Ptn, imeiAllowed))
      }
    }

  /**
   * Expands one aggregated MDS row into one [[DspReqVO]] per device id it carries, in this order:
   *  - the IDFA (ios, type `idfa`) or GAID (android, type `gaid`), when it matches `didPtn`,
   *    is longer than 4 chars and is not `allZero`;
   *  - the android id from `exitid` (type `androidid`);
   *  - for android rows with country `CN`: the IMEI (type `imei`) and its MD5 (type `imeimd5`);
   *  - the IMEI-MD5 from `exitid` (type `imeimd5`), unless it equals the MD5 just derived from the IMEI.
   * Every record carries this row's android id (or "") in its twelfth field.
   */
  def parseMapDataMDS(row: Row): Array[DspReqVO] = {
    def validDid(raw: String): String =
      if (StringUtils.isNotBlank(raw) && raw.matches(didPtn)) raw else ""

    val idfa = validDid(row.getAs[String]("idfa"))
    val gaid = validDid(row.getAs[String]("gaid"))
    val platform = row.getAs[String]("platform")
    val country = row.getAs[String]("country")
    val ip = row.getAs[String]("ip")
    val gender = row.getAs[String]("gender")
    val birthday = row.getAs[String]("birthday")
    val maker = row.getAs[String]("maker")
    val model = row.getAs[String]("model")
    val osVersion = row.getAs[String]("osversion")
    val rawPackageName = row.getAs[String]("packagename")
    val packageName = if (StringUtils.isNotBlank(rawPackageName)) rawPackageName else ""
    val exitId = row.getAs[String]("exitid")
    val time = row.getAs[String]("time")
    val geoInfo = row.getAs[String]("geoinfo")
    val longitude = row.getAs[String]("longitude")
    val latitude = row.getAs[String]("latitude")
    val rawSegment = row.getAs[String]("segment")
    val segment = if (StringUtils.isNotBlank(rawSegment)) rawSegment else ""

    val (androidId, imei, imeimd5) =
      parseExitIds(exitId, imeiAllowed = "android".equals(platform) && "CN".equalsIgnoreCase(country))

    // Builds a record sharing every attribute of this row except the id, id type and platform.
    def record(deviceId: String, deviceType: String, recordPlatform: String): DspReqVO =
      DspReqVO(deviceId, deviceType, recordPlatform, country, ip, gender, birthday, maker, model,
        osVersion, packageName, androidId, time, geoInfo, longitude, latitude, segment)

    val records = new ArrayBuffer[DspReqVO]()

    if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) {
      records += record(idfa, "idfa", platform)
    } else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) {
      records += record(gaid, "gaid", platform)
    }

    if (androidId.nonEmpty) {
      records += record(androidId, "androidid", "android")
    }

    val imeiMd5FromImei =
      if (StringUtils.isNotBlank(imei)) {
        records += record(imei, "imei", "android")
        val md5 = MD5Util.getMD5Str(imei)
        records += record(md5, "imeimd5", "android")
        md5
      } else {
        ""
      }

    // exitid may report the MD5 of the same IMEI; emitting it again would duplicate the record above.
    if (StringUtils.isNotBlank(imeimd5) && imeimd5 != imeiMd5FromImei) {
      records += record(imeimd5, "imeimd5", "android")
    }

    records.toArray
  }

  /** Schema of the raw DSP request log; every column is `StringType`. */
  def dspSchema: StructType =
    StructType(Seq(
      "time", "xforwardip", "ip", "exchanges", "elapsed", "url", "body", "requestid", "bid", "price",
      "describe", "ext1", "ext2", "ext3", "ext4", "ext5", "auctiontype", "bidreqid", "impid",
      "publisherid", "appid", "appname", "posid", "category", "intl", "imagesize", "deviceip",
      "make", "model", "os", "osv", "devicetype", "cncttype", "countrycode", "googleadid", "imeishal",
      "androididmd5", "idfa", "keywords", "yob", "gender", "ext6", "ext7", "ext8", "ext9", "ext10",
      "campaignid", "cinstallprice", "cappname", "cpackagename", "cadvertiserid", "ccreativeid"
    ).map(name => StructField(name, StringType)))
}

object DspOrgLogEtlHoursDemo {

  /** Exchanges whose `ext3` field is parsed for a dealer id (compared case-insensitively). */
  private val DealExchanges: Seq[String] = Seq("mopub", "oppocn", "inmobi", "bes", "iqiyi", "vivo")

  def main(args: Array[String]): Unit = {
    new DspOrgLogEtlHoursDemo().run(args)
  }
}