Commit b34b1db5 by WangJinfeng

init RTDmpRequestDaily,人群定向优化

parent 872d6594
type=command type=command
dependencies=rtdmp_request_tencent,rtdmp_request_other,rtdmp_request_dsp,rtdmp_request_btop,rtdmp_request_youku_acquisition,rtdmp_request_alive dependencies=rtdmp_request_daily,rtdmp_request_tencent,rtdmp_request_other,rtdmp_request_dsp,rtdmp_request_btop,rtdmp_request_youku_acquisition,rtdmp_request_alive
command=echo "RTDmp Request Success !!!" command=echo "RTDmp Request Success !!!"
\ No newline at end of file
type=command
command=bash -x rtdmp_request_daily.sh
\ No newline at end of file
#!/bin/bash
source ../dmp_env.sh
today=${ScheduleTime}
date=$(date +%Y-%m-%d -d "-1 day $today")
date_path=$(date +%Y/%m/%d -d "-1 day $today")
OUTPUT="s3://mob-emr-test/dataplatform/rtdmp_request/${date_path}/install"
spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpRequestDaily \
--name "RTDmpRequestDaily.${date}.install" \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=2000 \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 4g --executor-cores 3 --num-executors 100 \
../${JAR} -date "${date}" -output ${OUTPUT} -coalesce 200
if [[ $? -ne 0 ]]; then
exit 255
fi
...@@ -416,6 +416,11 @@ public class RTDmpFetch { ...@@ -416,6 +416,11 @@ public class RTDmpFetch {
if (audience_rules.containsKey("subtraction")) { if (audience_rules.containsKey("subtraction")) {
audienceIds.addAll(audience_rules.getJSONArray("subtraction")); audienceIds.addAll(audience_rules.getJSONArray("subtraction"));
} }
} else if (jsonObject.containsKey("group_rules")) {
JSONArray group_rules = jsonObject.getJSONArray("group_rules");
for (int i = 0; i < group_rules.size(); i++) {
audienceIds.addAll(group_rules.getJSONObject(i).getJSONArray("audiences"));
}
} }
if (audienceIds.size() > 0) { if (audienceIds.size() > 0) {
long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000; long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
...@@ -467,6 +472,30 @@ public class RTDmpFetch { ...@@ -467,6 +472,30 @@ public class RTDmpFetch {
if (audience_rules.containsKey("subtraction") && !audience_rules.getJSONArray("subtraction").isEmpty()) { if (audience_rules.containsKey("subtraction") && !audience_rules.getJSONArray("subtraction").isEmpty()) {
sql.append(" AND NOT hasAny(audience_id,").append(audience_rules.getJSONArray("subtraction")).append(")"); sql.append(" AND NOT hasAny(audience_id,").append(audience_rules.getJSONArray("subtraction")).append(")");
} }
} else if (jsonObject.containsKey("group_rules")) {
StringBuilder ruleSql = new StringBuilder();
JSONArray group_rules = jsonObject.getJSONArray("group_rules");
for (int i = 0; i < group_rules.size(); i++) {
JSONObject json = Constants.String2JSONObject(group_rules.getJSONObject(i).toString());
if (json.getString("inner_op").equals("|")) {
ruleSql.insert(0, "hasAny(audience_id," + json.getJSONArray("audiences") + ")");
} else {
ruleSql.insert(0, "hasAll(audience_id," + json.getJSONArray("audiences") + ")");
}
if (i + 1 < group_rules.size()) {
JSONObject outerJson = Constants.String2JSONObject(group_rules.getJSONObject(i + 1).toString());
if (outerJson.getString("outer_op").equals("-")) {
ruleSql.insert(0, " AND NOT (").append(")");
} else if (outerJson.getString("outer_op").equals("|")) {
ruleSql.insert(0, " OR ");
} else {
ruleSql.insert(0, " AND ");
}
}
}
if (StringUtils.isNotBlank(ruleSql)) {
sql.append("AND (").append(ruleSql).append(")");
}
} }
return sql.toString(); return sql.toString();
} }
......
...@@ -613,4 +613,29 @@ object Logic { ...@@ -613,4 +613,29 @@ object Logic {
case _ => "" case _ => ""
} }
} }
def changeDeviceId(device_type: String, device_id: String): String = {
device_type match {
case "idfa" =>
device_id.toUpperCase
case _ =>
device_id
}
if (device_id.matches(MobvistaConstant.md5Ptn)) {
device_id
} else {
MobvistaConstant.getMd5(device_id)
}
}
def changeDeviceType(device_type: String): String = {
device_type match {
case "imeimd5" | "imei_md5" | "imei" => "imei_md5"
case "gaidmd5" | "gaid_md5" | "gaid" => "gaid_md5"
case "oaidmd5" | "oaid_md5" | "oaid" => "oaid_md5"
case "idfamd5" | "idfa_md5" | "idfa" => "idfa_md5"
case "androididmd5" | "androidid_md5" | "androidid" => "androidid_md5"
case _ => ""
}
}
} }
...@@ -76,20 +76,20 @@ class RTDmpRequestDaily extends CommonSparkJob with Serializable { ...@@ -76,20 +76,20 @@ class RTDmpRequestDaily extends CommonSparkJob with Serializable {
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true) FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
val map = new mutable.HashMap[Integer, mutable.HashSet[String]]() val map = new mutable.HashMap[Integer, Array[String]]()
for (id: String <- json.keySet().toArray()) { for (id <- json.keySet().toArray()) {
val pkgSet = json.getJSONObject(id).getString("installed_package_name").split(",", -1) val pkgSet = json.getJSONObject(String.valueOf(id)).getString("installed_package_name").split(",", -1)
map.put(Integer.parseInt(id), pkgSet) map.put(Integer.parseInt(String.valueOf(id)), pkgSet)
} }
val rdd = spark.sql(sql).rdd.map(row => { val rdd = spark.sql(sql).rdd.map(row => {
val deviceId = row.getAs[String]("device_id") val deviceId = row.getAs[String]("device_id")
val deviceType = row.getAs[String]("device_type") val deviceType = row.getAs[String]("device_type")
val install_list = MobvistaConstant.String2JSONObject(row.getAs[String]("install_list")).keySet() val install_list = MobvistaConstant.String2JSONObject(row.getAs[String]("install_list")).keySet().asScala
val array = new ArrayBuffer[(String, String, Integer)]() val array = new ArrayBuffer[(String, String, Integer)]()
for (key <- map.keySet) { for (key <- map.keySet) {
if (map(key).intersect(install_list.asScala).size > 0) { if (map(key).toSet.intersect(install_list).size > 0) {
array += (deviceId, deviceType, key) array += ((deviceId, deviceType, key))
} }
} }
array array
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment