Commit 7dddf739 by WangJinfeng

add ods_dmp_user_info_daily_mapping

parent 29fa8b67
type=command
command=sh -x device_mapping.sh
#!/bin/sh
source ../dmp_env.sh
# LOG_TIME is the data date (yyyyMMdd); the *_slack vars are the matching path fragments (yyyy/MM/dd).
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
yes_bef1_slack=$(date +%Y/%m/%d -d "-1 day $ScheduleTime")
yes_bef2_slack=$(date +%Y/%m/%d -d "-2 day $ScheduleTime")
# Block until the upstream _SUCCESS flags exist (check_await is defined in dmp_env.sh).
check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/adn_request_sdk/_SUCCESS
check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/reyun/_SUCCESS
check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/dsp_req/_SUCCESS
check_await ${DMP_INSTALL_LIST}/${yes_bef2_slack}/other/_SUCCESS
check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/adn_install/_SUCCESS
OUTPUT_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/device_mapping/${yes_bef1_slack}"
spark-submit --class mobvista.dmp.datasource.device.DeviceIDMappingLower \
--name "OdsDmpUserInfoDailyMapping.${date}.wangjf" \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--master yarn \
--deploy-mode cluster \
--executor-memory 10G \
--driver-memory 6G \
--executor-cores 4 \
--num-executors 50 \
../${JAR} \
-date ${LOG_TIME} -output ${OUTPUT_PATH} -coalesce 400
if [ $? -ne 0 ]; then
exit 255
fi
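check_await comes from dmp_env.sh, which is not part of this commit; it presumably blocks until the given _SUCCESS flag appears. A minimal Scala sketch of that polling pattern, with hypothetical names and timeouts:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckAwaitSketch {
  // Poll until the flag file exists or maxWaitMs elapses. The real helper lives
  // in dmp_env.sh; this sketch only illustrates the waiting behaviour.
  def checkAwait(flag: String, intervalMs: Long = 60000L, maxWaitMs: Long = 6L * 3600 * 1000): Boolean = {
    val fs = FileSystem.get(new URI(flag), new Configuration())
    val path = new Path(flag)
    val deadline = System.currentTimeMillis() + maxWaitMs
    while (System.currentTimeMillis() < deadline && !fs.exists(path)) {
      Thread.sleep(intervalMs)
    }
    fs.exists(path)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical flag path; mirrors the shell scripts' exit-255-on-failure convention.
    if (!checkAwait("s3://mob-emr-test/some/prefix/_SUCCESS")) sys.exit(255)
  }
}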
type=command
dependencies=ods_dmp_user_info_daily,device_mapping
command=sh -x ods_dmp_user_info_daily_mapping.sh
#!/bin/sh
source ../dmp_env.sh
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
yes_bef1_slack=$(date +%Y/%m/%d -d "-1 day $ScheduleTime")
INPUT_PATH=""
OUTPUT_PATH="${ODS_DMP_USER_INFO_DAILY}_mapping/${yes_bef1_slack}"
spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoDailyMapping \
--name "OdsDmpUserInfoDailyMapping.${date}.wangjf" \
--conf spark.network.timeout=720s \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--master yarn \
--deploy-mode cluster \
--executor-memory 10G \
--driver-memory 6G \
--executor-cores 4 \
--num-executors 100 \
../${JAR} \
-date ${LOG_TIME} -input ${INPUT_PATH} -output ${OUTPUT_PATH} -coalesce 2000
if [ $? -ne 0 ]; then
exit 255
fi
mount_partition "ods_dmp_user_info_daily_mapping" "dt='${LOG_TIME}'" "${OUTPUT_PATH}"
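mount_partition is likewise defined in dmp_env.sh (not shown). Judging by its arguments, it presumably attaches the freshly written directory to the Hive table as a new partition; a rough Scala equivalent, with hypothetical values:

import org.apache.spark.sql.SparkSession

object MountPartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("MountPartitionSketch")
      .enableHiveSupport()  // requires a reachable Hive metastore
      .getOrCreate()

    val dt = "20211101"                                  // hypothetical LOG_TIME
    val location = "s3://mob-emr-test/dataplatform/..."  // hypothetical OUTPUT_PATH

    // Roughly what mount_partition is assumed to do: register the output
    // directory as the partition's location.
    spark.sql(
      s"""ALTER TABLE dwh.ods_dmp_user_info_daily_mapping
         |ADD IF NOT EXISTS PARTITION (dt='$dt') LOCATION '$location'""".stripMargin)
    spark.stop()
  }
}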
type=command
dependencies=ods_dmp_user_info_all,ods_dmp_user_info_daily_mapping
command=echo "ods_dmp_user_info success!!!"
package mobvista.dmp.datasource.device

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}

/**
 * Builds a device_id -> lower(device_id) mapping for Android OAIDs, so that
 * downstream jobs can restore the original-cased OAID from a lower-cased key.
 */
class DeviceIDMappingLower extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = MobvistaConstant.createSparkSession(s"DeviceIDMappingLower.$date")
    try {
      val sc = spark.sparkContext
      // date is yyyyMMdd; updateDate is the same day rendered as yyyy-MM-dd.
      val updateDate = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))
      val lastDate = DateUtil.getDayByString(date, "yyyy-MM-dd", -1)
      val lastUpdateDate = DateUtil.getDayByString(updateDate, "yyyy-MM-dd", -1)

      // OAIDs from the four same-day businesses, plus the 'other' business
      // from the previous day (its data lands one day later).
      val sql =
        s"""
           |SELECT device_id, LOWER(device_id) lower_device_id
           |  FROM (
           |    SELECT device_id
           |      FROM dwh.dmp_install_list
           |      WHERE dt = '$date' AND update_date = '$updateDate'
           |        AND business IN ('adn_request_sdk','reyun','dsp_req','adn_install')
           |        AND platform = 'android' AND device_type = 'oaid'
           |    UNION ALL
           |    SELECT device_id
           |      FROM dwh.dmp_install_list
           |      WHERE dt = '$lastDate' AND update_date = '$lastUpdateDate'
           |        AND business = 'other'
           |        AND platform = 'android' AND device_type = 'oaid'
           |  ) t
           |  GROUP BY device_id
           |""".stripMargin

      // Make the job idempotent: drop any previous output before writing.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      spark.sql(sql)
        .repartition(coalesce.toInt)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }
}

object DeviceIDMappingLower {
  def main(args: Array[String]): Unit = {
    new DeviceIDMappingLower().run(args)
  }
}
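To make the mapping's behaviour concrete, here is a small self-contained sketch (local Spark, made-up OAIDs) of the query above: one output row per distinct original-cased device_id, paired with its lower-cased key.

import org.apache.spark.sql.SparkSession

object DeviceIDMappingLowerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("example").getOrCreate()
    import spark.implicits._

    // Hypothetical OAIDs as they might appear in dwh.dmp_install_list.
    val installList = Seq("AB12-CD34", "AB12-CD34", "ef56-gh78").toDF("device_id")
    installList.createOrReplaceTempView("install_list")

    spark.sql(
      """SELECT device_id, LOWER(device_id) lower_device_id
        |  FROM install_list
        | GROUP BY device_id""".stripMargin).show(false)
    // +---------+---------------+
    // |device_id|lower_device_id|   (row order may vary)
    // +---------+---------------+
    // |AB12-CD34|ab12-cd34      |
    // |ef56-gh78|ef56-gh78      |
    spark.stop()
  }
}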
package mobvista.dmp.datasource.device

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions

/**
 * Joins ods_dmp_user_info_daily (CN only) against the device_id mapping and
 * replaces the lower-cased dev_id with the original-cased OAID where one exists.
 */
class OdsDmpUserInfoDailyMapping extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val input = commandLine.getOptionValue("input")
    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"OdsDmpUserInfoDailyMapping.$date")
    try {
      val sc = spark.sparkContext
      // Make the job idempotent: drop any previous output before writing.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      // Mapping produced by DeviceIDMappingLower: (device_id, lower_device_id).
      val mappingDF = spark.read.orc(input)

      val sql =
        s"""
           |SELECT * FROM dwh.ods_dmp_user_info_daily
           |  WHERE dt = '$date' AND UPPER(country) = 'CN'
           |""".stripMargin
      val userInfoDailyDF = spark.sql(sql)

      // The right join keeps every daily row; COALESCE upgrades dev_id to the
      // original-cased device_id whenever the mapping has a match.
      mappingDF.join(userInfoDailyDF, mappingDF("lower_device_id") === userInfoDailyDF("dev_id"), "right")
        .select(
          functions.coalesce(mappingDF("device_id"), userInfoDailyDF("dev_id")).alias("dev_id"),
          functions.md5(functions.coalesce(mappingDF("device_id"), userInfoDailyDF("dev_id"))).alias("dev_id_md5"),
          userInfoDailyDF("dev_type"),
          userInfoDailyDF("platform"),
          userInfoDailyDF("install"),
          userInfoDailyDF("interest"),
          userInfoDailyDF("model"),
          userInfoDailyDF("country"),
          userInfoDailyDF("osversion"),
          userInfoDailyDF("age"),
          userInfoDailyDF("gender"),
          userInfoDailyDF("behavior")
        )
        .repartition(coalesce.toInt)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("input", true, "input")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("output", true, "output")
    options
  }
}

object OdsDmpUserInfoDailyMapping {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoDailyMapping().run(args)
  }
}
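The right-join-plus-COALESCE pattern above is the heart of this job. A minimal sketch (local Spark, hypothetical data): every daily row survives, and dev_id is upgraded to the original-cased device_id whenever the mapping has a match.

import org.apache.spark.sql.{SparkSession, functions}

object DailyMappingJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("example").getOrCreate()
    import spark.implicits._

    // Hypothetical mapping and daily rows; "zz99-yy88" has no mapping entry.
    val mapping = Seq(("AB12-CD34", "ab12-cd34")).toDF("device_id", "lower_device_id")
    val daily = Seq(("ab12-cd34", "CN"), ("zz99-yy88", "CN")).toDF("dev_id", "country")

    mapping.join(daily, mapping("lower_device_id") === daily("dev_id"), "right")
      .select(
        functions.coalesce(mapping("device_id"), daily("dev_id")).alias("dev_id"),
        daily("country"))
      .show(false)
    // dev_id: AB12-CD34 (restored to original case), zz99-yy88 (unmatched, kept as-is)
    spark.stop()
  }
}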