Commit 10fa0aad by WangJinfeng

update id_mapping add reyun

parent 49d86c92
......@@ -42,14 +42,12 @@ check_await ${TP_GENDER_PATH}/_SUCCESS
spark-submit --class mobvista.dmp.datasource.age_gender.MergeInstallGender \
--name "MergeInstallGender.${LOG_TIME}" \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=2000 \
--conf spark.storage.memoryFraction=0.4 \
--conf spark.shuffle.memoryFraction=0.4 \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 60 \
--deploy-mode cluster --executor-memory 8g --driver-memory 8g --executor-cores 3 --num-executors 100 \
../${JAR} -date ${LOG_TIME} \
-ga_gender_path ${GA_GENDER_PATH} -dsp_gender_path ${DSP_GENDER_PATH} -fb_gender_path ${FB_GENDER_PATH} -tp_gender_path ${TP_GENDER_PATH} -gender_output ${GENDER_OUTPUT} -parallelism 2000
......
type=command
dependencies=reyun_id_mapping,adn_id_mapping,adn_request_id_mapping
command=echo "id_mapping job end!"
\ No newline at end of file
type=command
command=sh -x reyun_id_mapping.sh
\ No newline at end of file
#! /bin/bash
# Daily ReYun id-mapping ETL: wait for the upstream device-id dumps of the
# previous day, then launch the Spark job that merges them into ID_MAPPING.
source ../dmp_env.sh

LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +'%Y/%m/%d' -d "-1 day $ScheduleTime")

# INPUT_PATH=${ETL_ADN_ORG_REQ_HOURS}/${date_path}
INPUT_PATH="s3://mob-emr-test/reyun/dmp/onedata/dwd/dwd_device_ids_inc_daily/${date_path}"

# Block until every upstream source has published its _SUCCESS marker
# (ios sources first, then android — same order as before).
for os in ios android; do
  for src in tkio_event tkio_click abtest; do
    check_await "${INPUT_PATH}/${src}/${os}/_SUCCESS"
  done
done

BUSINESS="reyun"
OUTPUT_PATH=${ID_MAPPING}/${date_path}/${BUSINESS}

spark-submit --class mobvista.dmp.datasource.id_mapping.ReYun \
  --name "EtlDeviceIdDaily.$BUSINESS.$LOG_TIME" \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.network.timeout=720s \
  --conf spark.sql.shuffle.partitions=5000 \
  --conf spark.default.parallelism=5000 \
  --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 100 \
  ../${JAR} -date ${LOG_TIME} -business ${BUSINESS} -output ${OUTPUT_PATH} -coalesce 1000

# Propagate a hard failure to the scheduler if the Spark job did not succeed.
if [ $? -ne 0 ]; then
  exit 255
fi
\ No newline at end of file
......@@ -247,14 +247,16 @@ object Constant {
// Aggregated iOS device-id frequency query. NOTE: the previous text carried
// BOTH the pre- and post-change SELECT lines (diff residue); only the
// SUM(cnt) variant is valid together with the GROUP BY below.
val ios_id_mapping_sql_v2: String =
  """
    |SELECT idfa, idfv, pkg_name, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt, SUM(cnt) cnt
    | FROM dws.dws_device_id_ios_frequency WHERE dt = '@date' @filter_country
    | GROUP BY idfa, idfv, pkg_name, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
    |""".stripMargin
// Aggregated Android device-id frequency query. NOTE: the previous text
// carried BOTH the pre- and post-change SELECT lines (diff residue); only
// the SUM(cnt) variant is valid together with the GROUP BY below.
val android_id_mapping_sql_v2: String =
  """
    |SELECT imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt, SUM(cnt) cnt
    | FROM dws.dws_device_id_android_frequency WHERE dt = '@date' @filter_country
    | GROUP BY imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
    |""".stripMargin
val old_id_mapping_sql: String =
......
package mobvista.dmp.datasource.id_mapping
import mobvista.dmp.datasource.id_mapping.Constant.{getDevId, parseUA, process}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{Row, SparkSession}
/**
* @package: mobvista.dmp.datasource.id_mapping
* @author: wangjf
* @date: 2021/11/30
* @time: 7:51 下午
* @email: jinfeng.wang@mobvista.com
*/
class ReYun extends EtlDeviceIdDaily {
  /**
   * Reads the day's ReYun device-id dumps (tkio_event, tkio_click, abtest;
   * ios and android each) and tags every row with its platform label plus a
   * unit `cnt` column for downstream frequency aggregation.
   */
  override def processData(date: String, i: Int, spark: SparkSession): RDD[(String, Row)] = {
    spark.udf.register("getDevId", getDevId _)
    spark.udf.register("parseUA", parseUA _)

    // Dated input prefix, e.g. .../dwd_device_ids_inc_daily/2021/11/30
    val basePath = "s3://mob-emr-test/reyun/dmp/onedata/dwd/dwd_device_ids_inc_daily/" +
      s"${date.substring(0, 4)}/${date.substring(4, 6)}/${date.substring(6, 8)}"

    // Union the three upstream sources of one platform (in the original
    // order: tkio_event, tkio_click, abtest) and key every row by platform.
    def platformRows(platform: String): RDD[(String, Row)] = {
      val merged = Seq("tkio_event", "tkio_click", "abtest")
        .map(src => spark.read.orc(s"$basePath/$src/$platform"))
        .reduce(_ union _)
      merged
        .withColumn("cnt", lit(1L))
        .rdd
        .map(row => (platform, row))
    }

    platformRows("ios").union(platformRows("android"))
  }
}
object ReYun {
  /** Entry point: delegates straight to the EtlDeviceIdDaily runner. */
  def main(args: Array[String]): Unit = (new ReYun).run(args)
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment