From 4822000774cde2729452697302dbb72cf7b1706b Mon Sep 17 00:00:00 2001
From: fan.jiang <fan.jiang@mobvista.com>
Date: Mon, 23 Aug 2021 18:16:11 +0800
Subject: [PATCH] dsp_req分区数据人群包临时产出s3,供产品使用

---
 azkaban/dsp/tmp_extract_data_from_dsp_req.job |   2 +
 azkaban/dsp/tmp_extract_data_from_dsp_req.sh  |  23 ++++
 src/main/scala/mobvista/dmp/datasource/dsp/TmpExtractDataFromDspReq.scala | 266 ++++++++++++++++++++++++++++++++++++
 3 files changed, 291 insertions(+)
 create mode 100644 azkaban/dsp/tmp_extract_data_from_dsp_req.job
 create mode 100644 azkaban/dsp/tmp_extract_data_from_dsp_req.sh
 create mode 100644 src/main/scala/mobvista/dmp/datasource/dsp/TmpExtractDataFromDspReq.scala

diff --git a/azkaban/dsp/tmp_extract_data_from_dsp_req.job b/azkaban/dsp/tmp_extract_data_from_dsp_req.job
new file mode 100644
index 0000000..98cf447
--- /dev/null
+++ b/azkaban/dsp/tmp_extract_data_from_dsp_req.job
@@ -0,0 +1,2 @@
+type=command
+command=sh -x ./tmp_extract_data_from_dsp_req.sh
\ No newline at end of file
diff --git a/azkaban/dsp/tmp_extract_data_from_dsp_req.sh b/azkaban/dsp/tmp_extract_data_from_dsp_req.sh
new file mode 100644
index 0000000..cd13c99
--- /dev/null
+++ b/azkaban/dsp/tmp_extract_data_from_dsp_req.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+source ../dmp_env.sh
+
+ScheduleTime=${ScheduleTime:-$1}
+# Process the day before the schedule time.
+date_path=$(date -d "$ScheduleTime 1 days ago" "+%Y/%m/%d")
+
+Tmp_Extract_Data_From_DspReq_Path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_tmp_extract_data_from_dspReq_path"
+ETL_DSP_REQ_ETL_HOURS_INPUT_PATH="${ETL_DSP_REQ_ETL_HOURS}/$date_path/*/*"
+# check_await comes from dmp_env.sh; presumably it waits for the last hourly _SUCCESS marker - confirm.
+check_await "${ETL_DSP_REQ_ETL_HOURS}/$date_path/23/_SUCCESS"
+
+hadoop fs -rm -r -f "${Tmp_Extract_Data_From_DspReq_Path}"
+
+spark-submit --class mobvista.dmp.datasource.dsp.TmpExtractDataFromDspReq \
+  --conf spark.yarn.executor.memoryOverhead=3072 \
+  --conf spark.sql.shuffle.partitions=10000 \
+  --files "${HIVE_SITE_PATH}" \
+  --master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 4 --num-executors 100 \
+  "../${JAR}" -input "$ETL_DSP_REQ_ETL_HOURS_INPUT_PATH" \
+  -output "${Tmp_Extract_Data_From_DspReq_Path}" \
+  -coalesce 200 || exit 1
diff --git a/src/main/scala/mobvista/dmp/datasource/dsp/TmpExtractDataFromDspReq.scala b/src/main/scala/mobvista/dmp/datasource/dsp/TmpExtractDataFromDspReq.scala
new file mode 100644
index 0000000..f2d33cd
--- /dev/null
+++ b/src/main/scala/mobvista/dmp/datasource/dsp/TmpExtractDataFromDspReq.scala
@@ -0,0 +1,266 @@
+package mobvista.dmp.datasource.dsp
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
+import mobvista.dmp.datasource.dsp.DspConstant.Value
+import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
+import mobvista.dmp.format.TextMultipleOutputFormat
+import mobvista.dmp.util.MD5Util
+import mobvista.prd.datasource.util.GsonUtil
+import org.apache.commons.cli.{BasicParser, Options}
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.functions.{collect_set, max, udf}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import java.net.URI
+import java.util
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Temporary job that reads one day of DSP request ORC data and writes the device ids seen for
+ * a fixed set of dealer ids, one output directory per audience.
+ *
+ * @author jiangfan
+ * @date 2021/8/23 15:56
+ */
+class TmpExtractDataFromDspReq extends CommonSparkJob with Serializable {
+
+  import TmpExtractDataFromDspReq._
+
+  /** Declares the command line options `input`, `output` and `coalesce`. */
+  override protected def buildOptions(): Options = {
+    val options = new Options
+    options.addOption("input", true, "input")
+    options.addOption("output", true, "output")
+    options.addOption("coalesce", true, "coalesce")
+    options
+  }
+
+  /**
+   * Turns a row holding `device_id` and `device_type` into the pair that is written out: the
+   * key is the device type followed by ", " and the value is the device id. A row with a blank
+   * device type yields an empty array.
+   */
+  def buildRes(row: Row): Array[Tuple2[Text, Text]] = {
+    val deviceId = row.getAs[String]("device_id")
+    val deviceType = row.getAs[String]("device_type")
+    if (StringUtils.isNotBlank(deviceType)) Array((new Text(s"${deviceType}, "), new Text(deviceId)))
+    else Array.empty[Tuple2[Text, Text]]
+  }
+
+  /**
+   * Parses the arguments and runs the extraction.
+   *
+   * @return 0 on success, -1 when the option check fails
+   */
+  override protected def run(args: Array[String]): Int = {
+    val commandLine = commParser.parse(options, args)
+    if (!checkMustOption(commandLine)) {
+      printUsage(options)
+      -1
+    } else {
+      printOptions(commandLine)
+      extract(
+        commandLine.getOptionValue("input"),
+        commandLine.getOptionValue("output"),
+        commandLine.getOptionValue("coalesce").toInt)
+      0
+    }
+  }
+
+  /**
+   * Reads the ORC data under `input`, keeps the records tagged with an audience package and
+   * writes each audience to `output/<dealer id>`, coalesced to `partitions` partitions.
+   */
+  private def extract(input: String, output: String, partitions: Int): Unit = {
+    val spark = SparkSession.builder()
+      .appName("TmpExtractDataFromDspReq")
+      .config("spark.rdd.compress", "true")
+      .config("spark.io.compression.codec", "snappy")
+      .config("spark.sql.orc.filterPushdown", "true")
+      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .enableHiveSupport()
+      .getOrCreate()
+
+    import spark.implicits._
+
+    try {
+      val hadoopConf = spark.sparkContext.hadoopConfiguration
+      // Resolve the file system from the output path itself rather than a hard-coded bucket.
+      val outputPath = new Path(output)
+      FileSystem.get(outputPath.toUri, hadoopConf).delete(outputPath, true)
+
+      // Parse the input once and cache only the columns and rows the outputs need, so the
+      // day of data is not read and parsed again for every audience.
+      val candidates = spark.read.schema(dspEtlSchema).orc(input)
+        .flatMap(parseMapData(_))
+        .toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
+          "model", "os_version", "packageName", "androidids", "datetime", "segment", "region")
+        .select("device_id", "device_type", "packageName")
+        .where("device_type not in ('androidid','android_id')")
+        .where($"packageName".isin(AudienceOutputs.map(_._2): _*))
+        .cache()
+
+      try {
+        AudienceOutputs.foreach { case (audienceId, packageName) =>
+          candidates.where($"packageName" === packageName)
+            .rdd.flatMap(buildRes(_)).coalesce(partitions)
+            .saveAsNewAPIHadoopFile(s"${output}/${audienceId}", classOf[Text], classOf[Text],
+              classOf[TextMultipleOutputFormat], hadoopConf)
+        }
+      } finally {
+        candidates.unpersist()
+      }
+    } finally {
+      spark.stop()
+    }
+  }
+
+  /**
+   * Expands one DSP request row into one record per device id and package name.
+   *
+   * Device ids come from the `idfa` and `gaid` columns and from the comma separated `exitId`
+   * column. Records are produced only when `exchanges` is `oppocn` or `bes` and `dealerid`
+   * contains one of the dealer ids configured for that exchange; other rows yield an empty array.
+   */
+  def parseMapData(row: Row): Array[DspReqVOoptimization] = {
+    def field(name: String): String = row.getAs[String](name)
+    def orEmpty(raw: String): String = if (StringUtils.isNotBlank(raw)) raw else ""
+    def ifMatches(raw: String, pattern: String): String =
+      if (StringUtils.isNotBlank(raw) && raw.matches(pattern)) raw else ""
+
+    val platform = field("platform")
+    var idfa = ifMatches(field("idfa"), didPtn)
+    var gaid = ifMatches(field("gaid"), didPtn)
+    var androidId = ""
+    var imei = ""
+    var imeimd5 = ""
+    var oaid = ""
+    var oaidmd5 = ""
+    var idfv = ""
+    var gaidmd5 = ""
+
+    // exitId holds further ids at fixed positions; it is ignored unless it has 17+ fields.
+    val exitId = field("exitId")
+    if (StringUtils.isNotBlank(exitId)) {
+      val devIds = splitFun(exitId, ",")
+      if (devIds.length >= 17) {
+        if ("ios".equalsIgnoreCase(platform)) {
+          if (StringUtils.isBlank(idfa)) idfa = ifMatches(devIds(1), MobvistaConstant.didPtn)
+          idfv = ifMatches(devIds(16), MobvistaConstant.didPtn)
+        } else {
+          if (StringUtils.isBlank(gaid)) gaid = ifMatches(devIds(0), MobvistaConstant.didPtn)
+          gaidmd5 = ifMatches(devIds(2), MobvistaConstant.md5Ptn)
+          oaid = orEmpty(devIds(12))
+          oaidmd5 = ifMatches(devIds(13), MobvistaConstant.md5Ptn)
+          imei = ifMatches(devIds(4), MobvistaConstant.imeiPtn)
+          imeimd5 = ifMatches(devIds(5), MobvistaConstant.md5Ptn)
+          androidId = ifMatches(devIds(7), MobvistaConstant.andriodIdPtn)
+        }
+      }
+    }
+
+    val value = Value(field("country"), field("ip"), field("gender"), field("birthday"), field("maker"),
+      field("model"), field("osVersion"), orEmpty(field("packageName")), androidId, field("datetime"),
+      orEmpty(field("segment")), field("region"))
+
+    val (deviceId, deviceType) =
+      if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) (idfa, "idfa")
+      else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) (gaid, "gaid")
+      else ("", "")
+
+    // dealerid may be null; treat that as "no dealer ids" instead of failing the task.
+    val dealerIds = Option(field("dealerid")).map(_.split(",")).getOrElse(Array.empty[String])
+    val packagesByDealer: Seq[(String, Seq[String])] = field("exchanges") match {
+      case "oppocn" => OppoPackagesByDealer
+      case "bes" => BesPackagesByDealer
+      case _ => Seq.empty
+    }
+
+    val records = new ArrayBuffer[DspReqVOoptimization]()
+    for ((dealerId, packageNames) <- packagesByDealer if dealerIds.contains(dealerId); pkg <- packageNames) {
+      value.packageName = pkg
+      addDatasV2(records, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
+    }
+    records.toArray
+  }
+
+  /**
+   * Appends one record per non-blank device id to `arrayBuffer`, each carrying the fields of
+   * `value` (including its current `packageName`), and returns the same buffer.
+   *
+   * For gaid, imei and oaid the MD5 of the raw id is appended as well; the md5 passed in by the
+   * caller is used only when the raw id is absent. Every id is appended exactly once.
+   */
+  def addDatasV2(arrayBuffer: ArrayBuffer[DspReqVOoptimization], deviceId: String, gaidmd5: String, imei: String, imeimd5: String,
+                 oaid: String, oaidmd5: String, androidId: String, idfv: String, deviceType: String, platform: String, value: Value
+                ): ArrayBuffer[DspReqVOoptimization] = {
+    def emit(id: String, idType: String, idPlatform: String): Unit =
+      arrayBuffer += DspReqVOoptimization(id, idType, idPlatform,
+        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion,
+        value.packageName, value.androidId, value.time, value.segment, value.region)
+
+    // Emits the raw id and its computed md5, or else the md5 supplied with the request.
+    def emitWithMd5(raw: String, rawType: String, suppliedMd5: String, md5Type: String): Unit =
+      if (StringUtils.isNotBlank(raw)) {
+        emit(raw, rawType, "android")
+        emit(MD5Util.getMD5Str(raw), md5Type, "android")
+      } else if (StringUtils.isNotBlank(suppliedMd5)) {
+        emit(suppliedMd5, md5Type, "android")
+      }
+
+    val hasDeviceId = StringUtils.isNotBlank(deviceId)
+    val isGaid = hasDeviceId && "android".equalsIgnoreCase(platform) && "gaid".equalsIgnoreCase(deviceType)
+
+    if (hasDeviceId) emit(deviceId, deviceType, platform)
+    if (isGaid) emit(MD5Util.getMD5Str(deviceId), "gaidmd5", "android")
+    else if (StringUtils.isNotBlank(gaidmd5)) emit(gaidmd5, "gaidmd5", "android")
+    if (StringUtils.isNotBlank(androidId)) emit(androidId, "androidid", "android")
+    if (StringUtils.isNotBlank(idfv)) emit(idfv, "idfv", "ios")
+    emitWithMd5(imei, "imei", imeimd5, "imeimd5")
+    emitWithMd5(oaid, "oaid", oaidmd5, "oaidmd5")
+    arrayBuffer
+  }
+
+  /** Schema applied when reading the ORC input; every column is read as a string. */
+  def dspEtlSchema: StructType =
+    StructType(Seq("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model",
+      "osVersion", "packageName", "exitId", "datetime", "segment", "dealerid", "exchanges", "region")
+      .map(name => StructField(name, StringType)))
+
+}
+
+object TmpExtractDataFromDspReq {
+  private val BasePackage = "com.ss.android.ugc.aweme"
+
+  /** oppocn dealer id -> package names to tag; the last one identifies the audience. */
+  private val OppoPackagesByDealer: Seq[(String, Seq[String])] = Seq(
+    "4059" -> Seq(BasePackage, s"${BasePackage}_oppoziyou"),
+    "4060" -> Seq(s"${BasePackage}_oppoziyou_notinstall"),
+    "4061" -> Seq(s"${BasePackage}_oppoziyou_hist_notinstall"),
+    "4053" -> Seq(BasePackage, s"${BasePackage}_oppolianmeng"),
+    "4054" -> Seq(s"${BasePackage}_oppolianmeng_hist1year_notinstall"),
+    "4055" -> Seq(s"${BasePackage}_oppolianmeng_histhalfyear_notinstall"))
+
+  /** bes dealer id -> package names to tag; the last one identifies the audience. */
+  private val BesPackagesByDealer: Seq[(String, Seq[String])] = Seq(
+    "100203" -> Seq(BasePackage, s"${BasePackage}_bes"))
+
+  /** (output sub-directory, audience package) pairs in the order they are written. */
+  private val AudienceOutputs: Seq[(String, String)] =
+    (BesPackagesByDealer ++ OppoPackagesByDealer).map { case (dealerId, packageNames) =>
+      dealerId -> packageNames.last
+    }
+
+  def main(args: Array[String]): Unit = {
+    val exitCode = new TmpExtractDataFromDspReq().run(args)
+    // Propagate a failure to spark-submit instead of ending with exit status 0.
+    if (exitCode != 0) sys.exit(exitCode)
+  }
+}
--
libgit2 0.27.1