Commit 4ed7ea68 by WangJinfeng

update id_mapping add reyun

parent db70af1d
......@@ -9,26 +9,26 @@ PLATFORM=$2
if [[ ${COUNTRY} = 'cn' ]]; then
if [[ ${PLATFORM} = 'android' ]]; then
partition=1000
executors=100
cores=3
coalesce=500
executors=50
cores=2
coalesce=200
else
partition=1000
executors=100
cores=3
coalesce=500
executors=50
cores=2
coalesce=200
fi
else
if [[ ${PLATFORM} = 'android' ]]; then
partition=10000
executors=300
cores=3
coalesce=3000
partition=2000
executors=200
cores=2
coalesce=2000
else
partition=2000
executors=100
cores=3
coalesce=500
executors=200
cores=2
coalesce=1000
fi
fi
......@@ -54,11 +54,10 @@ OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}/${COUNTRY}/${PLATFORM}
spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphx \
--name "IDMappingGraphx.${LOG_TIME}.${COUNTRY}.${PLATFORM}" \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=${partition} \
--conf spark.default.parallelism=${partition} \
--deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores ${cores} --num-executors ${executors} \
--deploy-mode cluster --executor-memory 6g --driver-memory 6g --executor-cores ${cores} --num-executors ${executors} \
../${JAR} -date ${LOG_TIME} -country ${COUNTRY} -platform ${PLATFORM} \
-output ${OUTPUT_PATH}/mid -fre_output ${OUTPUT_PATH}/frequency -result_output ${OUTPUT_PATH}/result -coalesce ${coalesce}
......
......@@ -32,10 +32,9 @@ spark-submit --class mobvista.dmp.datasource.dm.DmDeviceTagStatistics \
--conf spark.sql.shuffle.partitions=10000 \
--conf spark.default.parallelism=2000 \
--conf spark.sql.files.maxPartitionBytes=268435456 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
--deploy-mode cluster --executor-memory 18g --driver-memory 4g --executor-cores 5 --num-executors 60 \
--deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 2 --num-executors 200 \
../${JAR} \
-output ${mount_path} -date ${date} -coalesce 500
......
......@@ -50,18 +50,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
// val fre_output = commandLine.getOptionValue("fre_output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = SparkSession
.builder()
.appName(s"IDMappingGraphx.$date.$country.$platform")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "lz4")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrationRequired", "false")
.config("spark.kryo.registrator", "mobvista.dmp.datasource.id_mapping.MyRegisterKryo")
.enableHiveSupport()
.getOrCreate()
val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")
try {
oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, result_output, coalesce)
......@@ -158,7 +147,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
kv._2.size > 1
}).mapPartitions(rs => new CustomInterator(schedule_date, rs, idSet, idMainSet))
.flatMap(l => l)
// .map(rs => updateOneID(schedule_date, rs, idSet, idMainSet)).flatMap(l => l)
// 主ID生成OneID
val singleOneIDRDD = maxGraph.filter(kv => {
......@@ -175,9 +163,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
(kv._1, (oneID.toJSONString, schedule_date))
})
// multiOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
// singleOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
maxGraph.unpersist(true)
val yesDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
......@@ -201,7 +187,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
filterAction(rs._1._2, idMainSet) || (!filterAction(rs._1._2, idMainSet) && rs._2._2.compareTo(updateDate) >= 0)
})
val midMergeOneIDRDD = spark.sparkContext.union(Seq(multiOneIDRDD, singleOneIDRDD, oldMidMergeOneIDRDD))
val midMergeOneIDRDD = spark.sparkContext.union(Seq(oldMidMergeOneIDRDD, singleOneIDRDD, multiOneIDRDD))
.coalesce(coalesce * 5)
.combineByKey(
(v: (String, String)) => Set(v),
(c: Set[(String, String)], v: (String, String)) => c ++ Seq(v),
......@@ -229,7 +216,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
})
import spark.implicits._
// midMergeOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
midMergeOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
......
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.common.MobvistaConstant.{allZero, andriodIdAll, andriodIdPtn, didPtn, imeiPtn, imeiPtnAll, ipPtn, md5Ptn}
import mobvista.dmp.datasource.id_mapping.Constant.{getDevId, parseUA, process}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
......@@ -36,15 +37,152 @@ class ReYun extends EtlDeviceIdDaily {
val iosDF = tkioEventIosRDD.union(tkioClickIosRDD).union(abtestIosRDD)
.withColumn("cnt", lit(1L))
.rdd.map(row => {
("ios", row)
("ios", parseIosRow(row))
})
val adrDF = tkioEventAdrRDD.union(tkioClickAdrRDD).union(abtestAdrRDD)
.withColumn("cnt", lit(1L))
.rdd.map(row => {
("android", row)
("android", parseAdrRow(row))
})
iosDF.union(adrDF)
}
def parseIosRow(row: Row): Row = {
  // Normalize a raw ReYun iOS event row into the canonical id_mapping schema.
  // Every field that fails validation is mapped to "" (never null) so
  // downstream joins and pattern matches see a uniform representation.

  // Keep a device ID only when it is a well-formed non-all-zero raw ID
  // (didPtn) or an MD5 hash of one (md5Ptn); otherwise blank it.
  def cleanDeviceId(id: String): String =
    if (StringUtils.isNotBlank(id) && (id.matches(didPtn) && !id.matches(allZero) || id.matches(md5Ptn))) id
    else ""

  // Trim free-form text; null/empty/whitespace-only becomes "".
  def cleanText(s: String): String =
    if (StringUtils.isNotBlank(s)) s.trim else ""

  val pkg_name = row.getAs[String]("pkg_name")
  val xwho = row.getAs[String]("xwho")
  val user_id = row.getAs[String]("user_id")
  val cnt = row.getAs[Long]("cnt")

  val f_idfa = cleanDeviceId(row.getAs[String]("idfa"))
  val f_idfv = cleanDeviceId(row.getAs[String]("idfv"))

  // Keep the IP only when it matches the shared IP pattern.
  val ip = row.getAs[String]("ip")
  val f_ip = if (StringUtils.isNotBlank(ip) && ip.matches(ipPtn)) ip else ""

  val f_ua = cleanText(row.getAs[String]("ua"))
  val f_brand = cleanText(row.getAs[String]("brand"))
  val f_model = cleanText(row.getAs[String]("model"))
  val f_osv = cleanText(row.getAs[String]("os_version"))

  // "unknown" carries no signal, so it is dropped along with blanks.
  val network_type = row.getAs[String]("network_type")
  val f_network_type =
    if (StringUtils.isNotBlank(network_type) && !network_type.equalsIgnoreCase("unknown")) network_type.trim
    else ""

  // ReYun is a CN-only feed; sysId/bkupIp/uptime fields are not supplied
  // by this source, so they are emitted as fixed placeholders.
  val sysId = ""
  val bkupIp = ""
  val country = "CN"
  val osv_upt = "0"
  val upt = "0"

  Row(f_idfa, f_idfv, pkg_name, sysId, bkupIp, xwho, user_id, country, f_ip, f_ua, f_brand, f_model, f_osv, osv_upt, upt, "", f_network_type, cnt)
}
def parseAdrRow(row: Row): Row = {
  // Normalize a raw ReYun Android event row into the canonical id_mapping
  // schema. Each ID type has its own validity rule, but all of them also
  // accept an MD5-hashed form; anything invalid becomes "" (never null).

  // Keep an ID only when the type-specific predicate holds on the raw value
  // or the value is an MD5 hash (md5Ptn); otherwise blank it.
  def cleanId(id: String)(isValidRaw: String => Boolean): String =
    if (StringUtils.isNotBlank(id) && (isValidRaw(id) || id.matches(md5Ptn))) id
    else ""

  // Trim free-form text; null/empty/whitespace-only becomes "".
  def cleanText(s: String): String =
    if (StringUtils.isNotBlank(s)) s.trim else ""

  val pkg_name = row.getAs[String]("pkg_name")
  val xwho = row.getAs[String]("xwho")
  val user_id = row.getAs[String]("user_id")
  val cnt = row.getAs[Long]("cnt")

  // IMEI: well-formed and not a degenerate repeated-digit value.
  val f_imei = cleanId(row.getAs[String]("imei"))(v => v.matches(imeiPtn) && !v.matches(imeiPtnAll))
  // Android ID: well-formed and not a degenerate all-same value.
  val f_androidId = cleanId(row.getAs[String]("android_id"))(v => v.matches(andriodIdPtn) && !v.matches(andriodIdAll))
  // OAID has no fixed pattern; accept plausible lengths that are not all zeros.
  val f_oaid = cleanId(row.getAs[String]("oaid"))(v => v.length >= 16 && v.length <= 64 && !v.matches(allZero))
  // GAID: standard device-id pattern, not all zeros.
  val f_gaid = cleanId(row.getAs[String]("gaid"))(v => v.matches(didPtn) && !v.matches(allZero))

  // Keep the IP only when it matches the shared IP pattern.
  val ip = row.getAs[String]("ip")
  val f_ip = if (StringUtils.isNotBlank(ip) && ip.matches(ipPtn)) ip else ""

  val f_ua = cleanText(row.getAs[String]("ua"))
  val f_brand = cleanText(row.getAs[String]("brand"))
  val f_model = cleanText(row.getAs[String]("model"))
  val f_osv = cleanText(row.getAs[String]("os_version"))

  // "unknown" carries no signal, so it is dropped along with blanks.
  val network_type = row.getAs[String]("network_type")
  val f_network_type =
    if (StringUtils.isNotBlank(network_type) && !network_type.equalsIgnoreCase("unknown")) network_type.trim
    else ""

  // ReYun is a CN-only feed; sysId/bkupIp/uptime fields are not supplied
  // by this source, so they are emitted as fixed placeholders.
  val sysId = ""
  val bkupIp = ""
  val country = "CN"
  val osv_upt = "0"
  val upt = "0"

  Row(f_imei, f_androidId, pkg_name, f_oaid, f_gaid, sysId, bkupIp, xwho, user_id, country, f_ip, f_ua, f_brand, f_model, f_osv, osv_upt, upt, "", f_network_type, cnt)
}
}
object ReYun {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment