Commit 48220007 by fan.jiang

dsp_req分区数据人群包临时产出s3,供产品使用

parent 3fe9507c
type=command
command=sh -x ./tmp_extract_data_from_dsp_req.sh
\ No newline at end of file
#!/bin/bash
source ../dmp_env.sh
ScheduleTime=${ScheduleTime:-$1}
LOG_TIME=$(date -d "$ScheduleTime 1 days ago" "+%Y-%m-%d")
dt=$(date -d "$ScheduleTime 1 days ago" "+%Y%m%d")
date_path=$(date -d "$ScheduleTime 1 days ago" "+%Y/%m/%d")
old_path=$(date -d "$ScheduleTime 2 days ago" "+%Y/%m/%d")
rm_dt=$(date -d "$ScheduleTime 180 days ago" "+%Y%m%d")
rm_dt_path=$(date -d "$ScheduleTime 180 days ago" "+%Y/%m/%d")
Tmp_Extract_Data_From_DspReq_Path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_tmp_extract_data_from_dspReq_path"
ETL_DSP_REQ_ETL_HOURS_INPUT_PATH="${ETL_DSP_REQ_ETL_HOURS}/$date_path/*/*"
check_await "${ETL_DSP_REQ_ETL_HOURS}/$date_path/23/_SUCCESS"
hadoop fs -rm -r ${Tmp_Extract_Data_From_DspReq_Path}
spark-submit --class mobvista.dmp.datasource.dsp.TmpExtractDataFromDspReq \
--conf spark.yarn.executor.memoryOverhead=3072 \
--conf spark.sql.shuffle.partitions=10000 \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 4 --num-executors 100 \
../${JAR} -input $ETL_DSP_REQ_ETL_HOURS_INPUT_PATH \
-output ${Tmp_Extract_Data_From_DspReq_Path} \
-coalesce 200 || exit 1
package mobvista.dmp.datasource.dsp
import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.dsp.DspConstant.Value
import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
import mobvista.dmp.format.TextMultipleOutputFormat
import mobvista.dmp.util.MD5Util
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{collect_set, max, udf}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import java.net.URI
import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
* @author jiangfan
* @date 2021/8/23 15:56
*/
class TmpExtractDataFromDspReq extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("input", true, "input")
options.addOption("output", true, "output")
options.addOption("coalesce", true, "coalesce")
options
}
def buildRes( row: Row): Array[Tuple2[Text, Text]] = {
val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
val device_id = row.getAs[String]("device_id")
val device_type = row.getAs[String]("device_type")
if (StringUtils.isNotBlank(device_type)) {
buffer += Tuple2(new Text(s"${device_type}, "), new Text(device_id))
}
buffer.toArray
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val input = commandLine.getOptionValue("input")
val output = commandLine.getOptionValue("output")
val spark = SparkSession.builder()
.appName("TmpExtractDataFromDspReq")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
import spark.implicits._
try {
val midData = spark.read.schema(dspEtlSchema).orc(input).flatMap(parseMapData(_)).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
"model", "os_version", "packageName", "androidids", "datetime", "segment", "region")
midData.createOrReplaceTempView("tmp_data")
var sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_bes'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/100203", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_oppoziyou'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/4059", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_oppoziyou_notinstall'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/4060", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_oppoziyou_hist_notinstall'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/4061", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_oppolianmeng'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/4053", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_oppolianmeng_hist1year_notinstall'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/4054", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
sql5=
s"""
|select device_id, device_type from tmp_data where device_type not in ('androidid','android_id') and packageName='com.ss.android.ugc.aweme_oppolianmeng_histhalfyear_notinstall'
""".stripMargin
spark.sql(sql5).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
.saveAsNewAPIHadoopFile(s"${output}/4055", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
} finally {
if (spark != null) {
spark.stop()
}
}
0
}
def parseMapData(row: Row): Array[DspReqVOoptimization] = {
var arrayBuffer = new ArrayBuffer[DspReqVOoptimization]()
// val idfa = row.getAs[String]("idfa")
var idfa = ""
val idfaTmp = row.getAs[String]("idfa")
if (StringUtils.isNotBlank(idfaTmp) && idfaTmp.matches(didPtn)) idfa = idfaTmp
// val gaid = row.getAs[String]("gaid")
var gaid = ""
val gaidTmp = row.getAs[String]("gaid")
if (StringUtils.isNotBlank(gaidTmp) && gaidTmp.matches(didPtn)) gaid = gaidTmp
val platform = row.getAs[String]("platform")
val country = row.getAs[String]("country")
val ip = row.getAs[String]("ip")
val gender = row.getAs[String]("gender")
val birthday = row.getAs[String]("birthday")
val maker = row.getAs[String]("maker")
val model = row.getAs[String]("model")
val osVersion = row.getAs[String]("osVersion")
var packageName = ""
if (StringUtils.isNotBlank(row.getAs[String]("packageName"))) {
packageName = row.getAs[String]("packageName")
}
val exitId = row.getAs[String]("exitId")
val time = row.getAs[String]("datetime")
var segment = ""
if (StringUtils.isNotBlank(row.getAs[String]("segment"))) {
segment = row.getAs[String]("segment")
}
val region = row.getAs[String]("region")
var androidId = ""
var imei = ""
var imeimd5 = ""
// 添加oaid oaidmd5 解析
var oaid = ""
var oaidmd5 = ""
// 新增
var idfv = ""
var gaidmd5 = ""
if (StringUtils.isNotBlank(exitId)) {
val devIds = splitFun(exitId, ",")
if (devIds.length >= 17) {
if ("ios".equalsIgnoreCase(platform)) {
if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
idfa = devIds(1)
}
if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
idfv = devIds(16)
}
} else {
if (StringUtils.isBlank(gaid) && StringUtils.isNotBlank(devIds(0)) && devIds(0).matches(MobvistaConstant.didPtn)) {
gaid = devIds(0)
}
if (StringUtils.isNotBlank(devIds(2)) && devIds(2).matches(MobvistaConstant.md5Ptn)) {
gaidmd5 = devIds(2)
}
if (StringUtils.isNotBlank(devIds(12))) {
oaid = devIds(12)
}
if (StringUtils.isNotBlank(devIds(13)) && devIds(13).matches(MobvistaConstant.md5Ptn)) {
oaidmd5 = devIds(13)
}
if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(MobvistaConstant.imeiPtn)) {
imei = devIds(4)
}
if (StringUtils.isNotBlank(devIds(5)) && devIds(5).matches(MobvistaConstant.md5Ptn)) {
imeimd5 = devIds(5)
}
if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(MobvistaConstant.andriodIdPtn)) {
androidId = devIds(7)
}
}
}
}
val value = Value(country, ip, gender, birthday, maker, model, osVersion, packageName, androidId, time, segment, region)
var deviceType = ""
var deviceId = ""
if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) {
deviceId = idfa
deviceType = "idfa"
} else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) {
deviceId = gaid
deviceType = "gaid"
}
val exchanges = row.getAs[String]("exchanges")
val dealerid = row.getAs[String]("dealerid")
val dealeridArray: Array[String] = dealerid.split(",")
if ("oppocn".equals(exchanges)) {
if (dealeridArray.contains("4059")) {
value.packageName = "com.ss.android.ugc.aweme"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
value.packageName = "com.ss.android.ugc.aweme_oppoziyou"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
if (dealeridArray.contains("4060")) {
value.packageName = "com.ss.android.ugc.aweme_oppoziyou_notinstall"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
if (dealeridArray.contains("4061")) {
value.packageName = "com.ss.android.ugc.aweme_oppoziyou_hist_notinstall"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
if (dealeridArray.contains("4053")) {
value.packageName = "com.ss.android.ugc.aweme"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
value.packageName = "com.ss.android.ugc.aweme_oppolianmeng"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
if (dealeridArray.contains("4054")) {
value.packageName = "com.ss.android.ugc.aweme_oppolianmeng_hist1year_notinstall"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
if (dealeridArray.contains("4055")) {
value.packageName = "com.ss.android.ugc.aweme_oppolianmeng_histhalfyear_notinstall"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
}
val mapData_bes = Map(
100203 -> "com.ss.android.ugc.aweme")
if ("bes".equals(exchanges)) {
for (item <- mapData_bes) {
if (dealeridArray.contains(item._1.toString)) {
value.packageName = item._2
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
value.packageName = item._2+"_bes"
arrayBuffer = addDatasV2(arrayBuffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
}
}
}
arrayBuffer.toArray
}
def addDatasV2(arrayBuffer: ArrayBuffer[DspReqVOoptimization], deviceId: String, gaidmd5: String, imei: String, imeimd5: String,
oaid: String, oaidmd5: String, androidId: String, idfv: String, deviceType: String, platform: String, value: Value
): ArrayBuffer[DspReqVOoptimization] = {
var gaidFlag = true
if (StringUtils.isNotBlank(deviceId)) {
arrayBuffer += DspReqVOoptimization(deviceId, deviceType, platform,
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
if ("android".equalsIgnoreCase(platform) && "gaid".equalsIgnoreCase(deviceType)) {
val gaidmd5 = MD5Util.getMD5Str(deviceId)
gaidFlag = false
arrayBuffer += DspReqVOoptimization(gaidmd5, "gaidmd5", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
}
if (StringUtils.isNotBlank(gaidmd5) && gaidFlag) {
arrayBuffer += DspReqVOoptimization(gaidmd5, "gaidmd5", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
if (StringUtils.isNotBlank(deviceId)) {
arrayBuffer += DspReqVOoptimization(deviceId, deviceType, platform,
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
if (StringUtils.isNotBlank(androidId)) {
arrayBuffer += DspReqVOoptimization(androidId, "androidid", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
if (StringUtils.isNotBlank(idfv)) {
arrayBuffer += DspReqVOoptimization(idfv, "idfv", "ios",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
var imeiFlag = true
if (StringUtils.isNotBlank(imei)) {
arrayBuffer += DspReqVOoptimization(imei, "imei", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
val imeimd5 = MD5Util.getMD5Str(imei)
imeiFlag = false
arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
if (StringUtils.isNotBlank(imeimd5) && imeiFlag) {
arrayBuffer += DspReqVOoptimization(imeimd5, "imeimd5", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
var oaidFlag = true
if (StringUtils.isNotBlank(oaid)) {
arrayBuffer += DspReqVOoptimization(oaid, "oaid", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
val oaidmd5 = MD5Util.getMD5Str(oaid)
oaidFlag = false
arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
if (StringUtils.isNotBlank(oaidmd5) && oaidFlag) {
arrayBuffer += DspReqVOoptimization(oaidmd5, "oaidmd5", "android",
value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
}
arrayBuffer
}
def dspEtlSchema: StructType = {
StructType(StructField("idfa", StringType) ::
StructField("gaid", StringType) ::
StructField("platform", StringType) ::
StructField("country", StringType) ::
StructField("ip", StringType) ::
StructField("gender", StringType) ::
StructField("birthday", StringType) ::
StructField("maker", StringType) ::
StructField("model", StringType) ::
StructField("osVersion", StringType) ::
StructField("packageName", StringType) ::
StructField("exitId", StringType) ::
StructField("datetime", StringType) ::
StructField("segment", StringType) ::
StructField("dealerid", StringType) ::
StructField("exchanges", StringType) ::
StructField("region", StringType)
:: Nil)
}
}
object TmpExtractDataFromDspReq {
def main(args: Array[String]): Unit = {
new TmpExtractDataFromDspReq().run(args)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment