Commit 356a6eab by WangJinfeng

fix rtdmp bug, appid_package bug

parent 88bfe623
@@ -15,8 +15,6 @@ INTPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${dat
 check_await ${INTPUT}/_SUCCESS
-checkRTDmp ${today}
 sleep 60
 host="ip-172-31-20-35.ec2.internal"
...
@@ -4,12 +4,14 @@ source ../dmp_env.sh
 today=${ScheduleTime}
-date_time=$(date +"%Y%m%d%H" -d "-2 hour $today")
+date_time=$(date +"%Y%m%d%H" -d "-1 hour $today")
 date_path=$(date +%Y/%m/%d/%H -d "-1 hour $today")
 INPUT="s3://mob-emr-test/dataplatform/rtdmp_pre/${date_path}"
+old_date_time=$(date +"%Y%m%d%H" -d "-2 hour $today")
 old_date_path=$(date +%Y/%m/%d/%H -d "-2 hour $today")
 OLD_MERGE_INPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${old_date_path}"
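
Note: the script now tracks two timestamps instead of one. `date_time` is the hour being merged (schedule time minus 1 hour), while the new `old_date_time` points at the previously merged partition (minus 2 hours) and feeds the `-old_datetime` flag added to the spark-submit call below. A minimal sketch of the same arithmetic using java.time, with a hypothetical schedule time standing in for `${ScheduleTime}`:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object DateWindow {
  // Same formats the shell script passes to `date`
  private val hourFmt = DateTimeFormatter.ofPattern("yyyyMMddHH")
  private val pathFmt = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")

  def main(args: Array[String]): Unit = {
    // Hypothetical schedule time standing in for ${ScheduleTime}
    val today = LocalDateTime.of(2021, 6, 1, 12, 0)

    val dateTime    = today.minusHours(1).format(hourFmt) // "2021060111": partition being built
    val oldDateTime = today.minusHours(2).format(hourFmt) // "2021060110": previous merged partition
    val oldDatePath = today.minusHours(2).format(pathFmt) // "2021/06/01/10": audience_merge input

    println(s"date_time=$dateTime old_date_time=$oldDateTime old_date_path=$oldDatePath")
  }
}
```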
@@ -27,13 +29,13 @@ spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMain \
 --master yarn --deploy-mode cluster \
 --executor-memory 18g --driver-memory 6g --executor-cores 5 --num-executors 40 \
 .././DMP.jar \
--datetime ${date_time} -input ${INPUT} -output ${OUTPUT} -coalesce 200
+-datetime ${date_time} -old_datetime ${old_date_time} -input ${INPUT} -output ${OUTPUT} -coalesce 200
 if [[ $? -ne 0 ]]; then
   exit 255
 fi
-mount_partition "audience_merge" "dt='${curr_time}'" "$OUTPUT"
+mount_partition "audience_merge" "dt='${date_time}'" "$OUTPUT"
 expire_time=$(date +"%Y%m%d%H" -d "-24 hour $today")
...
@@ -70,7 +70,8 @@ public class AppidPackageDictMR extends CommonMapReduce {
     CommonMapReduce.setMetrics(context, "DMP", "column_num_error", 1);
     return;
 }
-if (array.length == 3 && !"null".equals(array[1])) { // original columns: appid, package_name, platform
+if (array.length == 3 && !"null".equals(array[1]) && appidPtn.matcher(array[0]).matches()
+        && platformPtn.matcher(array[2]).matches()) { // original columns: appid, package_name, platform
     outKey.set(array[0]);
     outValue.set(joiner.join("A", array[1], array[2]));
     context.write(outKey, outValue);
...
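
The tightened guard is the appid_package fix: rows whose appid or platform fail a pattern check are now filtered out before they are emitted to the reducer. The actual `appidPtn`/`platformPtn` definitions sit outside this hunk, so the patterns in the sketch below are assumptions chosen only to illustrate the predicate:

```scala
import java.util.regex.Pattern

object AppidGuard {
  // Assumed patterns: the real appidPtn/platformPtn live elsewhere in the class
  private val appidPtn    = Pattern.compile("^\\d+$")          // hypothetical: numeric appid
  private val platformPtn = Pattern.compile("^(ios|android)$") // hypothetical: known platforms

  def accept(array: Array[String]): Boolean =
    array.length == 3 && !"null".equals(array(1)) &&
      appidPtn.matcher(array(0)).matches() &&
      platformPtn.matcher(array(2)).matches()

  def main(args: Array[String]): Unit = {
    println(accept(Array("12345", "com.example.app", "android"))) // true
    println(accept(Array("abc", "com.example.app", "android")))   // false: malformed appid
    println(accept(Array("12345", "null", "ios")))                // false: null package_name
  }
}
```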
@@ -3,6 +3,7 @@ package mobvista.dmp.datasource.rtdmp
 import com.alibaba.fastjson.JSONObject
 import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
 import mobvista.dmp.datasource.rtdmp.Constant.AudienceMerge
+import mobvista.dmp.util.DateUtil
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.codehaus.jackson.map.ObjectMapper
@@ -24,6 +25,7 @@ class RTDmpMain extends CommonSparkJob with Serializable {
   def commandOptions(): Options = {
     val options = new Options()
     options.addOption("datetime", true, "datetime")
+    options.addOption("old_datetime", true, "old_datetime")
     options.addOption("input", true, "input")
     options.addOption("output", true, "output")
     options.addOption("coalesce", true, "coalesce")
@@ -36,12 +38,13 @@ class RTDmpMain extends CommonSparkJob with Serializable {
     val options = commandOptions()
     val commandLine = parser.parse(options, args)
     val datetime = commandLine.getOptionValue("datetime")
+    val old_datetime = commandLine.getOptionValue("old_datetime")
     val input = commandLine.getOptionValue("input")
     val output = commandLine.getOptionValue("output")
     val coalesce = commandLine.getOptionValue("coalesce")
     val spark: SparkSession = SparkSession.builder()
-      .appName(s"RTDmpMain")
+      .appName(s"RTDmpMain.${datetime}")
       .config("spark.rdd.compress", "true")
       .config("spark.io.compression.codec", "snappy")
       .config("spark.sql.orc.filterPushdown", "true")
@@ -52,7 +55,21 @@ class RTDmpMain extends CommonSparkJob with Serializable {
     val sc = spark.sparkContext
     try {
-      val sdf = new SimpleDateFormat("yyyyMMddHH")
+      var sdf = new SimpleDateFormat("yyyyMMddHHmmss")
+      // By default, process the previous hour's data
+      val update_time_start = DateUtil.format(sdf.parse(datetime + "0000"), "yyyy-MM-dd HH:mm:ss")
+      val update_time_end = DateUtil.format(sdf.parse(datetime + "5959"), "yyyy-MM-dd HH:mm:ss")
+      val audience_date_utime_start = sdf.parse(datetime + "0000").getTime / 1000 - 28800
+      val audience_date_utime_end = sdf.parse(datetime + "5959").getTime / 1000 - 25200
+      val updateAudienceIds =
+        ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)
+          .asScala.keys.toSet
+      println(s"updateAudienceIds -->> ${updateAudienceIds.mkString(",")}")
+      sdf = new SimpleDateFormat("yyyyMMddHH")
       val calendar = Calendar.getInstance()
       val date = sdf.parse(datetime)
       calendar.setTime(date)
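
This new block asks the audience service which audience IDs were updated during the processed hour. `ServerUtil.request` is project code not shown in this diff. The `- 28800` and `- 25200` offsets are 8 and 7 hours in seconds, which looks like a UTC+8 wall-clock hour being translated into the service's epoch-second `utime` bounds; a sketch of just the window arithmetic under that assumption:

```scala
import java.text.SimpleDateFormat

object UpdateWindow {
  def main(args: Array[String]): Unit = {
    val datetime = "2021060111" // yyyyMMddHH, the hour being processed
    val sdf = new SimpleDateFormat("yyyyMMddHHmmss")

    // Human-readable bounds of the hour: HH:00:00 .. HH:59:59
    val startMs = sdf.parse(datetime + "0000").getTime
    val endMs   = sdf.parse(datetime + "5959").getTime

    // Epoch-second bounds shifted by 8h and 7h; assumed to map a UTC+8
    // wall-clock hour onto the service's UTC "utime" axis
    val utimeStart = startMs / 1000 - 28800 // 8 * 3600
    val utimeEnd   = endMs / 1000 - 25200   // 7 * 3600

    // Note the asymmetric shift widens the query to roughly two hours
    println(s"window: [$utimeStart, $utimeEnd], ${utimeEnd - utimeStart} seconds")
  }
}
```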
@@ -72,7 +89,7 @@ class RTDmpMain extends CommonSparkJob with Serializable {
       val sql =
         s"""
-           |SELECT * FROM dwh.audience_merge WHERE dt = '$datetime'
+           |SELECT * FROM dwh.audience_merge WHERE dt = '$old_datetime'
            |""".stripMargin
       val merge_rdd = spark.sql(sql).rdd
         .map(row => {
@@ -95,7 +112,7 @@ class RTDmpMain extends CommonSparkJob with Serializable {
           val new_audience = MobvistaConstant.String2JSONObject(opt1.get._1).asInstanceOf[java.util.Map[String, String]]
           val old_audience = opt2.get._1
           val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
-            .retain((k, v) => !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
+            .retain((k, v) => !updateAudienceIds.contains(k.toInt) && !new_audience.keySet().contains(k) && v.compareTo(expire_time) > 0)
           new_audience.putAll(retain_old_audience.asJava)
           AudienceMerge(devid, new ObjectMapper().writeValueAsString(new_audience), datetime, opt1.get._2)
         } else if (opt1.nonEmpty && opt2.isEmpty) {
@@ -103,7 +120,7 @@ class RTDmpMain extends CommonSparkJob with Serializable {
         } else {
           val old_audience = opt2.get._1
           val retain_old_audience = MobvistaConstant.String2JSONObject(old_audience).asInstanceOf[java.util.Map[String, String]].asScala
-            .retain((_, v) => v.compareTo(expire_time) > 0)
+            .retain((k, v) => !updateAudienceIds.contains(k.toInt) && v.compareTo(expire_time) > 0)
           AudienceMerge(devid, new ObjectMapper().writeValueAsString(retain_old_audience.asJava), opt2.get._2, opt2.get._3)
         }
       })
...
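
Both retain branches now also drop any audience ID the service reported as updated, so a rebuilt audience cannot carry stale membership over from the previous partition. A self-contained sketch of the merge rule on plain maps (audience id -> update time, as the real code stores them in JSON):

```scala
import scala.collection.mutable

object RetainSketch {
  def main(args: Array[String]): Unit = {
    val expireTime = "2021053112"         // yyyyMMddHH cutoff, 24 hours back
    val updateAudienceIds = Set(101, 103) // IDs the service says were rebuilt this hour

    val newAudience = mutable.Map("101" -> "2021060111", "102" -> "2021060111")
    val oldAudience = mutable.Map(
      "101" -> "2021060103", // present in newAudience -> dropped from the old map
      "103" -> "2021060105", // rebuilt this hour but absent from input -> dropped
      "104" -> "2021053111", // older than expireTime -> dropped
      "105" -> "2021060108"  // still valid -> retained
    )

    // Mirrors the patched .retain(...) predicate
    val retained = oldAudience.filter { case (k, v) =>
      !updateAudienceIds.contains(k.toInt) &&
        !newAudience.contains(k) &&
        v.compareTo(expireTime) > 0
    }
    val merged = newAudience ++ retained
    println(merged) // keeps 101 and 102 from the new map, plus retained 105
  }
}
```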