Commit 1dac1a5f by WangJinfeng

init id_mapping

parent 1cc7ff0d
......@@ -437,6 +437,8 @@ DSP_DEVICE_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dsp/device
ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwd/dwd_device_ids_inc_daily"
ADS_DEVICE_ID_MAPPING="s3://mob-emr-test/dataplatform/DataWareHouse/data/ads/ads_device_id_mapping"
JAR=./DMP.jar
# Check for the _SUCCESS file; if it does not exist, keep polling in a loop
......
......@@ -19,11 +19,10 @@ spark-submit --class mobvista.dmp.datasource.id_mapping.AdnRequest \
--name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.default.parallelism=3000 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 8g --executor-cores 5 --num-executors 200 \
../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 200 \
../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 2000
if [ $? -ne 0 ]; then
exit 255
......
......@@ -19,11 +19,10 @@ spark-submit --class mobvista.dmp.datasource.id_mapping.DspReq \
--name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 12g --executor-cores 5 --num-executors 200 \
../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000
--conf spark.sql.shuffle.partitions=10000 \
--conf spark.default.parallelism=10000 \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 200 \
../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 2000
if [ $? -ne 0 ]; then
exit 255
......
type=command
command=echo "ID Mapping End!"
\ No newline at end of file
command=sh -x id_mapping.sh
\ No newline at end of file
#! /bin/bash
source ../dmp_env.sh
LOG_TIME=$(date +%Y-%m-%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
ADN_REQUEST_INPUT_PATH=${ID_MAPPING}/${date_path}/adn_request
DSP_INPUT_PATH=${ID_MAPPING}/${date_path}/dsp_req
check_await "${ADN_REQUEST_INPUT_PATH}/_SUCCESS"
check_await "${DSP_INPUT_PATH}/_SUCCESS"
OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}
country="US"
platform="ios"
spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphx \
--name "IDMappingGraphx.${LOG_TIME}.${country}.${platform}" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 100 \
../${JAR} -date ${LOG_TIME} -country ${country} -platform ${platform} -output ${OUTPUT_PATH} -coalesce 500
if [ $? -ne 0 ]; then
exit 255
fi
\ No newline at end of file
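
Note: the new id_mapping.sh submits mobvista.dmp.datasource.id_mapping.IDMappingGraphx, and the pom.xml changes further down pull in spark-graphx_2.12 for it. That class is not part of this commit, so purely as orientation, here is a minimal, self-contained sketch of the GraphX connected-components pattern such an ID-mapping job typically relies on; all names, the sample identifiers, and the edge construction are illustrative assumptions, not the project's actual code.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// Illustrative only: not the project's IDMappingGraphx implementation.
object IdMappingCcSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IdMappingCcSketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical vertices: one numeric id per device identifier (gaid, oaid, idfa, ...).
    val vertices = sc.parallelize(Seq(
      (1L, "gaid:6b4e83cb-b21b-41e4-a187-ed1c2c56197e"),
      (2L, "oaid:example-oaid"),
      (3L, "idfa:example-idfa")
    ))
    // Hypothetical edges: identifiers observed together in the same request are linked.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))

    // Connected components: every identifier in a component is assigned the component's
    // smallest vertex id, which can serve as the unified device id.
    val unified = Graph(vertices, edges).connectedComponents().vertices
    unified.collect().foreach { case (id, component) => println(s"$id -> $component") }

    spark.stop()
  }
}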
......@@ -92,6 +92,11 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
......@@ -126,6 +131,11 @@
<artifactId>orc-mapreduce</artifactId>
<version>${orc.version}</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
</dependencies>
</dependencyManagement>
......@@ -215,6 +225,10 @@
<artifactId>spark-yarn_2.12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
......@@ -293,6 +307,15 @@
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.437</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.12</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
......
......@@ -101,9 +101,10 @@ ali_activation.package_name=com.taobao.foractivation.172393,com.taobao.foractiva
com.taobao.foractivation.260951,com.taobao.foractivation.260951_oaid,202005260951,com.taobao.foractivation.261865,com.taobao.foractivation.261865_oaid,\
202005261865
dsp_req.package_name=com.taobao.taobao_oppo,com.eg.android.AlipayGphone_oppo,com.ucmobile_oppo,com.qiyi.video_oppo,com.taobao.taobao_notinstall_oppo,\
# 2021-12-08 14:03:45 removed com.taobao.idlefish_oppo,com.qiyi.video_oppo
dsp_req.package_name=com.taobao.taobao_oppo,com.eg.android.AlipayGphone_oppo,com.ucmobile_oppo,com.taobao.taobao_notinstall_oppo,\
com.eg.android.AlipayGphone_bes,com.youku.phone_notinstall_oppo,com.sankuai.meituan_oppo,com.meituan.itakeaway_oppo,com.taobao.idlefish_bes,\
com.taobao.idlefish_oppo,com.UCMobile_bes,com.taobao.taobao_bes,com.tencent.news_fromtencent,com.taobao.taobao_iqiyi,com.taobao.taobao,com.UCMobile_iqiyi,\
com.UCMobile_bes,com.taobao.taobao_bes,com.tencent.news_fromtencent,com.taobao.taobao_iqiyi,com.taobao.taobao,com.UCMobile_iqiyi,\
com.UCMobile,com.eg.android.AlipayGphone_iqiyi,com.eg.android.AlipayGphone,com.taobao.idlefish_iqiyi,com.taobao.idlefish,com.sankuai.meituan_iqiyi,com.sankuai.meituan,\
com.tencent.news_iqiyi,com.tencent.news,134037632320210617,44094811020210617,147250281920210617,com.kuaishou.nebula_fromkuaishou,com.smile.gifmaker_fromkuaishou,\
com.sankuai.meituan_bes
......
......@@ -321,7 +321,7 @@ object MobvistaConstant {
// All zeros
val allZero = "00000000-0000-0000-0000-000000000000"
// IMEI
val imeiPtn = "^([0-9]{15,17})$"
val imeiPtn = "^([0-9]{14,17})$"
// The same digit repeated 14-16 more times in a row; used to filter out invalid IMEIs
val imeiPtnAll = """^([0-9])\1{14,16}"""
// androidId
......@@ -334,6 +334,8 @@ object MobvistaConstant {
val umd5Ptn = """^([0-9A-Za-z])\1{29,31}"""
// OAID
val oaidPtb = """^[0-9A-Za-z-]{16,64}$"""
// IP
val ipPtn = """^(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]?[0-9])){3}$"""
def checkDeviceId(device_id: String): Boolean = {
StringUtils.isNotBlank(device_id) &&
......
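
For reference, a minimal sketch of how the changed and added patterns above behave; the regex strings are copied from MobvistaConstant, while the surrounding object and sample values are only a throwaway test harness, not project code.

object PatternCheck {
  // Widened in this commit: 14-digit values (IMEI without the check digit) now pass.
  val imeiPtn = "^([0-9]{14,17})$"
  // Added in this commit: dotted-quad IPv4 addresses, each octet 0-255.
  val ipPtn = """^(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]?[0-9])){3}$"""

  def main(args: Array[String]): Unit = {
    println("35847605928471".matches(imeiPtn))  // true: 14 digits now accepted
    println("3584760592847".matches(imeiPtn))   // false: 13 digits still rejected
    println("192.168.0.1".matches(ipPtn))       // true
    println("256.1.1.1".matches(ipPtn))         // false: 256 is out of range
  }
}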
package mobvista.dmp.datasource.dsp
import java.net.URI
import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.dmp.util.{MD5Util, MRUtils}
import mobvista.prd.datasource.util.GsonUtil
......@@ -13,6 +11,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
......@@ -59,7 +58,7 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
select idfa,googleadid gaid,
| os platform,countrycode country,
| deviceip ip,gender,yob birthday,
| make,model,osv osversion,
| make,model,osv osversion,ext6 ua,
| appid packagename,ext3,exchanges,ext5 exitid,`time`,
| body json_msg,rg region
| from adn_dsp.log_adn_dsp_request_orc_hour where concat(yr,mt,dt,hh)='${yyyymmddhh}'
......@@ -69,15 +68,15 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
.map(parseMapData).toDF()
.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.select("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osVersion", "packageName", "exitId", "time", "geoInfo", "longitude", "latitude", "segment", "dealerid", "exchanges", "region")
.toDF("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osversion", "packagename", "exitid", "time", "geoinfo", "longitude", "latitude", "segment", "dealerid", "exchanges", "region")
df.select("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osVersion", "ua", "packageName", "exitId", "time", "geoInfo", "longitude", "latitude", "segment", "dealerid", "exchanges", "region")
.toDF("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osversion", "ua", "packagename", "exitid", "time", "geoinfo", "longitude", "latitude", "segment", "dealerid", "exchanges", "region")
.createOrReplaceTempView("dsp_org_etl_hours")
val etl_sql =
"""
|select idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion, concat_ws('#',collect_set(packagename)) packagename,exitid,max(`time`) datetime,segment,dealerid,exchanges,concat_ws('#',collect_set(region)) region
|select idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,ua,concat_ws('#',collect_set(packagename)) packagename,exitid,max(`time`) datetime,segment,dealerid,exchanges,concat_ws('#',collect_set(region)) region
|from dsp_org_etl_hours
|group by idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,exitid,segment,dealerid,exchanges
|group by idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,ua,exitid,segment,dealerid,exchanges
""".stripMargin
spark.sql(etl_sql).repartition(coalesce.toInt)
.write
......@@ -135,6 +134,7 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
val maker = row.getAs[String]("make")
val model = row.getAs[String]("model")
val osVersion = row.getAs[String]("osversion")
val ua = row.getAs[String]("ua")
val country = row.getAs[String]("country")
val birthday = row.getAs[String]("birthday")
val gender = row.getAs[String]("gender")
......@@ -203,9 +203,8 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
}
DspReqVODemo(idfa, gaid, platform, country, ip, gender, birthday, maker, model, osVersion,
DspReqVODemo(idfa, gaid, platform, country, ip, gender, birthday, maker, model, osVersion, ua,
packageName, exitId, time, geoInfo, longitude, latitude, segment, dealerid, exchanges, region)
}
override protected def buildOptions(): Options = {
......
package mobvista.dmp.datasource.dsp
case class DspReqVODemo(idfa:String,gaid: String,
platform: String,
country: String,
ip: String,
gender: String,
birthday: String,
maker: String,
model: String,
osVersion: String,
packageName: String,
exitId: String,
time: String,
geoInfo: String,
longitude: String,
latitude: String,
segment: String,
dealerid:String,
exchanges:String,
case class DspReqVODemo(idfa: String,
gaid: String,
platform: String,
country: String,
ip: String,
gender: String,
birthday: String,
maker: String,
model: String,
osVersion: String,
ua: String,
packageName: String,
exitId: String,
time: String,
geoInfo: String,
longitude: String,
latitude: String,
segment: String,
dealerid: String,
exchanges: String,
region: String)
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.datasource.id_mapping.Constant.{getDevId, parseUA}
import mobvista.dmp.datasource.id_mapping.Constant.{getDevId, parseUA, process}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
......@@ -20,22 +20,18 @@ class AdnRequest extends EtlDeviceIdDaily {
// DWD
// val sql = Constant.adn_request_sql.replace("@date", date)
// ODS
val sql = Constant.adn_request_sql_v2.replace("@date", date)
val rdd = spark.sql(sql).coalesce(2000).rdd.map(row => {
val sql = Constant.adn_request_sql_v3.replace("@date", date)
val rdd = spark.sql(sql).coalesce(5000).rdd.map(row => {
val gaid = row.getAs[String]("gaid")
val idfa = row.getAs[String]("idfa")
val imei = row.getAs[String]("imei")
val androidId = row.getAs[String]("androidid")
val extSysid = row.getAs[String]("extsysid")
var platform = row.getAs[String]("platform")
platform = if (StringUtils.isNotBlank(platform)) {
platform.toLowerCase()
} else {
""
}
val platform = row.getAs[String]("platform")
val oaid = row.getAs[String]("oaid")
val idfv = row.getAs[String]("idfv")
var country = row.getAs[String]("country")
val pkg_name = row.getAs[String]("pkg_name")
val country = row.getAs[String]("country")
val ip = row.getAs[String]("ip")
val ua = row.getAs[String]("ua")
val brand = row.getAs[String]("brand")
......@@ -43,11 +39,7 @@ class AdnRequest extends EtlDeviceIdDaily {
val os_version = row.getAs[String]("os_version")
val osv_upt = row.getAs[String]("osv_upt")
val upt = row.getAs[String]("upt")
country = if (StringUtils.isNotBlank(country)) {
country.toUpperCase()
} else {
""
}
val network_type = row.getAs[String]("network_type")
var sysId = ""
var bkupId = ""
if (StringUtils.isNotBlank(extSysid)) {
......@@ -57,19 +49,14 @@ class AdnRequest extends EtlDeviceIdDaily {
} else {
""
}
bkupId = if (arr.length == 2 && StringUtils.isNotBlank(arr(1))) {
arr(1)
} else {
""
}
}
val rw = if ("ios".equalsIgnoreCase(platform)) {
Row(idfa, idfv, sysId, bkupId, "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
} else {
Row(imei, androidId, oaid, gaid, sysId, bkupId, "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
}
(platform, rw)
process(idfa, idfv, pkg_name, imei, androidId, oaid, gaid, sysId, bkupId, country, ip, ua, brand, model, os_version,
osv_upt, upt, network_type, platform)
})
rdd
}
......
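
Both AdnRequest above and DspReq below now delegate Row construction to Constant.process, which is not shown in this diff. Based on the inline logic it replaces and the call sites, a plausible reconstruction looks roughly like the sketch below; the parameter order follows the calls, but the exact Row layouts, the added pkg_name/network_type columns, and the "other" bucket are assumptions.

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

// Hypothetical reconstruction of Constant.process, inferred from the call sites;
// the real method (and the exact schemas) lives in Constant.scala, which is not in this diff.
object ProcessSketch {
  def process(idfa: String, idfv: String, pkgName: String, imei: String, androidId: String,
              oaid: String, gaid: String, sysId: String, bkupId: String, country: String,
              ip: String, ua: String, brand: String, model: String, osVersion: String,
              osvUpt: String, upt: String, networkType: String, platform: String): (String, Row) = {
    // Normalization that the old call sites previously did inline.
    val plf = if (StringUtils.isNotBlank(platform)) platform.toLowerCase else ""
    val cc = if (StringUtils.isNotBlank(country)) country.toUpperCase else ""
    if ("ios".equals(plf)) {
      ("ios", Row(idfa, idfv, pkgName, sysId, bkupId, cc, ip, ua, brand, model, osVersion, osvUpt, upt, networkType))
    } else if ("android".equals(plf)) {
      ("android", Row(imei, androidId, pkgName, oaid, gaid, sysId, bkupId, cc, ip, ua, brand, model, osVersion, osvUpt, upt, networkType))
    } else {
      // Matches the new "other" output added in EtlDeviceIdDaily below.
      ("other", Row(pkgName, cc, ip, ua, brand, model, osVersion, osvUpt, upt, networkType))
    }
  }
}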
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.id_mapping.Constant.parseUA
import mobvista.dmp.datasource.id_mapping.Constant.{parseUA, process}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
......@@ -34,12 +34,14 @@ class DspReq extends EtlDeviceIdDaily {
}
val sql = Constant.dsp_req_sql_v2.replace("@date", date)
.replace("@hour", hour)
val rdd = spark.sql(sql).coalesce(2000).rdd.map(row => {
val rdd = spark.sql(sql).coalesce(20000).rdd.map(row => {
var idfa = row.getAs[String]("idfa")
var gaid = row.getAs[String]("gaid")
val platform = row.getAs[String]("platform")
val exitId = row.getAs[String]("exitid")
var country = row.getAs[String]("country")
val pkg_name = row.getAs[String]("pkg_name")
val network_type = row.getAs[String]("network_type")
country = if (StringUtils.isNotBlank(country)) {
country.toUpperCase()
} else {
......@@ -83,12 +85,8 @@ class DspReq extends EtlDeviceIdDaily {
}
}
val rw = if ("ios".equalsIgnoreCase(platform)) {
Row(idfa, idfv, "", "", "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
} else {
Row(imei, androidId, oaid, gaid, "", "", "", "", country, ip, ua, brand, model, os_version, osv_upt, upt)
}
(platform, rw)
process(idfa, idfv, pkg_name, imei, androidId, oaid, gaid, sysId = "", bkupId = "", country, ip, ua, brand,
model, os_version, osv_upt, upt, network_type, platform)
})
rdd
}
......
......@@ -42,7 +42,7 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
if ("dsp_req".equalsIgnoreCase(business)) {
for (i <- 0 until 4) {
val df = processData(date, i, spark)
.repartition(20000)
.repartition(5000)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
val iosTab = df.filter(plf => {
......@@ -72,11 +72,25 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
.option("orc.compress", "zlib")
.orc(output + s"/android/${i}")
val otherTab = df.filter(plf => {
"other".equals(plf._1)
}).map(row => {
row._2
})
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/other/${i}"), true)
spark.createDataFrame(otherTab, otherSchema)
.coalesce(coalesce)
.write.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output + s"/other/${i}")
df.unpersist(true)
}
} else {
val df = processData(date, 0, spark)
.repartition(20000)
.repartition(5000)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
val iosTab = df.filter(plf => {
......@@ -105,6 +119,20 @@ abstract class EtlDeviceIdDaily extends CommonSparkJob with Serializable {
.write.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output + s"/android")
val otherTab = df.filter(plf => {
"other".equals(plf._1)
}).map(row => {
row._2
})
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output + s"/other"), true)
spark.createDataFrame(otherTab, otherSchema)
.coalesce(coalesce)
.write.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output + s"/other")
}
} finally {
if (spark != null) {
......
package mobvista.dmp.utils.common
import mobvista.dmp.common.MobvistaConstant
import java.io.UnsupportedEncodingException
import java.math.BigInteger
import java.security.{MessageDigest, NoSuchAlgorithmException}
/**
* @package: mobvista.dmp.utils.common
* @author: wangjf
* @date: 2021/12/7
* @time: 4:13 PM
* @email: jinfeng.wang@mobvista.com
*/
object MD5Util {
def getMD5Long(str: String): Long = {
val id_map: Map[String, String] = Map("a" -> "1", "b" -> "2", "c" -> "3", "d" -> "4", "e" -> "5", "f" -> "6", "g" -> "7")
var strm = ""
try {
// Step 1: get a MessageDigest instance; the "MD5" argument selects the MD5 algorithm
val md5 = MessageDigest.getInstance("MD5")
// Step 2 (feeding the source data as byte[]) is skipped; digest() below takes the bytes directly
// Step 3: compute the MD5 digest
val array = md5.digest(str.getBytes("UTF-8"))
// Step 4: convert the result and return it
val bigInt = new BigInteger(1, array).toString(16).substring(8, 24)
for (c <- bigInt.toCharArray) {
if (id_map.contains(String.valueOf(c))) strm += id_map(String.valueOf(c))
else strm += String.valueOf(c)
}
} catch {
case e@(_: NoSuchAlgorithmException | _: UnsupportedEncodingException) =>
e.printStackTrace()
}
strm.toLong
}
def main(args: Array[String]): Unit = {
val data =
"""
|{ "@timestamp": "07/Dec/2021:23:13:36 +0800", "@fields": { "remote_addr": "100.122.18.27", "remote_user": "-", "body_bytes_sent": "0", "status": "200", "http_host": "analytics-http-ab.mintegral.net", "request": "POST / HTTP/1.1", "request_method": "POST", "request_body": "platform=1&package_name=com.Upperpik.HairChallenge&os_version=11&brand=samsung&model=SM-T510&gaid=6b4e83cb-b21b-41e4-a187-ed1c2c56197e&network_type=9&network_str=&language=es-ES&timezone=GMT%252B01%253A00&ua=Mozilla%252F5.0%2B%2528Linux%253B%2BAndroid%2B11%253B%2BSM-T510
|""".stripMargin
println(MobvistaConstant.String2JSONObject(data))
}
}
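
getMD5Long condenses an arbitrary string into a numeric id: MD5-hash it, keep hex characters 8-24, replace the hex letters a-f with the digits 1-6 (the table also defines g -> 7, which never occurs in hex output), and parse the result as a Long. A hedged usage sketch, assuming the helper is meant to derive stable numeric ids (for example GraphX vertex ids) from device identifiers:

// Hypothetical usage: derive a stable numeric id from a device identifier.
// The mapping is deterministic, so the same input always yields the same Long.
val gaid = "6b4e83cb-b21b-41e4-a187-ed1c2c56197e"
val vertexId: Long = MD5Util.getMD5Long(gaid)
println(s"$gaid -> $vertexId")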