Commit dea412dc by WangJinfeng

update id_mapping

parent 4ed7ea68
@@ -14,9 +14,9 @@ dt_dash_one_days=$(date -d "$ScheduleTime 1 days ago" +"%Y-%m-%d")
 dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
 INPUT_PATH="${REYUN_RAW_DATA}/${dt_slash_today}"
-check_await "${INPUT_PATH}"
-common_mount_partition "reyun" "pkginfo" "ds='${dt_today}'" "${INPUT_PATH}"
+# check_await "${INPUT_PATH}"
+# common_mount_partition "reyun" "pkginfo" "ds='${dt_today}'" "${INPUT_PATH}"
 OUTPUT_PATH="${REYUN_DAILY_PATH}/${dt_slash_today}"
...
 type=command
-dependencies=id_mapping_overseas_android,id_mapping_cn_android
+dependencies=id_mapping_overseas_ios,id_mapping_overseas_android,id_mapping_cn_ios,id_mapping_cn_android
 command=echo "id_mapping job end!"
\ No newline at end of file
@@ -9,25 +9,25 @@ PLATFORM=$2
 if [[ ${COUNTRY} = 'cn' ]]; then
   if [[ ${PLATFORM} = 'android' ]]; then
     partition=1000
-    executors=50
-    cores=2
-    coalesce=200
+    executors=100
+    cores=3
+    coalesce=400
   else
     partition=1000
-    executors=50
-    cores=2
-    coalesce=200
+    executors=100
+    cores=3
+    coalesce=400
   fi
 else
   if [[ ${PLATFORM} = 'android' ]]; then
-    partition=2000
+    partition=5000
     executors=200
-    cores=2
+    cores=3
     coalesce=2000
   else
-    partition=2000
+    partition=4000
     executors=200
-    cores=2
+    cores=3
     coalesce=1000
   fi
 fi
@@ -42,8 +42,6 @@ DSP_INPUT_PATH=${ID_MAPPING}/${date_path}/dsp_req
 check_await "${ADN_REQUEST_INPUT_PATH}/$PLATFORM/_SUCCESS"
-# check_await "${DSP_INPUT_PATH}/$PLATFORM/_SUCCESS"
 before_date_path=$(date +'%Y/%m/%d' -d "-2 day $ScheduleTime")
 OLD_ID_MAPPING_PATH=${ADS_DEVICE_ID_MAPPING}/${before_date_path}/${COUNTRY}/${PLATFORM}
@@ -52,14 +50,16 @@ check_await "${OLD_ID_MAPPING_PATH}/mid/_SUCCESS"
 OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}/${COUNTRY}/${PLATFORM}
-spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphx \
-  --name "IDMappingGraphx.${LOG_TIME}.${COUNTRY}.${PLATFORM}" \
+spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphxResult \
+  --name "IDMappingGraphxResult.${LOG_TIME}.${COUNTRY}.${PLATFORM}" \
   --conf spark.network.timeout=720s \
   --conf spark.sql.shuffle.partitions=${partition} \
   --conf spark.default.parallelism=${partition} \
-  --deploy-mode cluster --executor-memory 6g --driver-memory 6g --executor-cores ${cores} --num-executors ${executors} \
+  --conf spark.kryoserializer.buffer.max=512m \
+  --conf spark.kryoserializer.buffer=64m \
+  --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores ${cores} --num-executors ${executors} \
   ../${JAR} -date ${LOG_TIME} -country ${COUNTRY} -platform ${PLATFORM} \
-  -output ${OUTPUT_PATH}/mid -fre_output ${OUTPUT_PATH}/frequency -result_output ${OUTPUT_PATH}/result -coalesce ${coalesce}
+  -output ${OUTPUT_PATH}/result -coalesce ${coalesce}
 if [ $? -ne 0 ]; then
   exit 255
...
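Note on the two Kryo options added to the spark-submit above: spark.kryoserializer.buffer is the initial size of the serializer's scratch buffer and spark.kryoserializer.buffer.max is the ceiling it may grow to before Kryo fails with a buffer-overflow error; they only take effect when Kryo is the active serializer, which this script does not set explicitly. A minimal sketch of the same settings applied programmatically (the app name and the spark.serializer line are assumptions for illustration, not part of this commit):

// Illustration only, not part of this commit.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KryoBufferSketch") // hypothetical name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // assumed; the script relies on the job's own session config
  .config("spark.kryoserializer.buffer", "64m") // initial per-serialization buffer
  .config("spark.kryoserializer.buffer.max", "512m") // hard cap before "buffer overflow" errors
  .getOrCreate()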
 type=command
-dependencies=id_mapping_cn_ios
+dependencies=id_mapping_cn_android_mid
 command=sh -x id_mapping.sh 'cn' 'android'
\ No newline at end of file
type=command
# dependencies=id_mapping_mid_cn_ios
command=sh -x id_mapping_mid.sh 'cn' 'android'
\ No newline at end of file
 type=command
+dependencies=id_mapping_cn_ios_mid
 command=sh -x id_mapping.sh 'cn' 'ios'
\ No newline at end of file
type=command
command=sh -x id_mapping_mid.sh 'cn' 'ios'
\ No newline at end of file
#! /bin/bash
source ../dmp_env.sh
COUNTRY=$1
PLATFORM=$2
if [[ ${COUNTRY} = 'cn' ]]; then
if [[ ${PLATFORM} = 'android' ]]; then
partition=1000
executors=100
cores=3
coalesce=400
else
partition=1000
executors=100
cores=3
coalesce=400
fi
else
if [[ ${PLATFORM} = 'android' ]]; then
partition=10000
executors=256
cores=3
coalesce=3000
else
partition=4000
executors=200
cores=3
coalesce=1000
fi
fi
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")
ADN_REQUEST_INPUT_PATH=${ID_MAPPING}/${date_path}/adn_request
DSP_INPUT_PATH=${ID_MAPPING}/${date_path}/dsp_req
check_await "${ADN_REQUEST_INPUT_PATH}/$PLATFORM/_SUCCESS"
# check_await "${DSP_INPUT_PATH}/$PLATFORM/_SUCCESS"
before_date_path=$(date +'%Y/%m/%d' -d "-2 day $ScheduleTime")
OLD_ID_MAPPING_PATH=${ADS_DEVICE_ID_MAPPING}/${before_date_path}/${COUNTRY}/${PLATFORM}
check_await "${OLD_ID_MAPPING_PATH}/mid/_SUCCESS"
OUTPUT_PATH=${ADS_DEVICE_ID_MAPPING}/${date_path}/${COUNTRY}/${PLATFORM}
spark-submit --class mobvista.dmp.datasource.id_mapping.IDMappingGraphx \
--name "IDMappingGraphx.${LOG_TIME}.${COUNTRY}.${PLATFORM}" \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=${partition} \
--conf spark.default.parallelism=${partition} \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \
--deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores ${cores} --num-executors ${executors} \
../${JAR} -date ${LOG_TIME} -country ${COUNTRY} -platform ${PLATFORM} \
-output ${OUTPUT_PATH}/mid -result_output ${OUTPUT_PATH}/result -coalesce ${coalesce}
if [ $? -ne 0 ]; then
exit 255
fi
 type=command
-dependencies=id_mapping_overseas_ios
+dependencies=id_mapping_overseas_android_mid
 command=sh -x id_mapping.sh 'overseas' 'android'
\ No newline at end of file
type=command
# dependencies=id_mapping_overseas_android
command=sh -x id_mapping_mid.sh 'overseas' 'android'
\ No newline at end of file
 type=command
+dependencies=id_mapping_overseas_ios_mid
 command=sh -x id_mapping.sh 'overseas' 'ios'
\ No newline at end of file
type=command
command=sh -x id_mapping_mid.sh 'overseas' 'ios'
\ No newline at end of file
@@ -125,7 +125,9 @@ object Constant {
   val androidIDSet = Array("gaid", "imei", "oaid", "sysid", "xwho", "user_id", "android_pkg", "bmosv_upt", "bmosv_ipua_pkg", "bkupid")
-  val androidMainIDSet = Set("imei", "gaid", "oaid", "sysid", "xwho", "user_id")
+  val androidCNMainIDSet = Set("imei", "oaid", "sysid", "xwho", "user_id")
+  val androidMainIDSet = Set("gaid", "sysid", "xwho", "user_id")
   case class AdrTab(imei: String, android_id: String, pkg_name: String, oaid: String, gaid: String, sysid: String, bkupid: String,
                     xwho: String, user_id: String, country: String, ip: String, ua: String, brand: String, model: String,
@@ -259,11 +261,6 @@ object Constant {
       | GROUP BY imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
       |""".stripMargin
-  val old_id_mapping_sql: String =
-    """
-      |
-      |""".stripMargin
   val sss_sql =
     """
       |""".stripMargin
@@ -413,7 +410,7 @@ object Constant {
   case class OneIDScore(one_id: String, one_type: String, one_score: Double, one_version: String) extends Serializable
-  class CustomInterator(active_date: String, iter: Iterator[((String, String), Set[(String, String, Long)])],
+  class CustomInterator(active_date: String, iter: Iterator[((String, String), Iterable[(String, String, Long)])],
                         idArray: Array[String], mainIDSet: Set[String]) extends Iterator[ArrayBuffer[((String, String), (String, String))]] {
     def hasNext: Boolean = {
       iter.hasNext
@@ -435,7 +432,11 @@ object Constant {
           val json = new JSONObject()
           json.put("one_type", t._2)
           json.put("one_date", active_date)
-          json.put("one_cnt", t._3)
+          if (oneID.containsKey(t._1) && oneID.getJSONObject(t._1).getLongValue("one_cnt") < t._3) {
+            json.put("one_cnt", oneID.getJSONObject(t._1).getLongValue("one_cnt") + t._3)
+          } else {
+            json.put("one_cnt", t._3)
+          }
           oneID.put(t._1, json)
         }
         finalize()
...
@@ -9,6 +9,7 @@ import mobvista.dmp.utils.common.MD5Util.hashMD5
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.storage.StorageLevel
@@ -34,7 +35,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     options.addOption("output", true, "output")
     options.addOption("coalesce", true, "coalesce")
     options.addOption("result_output", true, "result_output")
-    options.addOption("fre_output", true, "fre_output")
     options
   }
@@ -47,7 +47,6 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     val date = commandLine.getOptionValue("date")
     val output = commandLine.getOptionValue("output")
     val result_output = commandLine.getOptionValue("result_output")
-    // val fre_output = commandLine.getOptionValue("fre_output")
     val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
     val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")
@@ -84,17 +83,18 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
         dailySQL = Constant.ios_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"")
       }
       case "android" => {
-        idMainSet = androidMainIDSet
         scoreMap = androidIDScoreMap
         country match {
           case "CN" =>
+            idMainSet = androidCNMainIDSet
             schame = adrCNVertSchema
             idSet = androidCNIDSet
             dailySQL = Constant.android_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"AND country = '${country}'")
           case _ =>
+            idMainSet = androidMainIDSet
             schame = adrVertSchema
             idSet = androidIDSet
-            dailySQL = Constant.android_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"AND country != '${country}'")
+            dailySQL = Constant.android_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"")
         }
       }
       case _ =>
@@ -134,13 +134,35 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       processVertex(schedule_date, row, idSet, idMainSet)
     }).flatMap(l => l)
-    val maxGraph = vertex.combineByKey(
-      (v: (String, String, Long)) => Set(v),
-      (c: Set[(String, String, Long)], v: (String, String, Long)) => c ++ Seq(v),
-      (c1: Set[(String, String, Long)], c2: Set[(String, String, Long)]) => c1 ++ c2
-    )
-    maxGraph.persist(StorageLevel.MEMORY_AND_DISK_SER)
+    vertex.persist(StorageLevel.MEMORY_AND_DISK_SER)
+    val maxGraphFilter = vertex.map(l => {
+      (l._1, 1)
+    }).groupByKey().map(l => {
+      (l._1, l._2.size)
+    }).filter(l => {
+      l._2 > 1000
+    })
+    maxGraphFilter.cache()
+    val maxGraph = vertex.leftOuterJoin(maxGraphFilter)
+      .mapPartitions(kvs => {
+        kvs.map(kv => {
+          val key = kv._1
+          val value = kv._2
+          if (value._2.isEmpty) {
+            (key, value._1)
+          } else {
+            null
+          }
+        })
+      }).filter(line => {
+        line != null
+      }).combineByKey(
+        (v: (String, String, Long)) => Iterable(v),
+        (c: Iterable[(String, String, Long)], v: (String, String, Long)) => c ++ Seq(v),
+        (c1: Iterable[(String, String, Long)], c2: Iterable[(String, String, Long)]) => c1 ++ c2
+      )
     // Generate OneID for non-main IDs
     val multiOneIDRDD = maxGraph.filter(kv => {
@@ -163,13 +185,10 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       (kv._1, (oneID.toJSONString, schedule_date))
     })
-    maxGraph.unpersist(true)
     val yesDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
     val updateDate = sdf1.format(sdf2.parse(DateUtil.getDayByString(date, "yyyyMMdd", -7)))
-    // spark.udf.register("filterAction",filterAction _)
     val oldMidMergeOneIDRDD = spark.sql(
       s"""
          |SELECT device_id, device_type, one_id, update_date
@@ -177,22 +196,16 @@
          |""".stripMargin)
       .rdd
       .map(row => {
-        val update_date = if (StringUtils.isNotBlank(row.getAs[String]("update_date"))) {
-          row.getAs[String]("update_date")
-        } else {
-          schedule_date
-        }
-        ((row.getAs[String]("device_id"), row.getAs[String]("device_type")), (row.getAs[String]("one_id"), update_date))
+        ((row.getAs[String]("device_id"), row.getAs[String]("device_type")), (row.getAs[String]("one_id"), row.getAs[String]("update_date")))
       }).filter(rs => {
         filterAction(rs._1._2, idMainSet) || (!filterAction(rs._1._2, idMainSet) && rs._2._2.compareTo(updateDate) >= 0)
       })
-    val midMergeOneIDRDD = spark.sparkContext.union(Seq(oldMidMergeOneIDRDD, singleOneIDRDD, multiOneIDRDD))
-      .coalesce(coalesce * 5)
+    val midMergeOneIDRDD = spark.sparkContext.union(Seq(singleOneIDRDD, multiOneIDRDD, oldMidMergeOneIDRDD))
       .combineByKey(
-        (v: (String, String)) => Set(v),
-        (c: Set[(String, String)], v: (String, String)) => c ++ Seq(v),
-        (c1: Set[(String, String)], c2: Set[(String, String)]) => c1 ++ c2
+        (v: (String, String)) => Iterable(v),
+        (c: Iterable[(String, String)], v: (String, String)) => c ++ Seq(v),
+        (c1: Iterable[(String, String)], c2: Iterable[(String, String)]) => c1 ++ c2
       ).map(kv => {
         val srcId = kv._1._1
         val srcType = kv._1._2
@@ -216,12 +229,10 @@
     })
     import spark.implicits._
-    midMergeOneIDRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
     FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
     midMergeOneIDRDD.toDF
-      .repartition(coalesce)
+      .coalesce(coalesce)
       .write.mode(SaveMode.Overwrite)
       .option("orc.compress", "zlib")
       .orc(outPutPath)
@@ -256,13 +267,14 @@
       })
       val json = new JSONObject()
       json.put("one_id", oneIDScore.one_id)
-      json.put("one_type", oneIDScore.one_type)
-      json.put("one_score", oneIDScore.one_score)
-      json.put("one_version", oneIDScore.one_version)
+      json.put("type", oneIDScore.one_type)
+      json.put("score", oneIDScore.one_score)
+      json.put("version", oneIDScore.one_version)
       Result(device_id, device_type, json.toJSONString, update_date)
     })
   })
+    /*
     FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(resultOutPutPath), true)
     resultOneID
@@ -277,8 +289,7 @@
       |ALTER TABLE ads.ads_device_id_mapping ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}',platform='$platform',`type`='result')
       | LOCATION '$resultOutPutPath'
       |""".stripMargin)
+    */
-    midMergeOneIDRDD.unpersist(true)
   }
   def filterAction(device_type: String, mainIDSet: Set[String]): Boolean = {
...
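The new maxGraphFilter / leftOuterJoin step above discards keys that accumulate more than 1000 edges before the neighbour lists are combined, which keeps a single over-connected device from blowing up the grouping. A small standalone sketch of the same pattern on toy data (HotKeyFilterSketch, the sample triples and the lowered threshold are made up; the job itself uses groupByKey plus size, and a null-then-filter inside mapPartitions instead of collect):

// Illustration only, not part of this commit.
import org.apache.spark.sql.SparkSession

object HotKeyFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("HotKeyFilterSketch").getOrCreate()
    val sc = spark.sparkContext

    // ((device id, id type), (neighbour id, neighbour type, cnt)) pairs, the shape emitted by processVertex.
    val vertex = sc.parallelize(Seq(
      (("idA", "imei"), ("pkg1", "android_pkg", 3L)),
      (("idA", "imei"), ("pkg2", "android_pkg", 1L)),
      (("idB", "oaid"), ("pkg1", "android_pkg", 5L))
    ))

    val threshold = 1 // the job uses 1000; lowered so the toy data trips it

    // Count edges per key and keep only the "hot" keys above the threshold.
    val hotKeys = vertex.map { case (k, _) => (k, 1) }
      .reduceByKey(_ + _)
      .filter { case (_, n) => n > threshold }

    // Drop every vertex whose key is hot, then gather the surviving neighbours per key.
    val maxGraph = vertex.leftOuterJoin(hotKeys)
      .collect { case (k, (v, None)) => (k, v) }
      .groupByKey()

    maxGraph.collect().foreach(println) // only (("idB","oaid"), ...) remains
    spark.stop()
  }
}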
package mobvista.dmp.datasource.id_mapping
import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant.{sdf1, sdf2}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.net.URI
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/12/7
* @time: 2:39 PM
* @email: jinfeng.wang@mobvista.com
*/
class IDMappingGraphxResult extends CommonSparkJob with Serializable {
def commandOptions(): Options = {
val options = new Options()
options.addOption("country", true, "country")
options.addOption("platform", true, "platform")
options.addOption("date", true, "date")
options.addOption("output", true, "output")
options.addOption("coalesce", true, "coalesce")
options
}
override protected def run(args: Array[String]): Int = {
val parser = new BasicParser()
val options = commandOptions()
val commandLine = parser.parse(options, args)
val country = commandLine.getOptionValue("country")
val platform = commandLine.getOptionValue("platform")
val date = commandLine.getOptionValue("date")
val output = commandLine.getOptionValue("output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphxResult.$date.$country.$platform")
try {
oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, coalesce)
} finally {
if (spark != null) {
spark.stop()
}
}
0
}
def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
coalesce: Int) = {
val dailySQL =
s"""
|SELECT * FROM ads.ads_device_id_mapping WHERE dt = '$date' AND source = '${country.toLowerCase}' AND platform = '$platform' AND `type` = 'mid'
|""".stripMargin
var idSet: Array[String] = null
var scoreMap: Map[String, Double] = null
platform match {
case "ios" =>
idSet = iosIDSet
scoreMap = iosIDScoreMap
case "android" => {
scoreMap = androidIDScoreMap
country match {
case "CN" =>
idSet = androidCNIDSet
case _ =>
idSet = androidIDSet
}
}
case _ =>
""
}
val schedule_date = sdf1.format(sdf2.parse(date))
val resultOneID = spark.sql(dailySQL).rdd.mapPartitions(rs => {
rs.map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
val one_id = MobvistaConstant.String2JSONObject(r.getAs[String]("one_id"))
val update_date = r.getAs[String]("update_date")
val keys = one_id.keySet().asScala
var oneIDScore: OneIDScore = OneIDScore("", "", 0, "")
keys.foreach(key => {
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val json = one_id.getJSONObject(key)
val id_type = json.getString("one_type")
val id_type_score = scoreMap(id_type)
val active_date = json.getString("one_date")
val cnt = json.getLongValue("one_cnt")
val days = (sdf.parse(schedule_date).getTime - sdf.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
val score = id_type_score * 30 / days + 0.1 * cnt
if (idSet.indexOf(id_type) < idSet.indexOf(oneIDScore.one_type) || idSet.indexOf(oneIDScore.one_type) == -1
|| (idSet.indexOf(id_type) == idSet.indexOf(oneIDScore.one_type) && score >= oneIDScore.one_score)) {
oneIDScore = OneIDScore(key, id_type, score, active_date)
}
})
val json = new JSONObject()
json.put("one_id", oneIDScore.one_id)
json.put("type", oneIDScore.one_type)
json.put("score", oneIDScore.one_score)
json.put("version", oneIDScore.one_version)
Result(device_id, device_type, json.toJSONString, update_date)
})
})
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
import spark.implicits._
resultOneID
.toDF
.repartition(coalesce)
.write.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(outPutPath)
spark.sql(
s"""
|ALTER TABLE ads.ads_device_id_mapping ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}',platform='$platform',`type`='result')
| LOCATION '$outPutPath'
|""".stripMargin)
}
}
object IDMappingGraphxResult {
def main(args: Array[String]): Unit = {
new IDMappingGraphxResult().run(args)
}
}
\ No newline at end of file
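For reference, the scoring loop above keeps, per device, the candidate key with the best combination of ID-type priority and score, where score = id_type_score * 30 / days + 0.1 * cnt. A small standalone sketch of that formula (OneIDScoreSketch, the weight map and the sample numbers are made up for illustration; the real weights live in Constant's iosIDScoreMap / androidIDScoreMap):

// Illustration only, not part of this commit.
object OneIDScoreSketch {
  // Hypothetical type weights.
  val scoreMap: Map[String, Double] = Map("imei" -> 1.0, "oaid" -> 0.9)

  // Same shape as the job's formula: a recency-decayed type weight plus a frequency bonus.
  def score(idTypeScore: Double, daysSinceActive: Long, cnt: Long): Double =
    idTypeScore * 30 / daysSinceActive + 0.1 * cnt

  def main(args: Array[String]): Unit = {
    println(score(scoreMap("imei"), 1, 5))   // seen yesterday, 5 times   -> 30.5
    println(score(scoreMap("oaid"), 20, 50)) // seen 20 days ago, 50 times -> 6.35
  }
}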
@@ -27,19 +27,17 @@ class ReYun extends EtlDeviceIdDaily {
     val commonInput = "s3://mob-emr-test/reyun/dmp/onedata/dwd/dwd_device_ids_inc_daily"
     val tkioEventIosRDD = spark.read.orc(s"$commonInput/$year/$month/$day/tkio_event/ios")
-    val tkioClickIosRDD = spark.read.orc(s"$commonInput/$year/$month/$day/tkio_click/ios")
     val abtestIosRDD = spark.read.orc(s"$commonInput/$year/$month/$day/abtest/ios")
     val tkioEventAdrRDD = spark.read.orc(s"$commonInput/$year/$month/$day/tkio_event/android")
-    val tkioClickAdrRDD = spark.read.orc(s"$commonInput/$year/$month/$day/tkio_click/android")
     val abtestAdrRDD = spark.read.orc(s"$commonInput/$year/$month/$day/abtest/android")
-    val iosDF = tkioEventIosRDD.union(tkioClickIosRDD).union(abtestIosRDD)
+    val iosDF = tkioEventIosRDD.union(abtestIosRDD)
       .withColumn("cnt", lit(1L))
       .rdd.map(row => {
         ("ios", parseIosRow(row))
       })
-    val adrDF = tkioEventAdrRDD.union(tkioClickAdrRDD).union(abtestAdrRDD)
+    val adrDF = tkioEventAdrRDD.union(abtestAdrRDD)
      .withColumn("cnt", lit(1L))
      .rdd.map(row => {
        ("android", parseAdrRow(row))
@@ -53,8 +51,8 @@ class ReYun extends EtlDeviceIdDaily {
     val pkg_name = row.getAs[String]("pkg_name")
     val sysId = ""
     val bkupIp = ""
-    val xwho = row.getAs[String]("xwho")
-    val user_id = row.getAs[String]("user_id")
+    val xwho = ""
+    val user_id = ""
     val country = "CN"
     val ip = row.getAs[String]("ip")
     val ua = row.getAs[String]("ua")
@@ -117,8 +115,8 @@ class ReYun extends EtlDeviceIdDaily {
     val pkg_name = row.getAs[String]("pkg_name")
     val sysId = ""
     val bkupIp = ""
-    val xwho = row.getAs[String]("xwho")
-    val user_id = row.getAs[String]("user_id")
+    val xwho = ""
+    val user_id = ""
     val country = "CN"
     val ip = row.getAs[String]("ip")
     val ua = row.getAs[String]("ua")
...