Commit 3e38d98c by WangJinfeng

fix dmp bug

parent 2c3dc69a
@@ -435,6 +435,8 @@ RUID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/etl/ruid_mapping
# dsp_device_mapping
DSP_DEVICE_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dsp/device_mapping"
ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwd/dwd_device_ids_inc_daily"
JAR=./DMP.jar
# Check for the _SUCCESS file; if it does not exist, keep polling in a loop
type=command
command=sh -x adn_request_id_mapping.sh
\ No newline at end of file
#! /bin/bash
source ../dmp_env.sh
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
# INPUT_PATH=${ETL_ADN_ORG_REQ_HOURS}/${date_path}
INPUT_PATH=${ADN_REQUEST_PATH}/${date_path}
check_await "${INPUT_PATH}/virginia/23/_SUCCESS"
BUSINESS="adn_request"
OUTPUT_PATH=${ID_MAPPING}/${date_path}/${BUSINESS}
spark-submit --class mobvista.dmp.datasource.id_mapping.AdnRequest \
--name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.default.parallelism=3000 \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 200 \
../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000
if [ $? -ne 0 ]; then
exit 255
fi
common_mount_partition "dwd" "dwd_device_ios_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/ios"
common_mount_partition "dwd" "dwd_device_android_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/android"
type=command
command=sh -x dsp_id_mapping.sh
\ No newline at end of file
#! /bin/bash
source ../dmp_env.sh
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
# INPUT_PATH=${ETL_DSP_REQ_ETL_HOURS}/${date_path}
INPUT_PATH=${ADN_DSP_PATH}/${date_path}
check_await "${INPUT_PATH}/virginia/23/_SUCCESS"
BUSINESS="dsp_req"
OUTPUT_PATH=${ID_MAPPING}/${date_path}/${BUSINESS}
spark-submit --class mobvista.dmp.datasource.id_mapping.DspReq \
--name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.default.parallelism=3000 \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 200 \
../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000
if [ $? -ne 0 ]; then
exit 255
fi
common_mount_partition "dwd" "dwd_device_ios_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/ios"
common_mount_partition "dwd" "dwd_device_android_ids_inc_daily" "dt='${LOG_TIME}',active_type='${BUSINESS}'" "${OUTPUT_PATH}/android"
type=command
dependencies=adn_request_id_mapping,dsp_id_mapping
command=echo "ID Mapping End!"
\ No newline at end of file
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.adn.AdnConstant
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/11/30
* @time: 7:51 PM
* @email: jinfeng.wang@mobvista.com
*/
class AdnRequest extends EtlDeviceIdDaily {
override def processData(date: String, spark: SparkSession): RDD[(String, Row)] = {
spark.udf.register("getDevId", AdnConstant.getDevId _)
// DWD
// val sql = Constant.adn_request_sql.replace("@date", date)
// ODS
val sql = Constant.adn_request_sql_v2.replace("@date", date)
val rdd = spark.sql(sql).coalesce(60000).rdd.map(row => {
val gaid = row.getAs[String]("gaid")
val idfa = row.getAs[String]("idfa")
val imei = row.getAs[String]("imei")
val androidId = row.getAs[String]("androidid")
val extSysid = row.getAs[String]("extsysid")
val platform = row.getAs[String]("platform").toLowerCase
val oaid = row.getAs[String]("oaid")
val idfv = row.getAs[String]("idfv")
val country = row.getAs[String]("country")
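// extsysid is expected to carry "<sysId>,<bkupId>"; each part is kept below only when it
// matches the device-ID pattern and is not the all-zero placeholder.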
// Guard against a null/blank extsysid before splitting to avoid an NPE.
val arr = if (StringUtils.isNotBlank(extSysid)) extSysid.split(",", -1) else Array("", "")
val sysId = if (StringUtils.isNotBlank(arr(0)) &&
arr(0).matches(MobvistaConstant.didPtn) &&
!arr(0).matches(MobvistaConstant.allZero)) {
arr(0)
} else {
""
}
val bkupId = if (arr.length == 2 && StringUtils.isNotBlank(arr(1)) &&
arr(1).matches(MobvistaConstant.didPtn) &&
!arr(1).matches(MobvistaConstant.allZero)) {
arr(1)
} else {
""
}
val rw = if ("ios".equalsIgnoreCase(platform)) {
Row(idfa, idfv, sysId, bkupId, "", "", country)
} else {
Row(imei, androidId, oaid, gaid, sysId, bkupId, "", "", country)
}
(platform, rw)
})
rdd
}
}
object AdnRequest {
def main(args: Array[String]): Unit = {
new AdnRequest().run(args)
}
}
\ No newline at end of file
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.common.MobvistaConstant
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/11/30
* @time: 3:02 PM
* @email: jinfeng.wang@mobvista.com
*/
object Constant {
case class IosTab(idfa: String, idfv: String, sysid: String, bkupid: String, xwho: String, user_id: String, country: String)
extends java.io.Serializable
val iosSchema: StructType = StructType(Array(
StructField("idfa", StringType),
StructField("idfv", StringType),
StructField("sysid", StringType),
StructField("bkupid", StringType),
StructField("xwho", StringType),
StructField("user_id", StringType),
StructField("country", StringType)
))
case class AdrTab(imei: String, android_id: String, oaid: String, gaid: String, sysid: String, bkupid: String, xwho: String, user_id: String, country: String)
extends java.io.Serializable
val adrSchema: StructType = StructType(Array(
StructField("imei", StringType),
StructField("android_id", StringType),
StructField("oaid", StringType),
StructField("gaid", StringType),
StructField("sysid", StringType),
StructField("bkupid", StringType),
StructField("xwho", StringType),
StructField("user_id", StringType),
StructField("country", StringType)
))
val dsp_req_sql =
"""
|SELECT idfa, gaid, exitid, platform
| FROM dwh.etl_dsp_request_daily_hours
| WHERE dt = '@date'
|""".stripMargin
val dsp_req_sql_v2 =
"""
|SELECT idfa, googleadid gaid, ext5 exitid, os platform, countrycode country
| FROM adn_dsp.log_adn_dsp_request_orc_hour
| WHERE CONCAT(yr,mt,dt) = '@date'
| GROUP BY idfa, googleadid, ext5, os, countrycode
|""".stripMargin
val adn_request_sql =
"""
|SELECT idfa, gaid, oaid, idfv, androidid, extsysid, imei, platform
| FROM dwh.etl_adn_org_request_daily_hours
| WHERE CONCAT(yr,mt,dt) = '@date'
|""".stripMargin
val adn_request_sql_v2 =
"""
|SELECT idfa, gaid, oaid, getDevId(cdn_ab) idfv, dev_id androidid,
| ext_sysid extsysid, imei, platform, country_code country
| FROM dwh.ods_adn_trackingnew_request
| WHERE CONCAT(yyyy,mm,dd) = '@date'
| GROUP BY idfa, gaid, oaid, cdn_ab, dev_id, ext_sysid, imei, platform, country_code
|""".stripMargin
val sss_sql =
"""
|""".stripMargin
def getDevId(devId: String): String = {
var dev_id = ""
if (StringUtils.isNotBlank(devId) && !",".equals(devId)) {
val ids = devId.split(",", -1)
if (StringUtils.isNotBlank(ids(0)) && ids(0).matches(MobvistaConstant.didPtn) && !ids(0).matches(MobvistaConstant.allZero)) {
dev_id = ids(0)
} else if (ids.length == 2 && StringUtils.isNotBlank(ids(1)) && ids(1).matches(MobvistaConstant.didPtn) && !ids(1).matches(MobvistaConstant.allZero)) {
dev_id = ids(1)
}
}
dev_id
}
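// Usage sketch (hypothetical inputs): getDevId("<first-id>,<second-id>") returns the first ID
// when it matches didPtn and is not all zeros, otherwise falls back to the second ID when it
// passes the same checks; blank input and a bare "," both return "".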
}
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.common.MobvistaConstant
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/11/30
* @time: 7:51 PM
* @email: jinfeng.wang@mobvista.com
*/
class DspReq extends EtlDeviceIdDaily {
override def processData(date: String, spark: SparkSession): RDD[(String, Row)] = {
// DWD
// val sql = Constant.dsp_req_sql.replace("@date", date)
// ODS
val sql = Constant.dsp_req_sql_v2.replace("@date", date)
val rdd = spark.sql(sql).coalesce(60000).rdd.map(row => {
var idfa = row.getAs[String]("idfa")
var gaid = row.getAs[String]("gaid")
val platform = row.getAs[String]("platform")
val exitId = row.getAs[String]("exitid")
val country = Option(row.getAs[String]("country")).map(_.toUpperCase).getOrElse("")
var idfv = ""
var oaid = ""
var imei = ""
var androidId = ""
if (StringUtils.isNotBlank(exitId)) {
val devIds = splitFun(exitId, ",")
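// Field positions in the comma-separated exitid, inferred from the checks below:
// 0 = gaid, 1 = idfa, 4 = imei, 7 = androidId, 12 = oaid, 16 = idfv, hence length >= 17.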
if (devIds.length >= 17) {
if ("ios".equalsIgnoreCase(platform)) {
if (StringUtils.isBlank(idfa) && StringUtils.isNotBlank(devIds(1)) && devIds(1).matches(MobvistaConstant.didPtn)) {
idfa = devIds(1)
}
if (StringUtils.isNotBlank(devIds(16)) && devIds(16).matches(MobvistaConstant.didPtn)) {
idfv = devIds(16)
}
} else {
if (StringUtils.isBlank(gaid) && StringUtils.isNotBlank(devIds(0)) && devIds(0).matches(MobvistaConstant.didPtn)) {
gaid = devIds(0)
}
if (StringUtils.isNotBlank(devIds(12))) {
oaid = devIds(12)
}
if (StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(MobvistaConstant.imeiPtn)) {
imei = devIds(4)
}
if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(MobvistaConstant.andriodIdPtn)) {
androidId = devIds(7)
}
}
}
}
val rw = if ("ios".equalsIgnoreCase(platform)) {
Row(idfa, idfv, "", "", "", "", country)
} else {
Row(imei, androidId, oaid, gaid, "", "", "", "", country)
}
(platform, rw)
})
rdd
}
}
object DspReq {
def main(args: Array[String]): Unit = {
new DspReq().run(args)
}
}
\ No newline at end of file
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/11/30
* @time: 10:35 AM
* @email: jinfeng.wang@mobvista.com
*/
abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
def commandOptions(): Options = {
val options = new Options()
options.addOption("business", true, "business")
options.addOption("date", true, "date")
options.addOption("output", true, "output")
options.addOption("coalesce", true, "coalesce")
options
}
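// Called by the shell wrappers as:
// ../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000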
override protected def run(args: Array[String]): Int = {
val parser = new BasicParser()
val options = commandOptions()
val commandLine = parser.parse(options, args)
val date = commandLine.getOptionValue("date")
val business = commandLine.getOptionValue("business")
val output = commandLine.getOptionValue("output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = MobvistaConstant.createSparkSession(s"EtlDeviceIdDaily.$business.$date")
try {
val df = processData(date, spark).distinct(20000)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
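// The pair RDD is consumed twice (ios, then android), so cache the deduplicated
// result to avoid recomputing processData for each write.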
val iosTab = df.filter(plf => {
"ios".equals(plf._1)
}).map(row => {
row._2
})
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + "/ios"), true)
spark.createDataFrame(iosTab, iosSchema)
.coalesce(coalesce)
.write.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output + "/ios")
val adrTab = df.filter(plf => {
"android".equals(plf._1)
}).map(row => {
row._2
})
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + "/android"), true)
spark.createDataFrame(adrTab, adrSchema)
.coalesce(coalesce)
.write.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output + "/android")
} finally {
if (spark != null) {
spark.stop()
}
}
0
}
def processData(date: String, spark: SparkSession): RDD[(String, Row)]
}
\ No newline at end of file
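For reference, a minimal sketch of how another log source would plug into this job. The class name, table, and columns below are assumptions for illustration, not part of this commit:

package mobvista.dmp.datasource.id_mapping

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical source; demo_db.demo_log and its columns are assumed, not real tables.
class DemoSource extends EtlDeviceIdDaily {
  override def processData(date: String, spark: SparkSession): RDD[(String, Row)] = {
    val sql = s"SELECT gaid, country FROM demo_db.demo_log WHERE dt = '$date'"
    spark.sql(sql).rdd.map(row => {
      val gaid = row.getAs[String]("gaid")
      val country = row.getAs[String]("country")
      // Android rows must line up with adrSchema:
      // imei, android_id, oaid, gaid, sysid, bkupid, xwho, user_id, country
      ("android", Row("", "", "", gaid, "", "", "", "", country))
    })
  }
}

object DemoSource {
  def main(args: Array[String]): Unit = {
    new DemoSource().run(args)
  }
}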