Commit cc706c53 by WangJinfeng

update rtdmp logic, add device_type

parent afb611c4
......@@ -64,7 +64,7 @@ spark-submit --class mobvista.dmp.datasource.backflow.BackFlow \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \
--master yarn --deploy-mode cluster \
--executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 4 \
--executor-memory 4g --driver-memory 4g --executor-cores 3 --num-executors 4 \
../.././DMP.jar \
-keyspace ${keyspace} -table ${table} -region ${region} -output ${output} -system ${system} \
-writetime_start ${writetime_start} -writetime_end ${writetime_end} -value_column ${value_column}
......
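Note on the resource bump: with --executor-cores raised from 2 to 3, the job can schedule up to executor-cores × num-executors = 3 × 4 = 12 concurrent tasks, and the 4g of executor memory is now shared by 3 task slots instead of 2, roughly 1.3g per task. These are rough figures; actual headroom depends on YARN overhead settings.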
......@@ -4,33 +4,33 @@ source ../dmp_env.sh
today=${ScheduleTime:-$1}
old_time=$(date +"%Y%m%d%H" -d "-7 hour $today")
old_time=$(date +"%Y%m%d%H" -d "-2 hour $today")
curr_time=$(date +"%Y%m%d%H" -d "-1 hour $today")
old_date_path=$(date +%Y/%m/%d/%H -d "-7 hour $today")
echo ${old_date_path}
old_date_path=$(date +%Y/%m/%d/%H -d "-2 hour $today")
date_path=$(date +%Y/%m/%d/%H -d "-1 hour $today")
BASE_PATH="s3://mob-emr-test/dataplatform/rtdmp_deal"
HOUR_1_DATE=$(date +%Y/%m/%d/%H -d "-1 hour $today")
HOUR_2_DATE=$(date +%Y/%m/%d/%H -d "-2 hour $today")
HOUR_3_DATE=$(date +%Y/%m/%d/%H -d "-3 hour $today")
HOUR_4_DATE=$(date +%Y/%m/%d/%H -d "-4 hour $today")
HOUR_5_DATE=$(date +%Y/%m/%d/%H -d "-5 hour $today")
HOUR_6_DATE=$(date +%Y/%m/%d/%H -d "-6 hour $today")
# HOUR_2_DATE=$(date +%Y/%m/%d/%H -d "-2 hour $today")
# HOUR_3_DATE=$(date +%Y/%m/%d/%H -d "-3 hour $today")
# HOUR_4_DATE=$(date +%Y/%m/%d/%H -d "-4 hour $today")
# HOUR_5_DATE=$(date +%Y/%m/%d/%H -d "-5 hour $today")
# HOUR_6_DATE=$(date +%Y/%m/%d/%H -d "-6 hour $today")
# INPUT="${BASE_PATH}/${HOUR_1_DATE},${BASE_PATH}/${HOUR_2_DATE},${BASE_PATH}/${HOUR_3_DATE},${BASE_PATH}/${HOUR_4_DATE},${BASE_PATH}/${HOUR_5_DATE},${BASE_PATH}/${HOUR_6_DATE}"
INPUT="${BASE_PATH}/${HOUR_1_DATE},${BASE_PATH}/${HOUR_2_DATE},${BASE_PATH}/${HOUR_3_DATE},${BASE_PATH}/${HOUR_4_DATE},${BASE_PATH}/${HOUR_5_DATE},${BASE_PATH}/${HOUR_6_DATE}"
INPUT="${BASE_PATH}/${HOUR_1_DATE}"
check_await ${BASE_PATH}/${HOUR_1_DATE}/_SUCCESS
check_await ${BASE_PATH}/${HOUR_2_DATE}/_SUCCESS
check_await ${BASE_PATH}/${HOUR_3_DATE}/_SUCCESS
check_await ${BASE_PATH}/${HOUR_4_DATE}/_SUCCESS
check_await ${BASE_PATH}/${HOUR_5_DATE}/_SUCCESS
check_await ${BASE_PATH}/${HOUR_6_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_2_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_3_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_4_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_5_DATE}/_SUCCESS
# check_await ${BASE_PATH}/${HOUR_6_DATE}/_SUCCESS
MERGE_INPUT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/${old_date_path}"
......
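Worked example of the narrowed window (assuming ScheduleTime=2021033112): old_time resolves to 2021033110 and old_date_path to 2021/03/31/10, while date_path and HOUR_1_DATE resolve to 2021/03/31/11. The only input awaited and read is therefore
s3://mob-emr-test/dataplatform/rtdmp_deal/2021/03/31/11
and MERGE_INPUT becomes
s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/audience_merge/2021/03/31/10
i.e. the job now depends on a single rtdmp_deal hour plus the audience_merge snapshot from two hours back, instead of six hourly partitions.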
......@@ -119,7 +119,7 @@ if [[ $? -ne 0 ]]; then
exit 255
fi
expire_date_path=$(date +%Y/%m/%d -d "-15 day $today")
expire_date_path=$(date +%Y/%m/%d -d "-90 day $today")
EXPIRE_OUTPUT_PATH="s3://mob-emr-test/dataplatform/rtdmp_request/${expire_date_path}"
if hadoop fs -ls "$EXPIRE_OUTPUT_PATH" >/dev/null 2>&1; then
hadoop dfs -rm -r ${EXPIRE_OUTPUT_PATH}
......
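Worked example of the longer retention (assuming today=2021040100): expire_date_path moves from 15 days back (2021/03/17) to 90 days back (2021/01/01), so the partition removed is s3://mob-emr-test/dataplatform/rtdmp_request/2021/01/01 and roughly three months of rtdmp_request output is now kept.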
......@@ -79,6 +79,8 @@ public class ServerMain {
try {
httpPost.setConfig(requestConfig);
httpPost.setEntity(new StringEntity(requestBody.toJSONString()));
client.execute(httpPost);
/*
response = client.execute(httpPost);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
......@@ -91,6 +93,8 @@ public class ServerMain {
if (jsonObject.getInteger("code") == 200 && jsonObject.containsKey("data")) {
logger.info("Audience data fetch success!");
}
*/
logger.info("Audience data fetch success!");
} catch (IOException e) {
logger.info(e.getMessage());
} finally {
......
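The request is now fire-and-forget: client.execute(httpPost) still runs, but the old body-parsing branch is commented out and success is logged unconditionally. If the body is intentionally ignored, the entity can still be drained so the pooled connection is released promptly; a minimal sketch (Scala, assuming HttpClient 4.x and an already-built client and httpPost), not the project's actual code:

import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.util.EntityUtils

def postAndForget(client: CloseableHttpClient, httpPost: HttpPost): Unit = {
  var response: CloseableHttpResponse = null
  try {
    response = client.execute(httpPost)
    // drain the entity so the underlying connection returns to the pool
    EntityUtils.consumeQuietly(response.getEntity)
  } finally {
    if (response != null) response.close()
  }
}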
......@@ -14,11 +14,11 @@ object Constant {
case class Device(device_id: String)
case class AudienceInfoPre(devid: String, audience_ids: String)
case class AudienceInfoPre(devid: String, audience_ids: String, device_type: String)
case class AudienceInfo(devid: String, audience_data: String, old_audience_data: String)
case class NewAudienceInfo(devid: String, update_time: String, audience_data: String)
case class NewAudienceInfo(devid: String, update_time: String, audience_data: String, device_type: String)
case class AudienceRegion(audience_info: String, region: mutable.WrappedArray[String])
......@@ -26,7 +26,7 @@ object Constant {
// case class AudienceMerge(devid: String, audience_id: mutable.WrappedArray[Int], update_time: String) extends Serializable
case class AudienceMerge(devid: String, audience_map: String, update_time: String) extends Serializable
case class AudienceMerge(devid: String, audience_map: String, update_time: String, device_type: String) extends Serializable
val read_sql =
"""
......
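With device_type appended, these case classes carry the id type end-to-end through the pipeline; a minimal construction example (all values illustrative):

val pre    = AudienceInfoPre("d41d8cd98f00b204e9800998ecf8427e", "101,102", "gaid_md5")
val info   = NewAudienceInfo("d41d8cd98f00b204e9800998ecf8427e", "2021033112", """{"101":"2021033112"}""", "gaid_md5")
val merged = AudienceMerge("d41d8cd98f00b204e9800998ecf8427e", """{"101":"2021033112"}""", "2021033112", "gaid_md5")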
......@@ -4,13 +4,13 @@ import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.datasource.rtdmp.Constant.{AudienceInfo, NewAudienceInfo}
/**
  * @package: mobvista.dmp.datasource.rtdmp
  * @author: wangjf
  * @date: 2020/8/4
  * @time: 4:00 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class CustomIteratorAudienceInfo(audinceInfos: Iterator[AudienceInfo], update_time: String, expire_time: String, set: java.util.Set[Integer]) extends Iterator[NewAudienceInfo] {
override def hasNext: Boolean = {
audinceInfos.hasNext
......@@ -33,6 +33,6 @@ class CustomIteratorAudienceInfo(audinceInfos: Iterator[AudienceInfo], update_ti
old_json.putAll(new_json)
NewAudienceInfo(devId, update_time, old_json.toString)
NewAudienceInfo(devId, update_time, old_json.toString, "")
}
}
......@@ -43,6 +43,6 @@ class CustomIteratorAudienceInfoV2(audinceInfos: Iterator[AudienceInfo], update_
).asJava
old_json.putAll(new_json.asInstanceOf[java.util.Map[String, String]])
NewAudienceInfo(devId, update_time, old_json.toString)
NewAudienceInfo(devId, update_time, old_json.toString, "")
}
}
......@@ -27,6 +27,7 @@ class CustomMapPartition(rows: Iterator[Row], update_time: String, expire_time:
val devId = row.getAs[String]("devid")
val audience_data = row.getAs[String]("audience_ids")
val old_audience_data = row.getAs[String]("audience_data")
val device_type = row.getAs[String]("device_type")
val new_json = new JSONObject()
audience_data.split(",", -1).foreach(k => {
if (Integer.parseInt(k) > 0) {
......@@ -43,6 +44,6 @@ class CustomMapPartition(rows: Iterator[Row], update_time: String, expire_time:
).asJava
old_json.putAll(new_json.asInstanceOf[java.util.Map[String, String]])
NewAudienceInfo(devId, update_time, old_json.toString)
NewAudienceInfo(devId, update_time, old_json.toString, device_type)
}
}
package mobvista.dmp.datasource.rtdmp
import java.util.StringJoiner
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.ResultSet
......@@ -15,6 +13,7 @@ import org.apache.hadoop.io.Text
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import java.util.StringJoiner
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
......@@ -78,7 +77,7 @@ object Logic {
old_json.put(audienceId, new_json.getString(audienceId))
}
})
array.add(Constant.NewAudienceInfo(devId, "", old_json.toJSONString))
array.add(Constant.NewAudienceInfo(devId, "", old_json.toJSONString, ""))
})
array.iterator
}
......@@ -601,6 +600,17 @@ object Logic {
old_json.put(audienceId, new_json.getString(audienceId))
}
})
NewAudienceInfo(devId, "", old_json.toJSONString)
NewAudienceInfo(devId, "", old_json.toJSONString, "")
}
def getDevType(device_type: String): String = {
device_type match {
case "imeimd5" | "imei_md5" => "imei_md5"
case "gaidmd5" | "gaid_md5" => "gaid_md5"
case "oaidmd5" | "oaid_md5" => "oaid_md5"
case "idfamd5" | "idfa_md5" => "idfa_md5"
case "androididmd5" | "androidid_md5" => "androidid_md5"
case _ => ""
}
}
}
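getDevType normalizes both the compact and the underscored directory names to the canonical *_md5 form and maps anything unrecognized to an empty string, for example:

Logic.getDevType("gaidmd5")      // "gaid_md5"
Logic.getDevType("idfa_md5")     // "idfa_md5"
Logic.getDevType("androididmd5") // "androidid_md5"
Logic.getDevType("ruid")         // "" (unknown types fall through to empty)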
......@@ -3,10 +3,14 @@ package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.Constant.AudienceInfoPre
import mobvista.dmp.datasource.rtdmp.Logic.getDevType
import mobvista.dmp.util.{DateUtil, MD5Util}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory
......@@ -56,7 +60,7 @@ class RTDmpMainPre extends CommonSparkJob with Serializable {
val sc = spark.sparkContext
try {
var mergeRDD = sc.emptyRDD[(String, Int)]
var mergeRDD = sc.emptyRDD[(String, (Int, String))]
// by default, compute the previous hour's data
val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
......@@ -68,6 +72,10 @@ class RTDmpMainPre extends CommonSparkJob with Serializable {
val map: util.Map[Integer, (JSONArray, Integer, Integer, JSONObject)] =
ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 1, 2)
val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]
map.foreach(t => {
val audienceId = Integer2int(t._1)
val audienceOp = t._2._2
......@@ -91,21 +99,31 @@ class RTDmpMainPre extends CommonSparkJob with Serializable {
val pathUri = new URI(list.get(0)._1)
val newAudience = if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
.exists(new Path(pathUri.toString.replace("*", "")))) {
sc.textFile(list.get(0)._1).repartition(100)
.filter(r => {
r.length <= 64
})
.map(r => { // First entry is the latest audience package
val device_id =
if (r.matches(MobvistaConstant.md5Ptn)) {
r
} else {
MD5Util.getMD5Str(r)
}
(device_id, audienceId)
val rdd = sc.newAPIHadoopFile(list.get(0)._1, fc, kc, vc, sc.hadoopConfiguration)
val linesWithFileNames = rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(tup => (file.getPath.getParent.getName, tup._2))
})
linesWithFileNames.repartition(100)
.filter(r => {
r._2.toString.length <= 64
}).map(r => { // First entry is the latest audience package
val device_id =
if (r._2.toString.matches(MobvistaConstant.md5Ptn)) {
r._2.toString
} else {
MD5Util.getMD5Str(r._2.toString)
}
val device_type = if (r._1.endsWith("md5")) {
r._1
} else {
s"${r._1}_md5"
}
(device_id, (audienceId, getDevType(device_type)))
})
} else {
sc.emptyRDD[(String, Int)]
sc.emptyRDD[(String, (Int, String))]
}
/*
val oldAudience = sc.textFile(list.get(1)._1).repartition(100).map(r => { // Second entry is the old audience package, i.e. the previous version
......@@ -135,24 +153,36 @@ class RTDmpMainPre extends CommonSparkJob with Serializable {
val pathUri = new URI(audData.get(0)._1)
if (audData.nonEmpty && FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
.exists(new Path(audData.get(0)._1.replace("*", "")))) {
sc.textFile(audData.get(0)._1)
val rdd = sc.newAPIHadoopFile(audData.get(0)._1, fc, kc, vc, sc.hadoopConfiguration)
val linesWithFileNames = rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(tup => (file.getPath.getParent.getName, tup._2))
})
linesWithFileNames
.filter(r => {
r.length <= 64
r._2.toString.length <= 64
})
.map(r => { // take the latest audience package
val device_id =
if (r.matches(MobvistaConstant.md5Ptn)) {
r
if (r._2.toString.matches(MobvistaConstant.md5Ptn)) {
r._2.toString
} else {
MD5Util.getMD5Str(r)
MD5Util.getMD5Str(r._2.toString)
}
(device_id, audienceId)
val device_type = if (r._1.endsWith("md5")) {
r._1
} else {
s"${r._1}_md5"
}
(device_id, (audienceId, getDevType(device_type)))
})
} else { // if absent, create an empty RDD
sc.emptyRDD[(String, Int)]
sc.emptyRDD[(String, (Int, String))]
}
} else {
sc.emptyRDD[(String, Int)]
sc.emptyRDD[(String, (Int, String))]
}
// merge all audience packages
mergeRDD = mergeRDD.union(updateRDD)
......@@ -160,19 +190,23 @@ class RTDmpMainPre extends CommonSparkJob with Serializable {
val df = mergeRDD.repartition(1000).groupByKey().map(r => {
val devId = r._1
var deviceType = ""
val set = new mutable.HashSet[Int]()
// build an audienceId -> update_date JSONObject
r._2.foreach(t => {
set.add(t)
set.add(t._1)
if (StringUtils.isBlank(deviceType)) {
deviceType = t._2
}
})
(devId, set)
(devId, set, deviceType)
})
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
df.map(r => {
AudienceInfoPre(r._1, r._2.mkString(","))
AudienceInfoPre(r._1, r._2.mkString(","), r._3)
}).repartition(coalesce.toInt)
.toDF
.write
......
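The switch from sc.textFile to newAPIHadoopFile is what makes device_type available here: each record is tagged with the name of the directory it was read from (e.g. .../gaidmd5/part-*), and that name, normalized to the *_md5 form, travels alongside the audience id into mergeRDD and later into the first non-blank deviceType chosen per device in the groupByKey step. A condensed, self-contained sketch of the tagging pattern (path and audienceId are illustrative, and the MD5 normalization of the device id is omitted):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("device-type-tagging-sketch").getOrCreate()
val sc = spark.sparkContext

val audienceId = 101                                  // illustrative audience id
val inputPath = "s3://some-bucket/audience/gaidmd5/*" // illustrative path; real paths come from the API response

val rdd = sc.newAPIHadoopFile(inputPath, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], sc.hadoopConfiguration)

val tagged = rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((split, iter) => {
    // the parent directory name (e.g. "gaidmd5") encodes the device-id type
    val dirName = split.asInstanceOf[FileSplit].getPath.getParent.getName
    iter.map(kv => (dirName, kv._2.toString))
  })

val pairs = tagged.map(r => {
  val deviceType = if (r._1.endsWith("md5")) r._1 else s"${r._1}_md5"
  // (device_id, (audienceId, device_type)); the real job also runs this through getDevType
  (r._2, (audienceId, deviceType))
})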
......@@ -112,7 +112,7 @@ class RTDmpMainSpe extends CommonSparkJob with Serializable {
.asScala.keySet
val df = selectDF.join(cassandraDF, Seq("devid"), "leftouter")
.select("devid", "audience_ids", "audience_data")
.select("devid", "audience_ids", "audience_data", "device_type")
.rdd
.mapPartitions(v => new CustomMapPartition(v, update_time = time, expire_time, update_ids))
......
......@@ -58,10 +58,13 @@ class RTDmpMerge extends CommonSparkJob with Serializable {
val mapper = new ObjectMapper()
try {
import spark.implicits._
val paths = input.split(",", -1)
val daily_df = spark.read.orc(paths(0), paths(1), paths(2), paths(3), paths(4), paths(5)).rdd.map(row => {
// val paths = input.split(",", -1)
// spark.read.orc(paths(0), paths(1), paths(2), paths(3), paths(4), paths(5))
val daily_df = spark.read.orc(input).rdd.map(row => {
val deviceId = row.getAs[String]("devid")
val deviceType = row.getAs[String]("device_type")
val audienceMap = JSON.parseObject(row.getAs[String]("audience_data")).asInstanceOf[java.util.Map[String, String]]
.map(kv => {
val audienceTime = DateUtil.format(DateUtil.parse(kv._2 + ":00:00", "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH")
......@@ -72,7 +75,7 @@ class RTDmpMerge extends CommonSparkJob with Serializable {
audienceMap.keys.foreach(k => {
updateAudienceIdSet.add(Integer.valueOf(k))
})
AudienceMerge(deviceId, mapper.writeValueAsString(audienceMap.asJava), date_time)
AudienceMerge(deviceId, mapper.writeValueAsString(audienceMap.asJava), date_time, deviceType)
}).toDF
.dropDuplicates()
......@@ -106,11 +109,12 @@ class RTDmpMerge extends CommonSparkJob with Serializable {
|SELECT
| coalesce(d.devid, m.devid) devid,
| coalesce(d.audience_map, m.audience_map) audience_map,
| coalesce(d.update_time, m.update_time) update_time
| coalesce(d.update_time, m.update_time) update_time,
| coalesce(d.device_type, m.device_type) device_type
| FROM daily_rtdmp d
| FULL OUTER JOIN
| (SELECT devid, process(audience_map) audience_map,
| update_time
| update_time, device_type
| FROM dwh.audience_merge
| WHERE dt = '@dt' AND update_time >= '@update_time'
| ) m
......
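device_type is also threaded through the merge with the historical audience_merge table: the FULL OUTER JOIN keeps devices seen only today and devices seen only historically, and coalesce prefers the daily value whenever it is present. A minimal Spark SQL sketch of that coalesce behaviour (table and column names shortened for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-merge-sketch").getOrCreate()
import spark.implicits._

Seq(("dev1", "gaid_md5"), ("dev2", "idfa_md5"))
  .toDF("devid", "device_type").createOrReplaceTempView("daily")
Seq(("dev2", ""), ("dev3", "oaid_md5"))
  .toDF("devid", "device_type").createOrReplaceTempView("history")

spark.sql(
  """
    |SELECT coalesce(d.devid, m.devid) devid,
    |       coalesce(d.device_type, m.device_type) device_type
    |  FROM daily d
    |  FULL OUTER JOIN history m
    |    ON d.devid = m.devid
  """.stripMargin).show()
// dev1 -> gaid_md5 (daily only), dev2 -> idfa_md5 (daily wins), dev3 -> oaid_md5 (history only)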
......@@ -6,6 +6,7 @@ import mobvista.dmp.util.DateUtil
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.functions.lit
import ru.yandex.clickhouse.ClickHouseDataSource
import scala.collection.JavaConversions._
......@@ -51,6 +52,7 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable {
spark.udf.register("process", process _)
val df = spark.sql(sql.replace("@dt", date_time))
.withColumn("device_type", lit(""))
implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)
......
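Until the ClickHouse path consumes a real value, RTDmpMergeCK just appends a constant empty device_type column, presumably so the sink schema lines up with the new table layout; lit builds that constant column:

import org.apache.spark.sql.functions.lit

// hypothetical df with the pre-existing columns; device_type is filled with "" for every row
val withDeviceType = df.withColumn("device_type", lit(""))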