Commit 592aad84 by WangJinfeng

fix dm_interest_tag_all_v3.sh

parent 12a03fcf
......@@ -59,8 +59,7 @@ spark-submit --class mobvista.dmp.datasource.dm.DmInterestTagAllV2 \
--conf spark.sql.broadcastTimeout=3600 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 6g --executor-cores 4 --num-executors 256 \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 3 --num-executors 300 \
../${JAR} \
-output ${OUTPUT_PATH} -date ${date} -ga_date ${ga_date} -coalesce 5000
......
......@@ -30,6 +30,7 @@ import java.io.*;
import java.net.URISyntaxException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
......@@ -60,6 +61,8 @@ public class RTDmpFetch {
private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH");
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final MySQLUtil mySqlUtil = new MySQLUtil();
public static final Logger LOGGER = LoggerFactory.getLogger(RTDmpFetch.class);
......@@ -303,6 +306,15 @@ public class RTDmpFetch {
private static JSONArray ruleAudienceInfo(String startTime, String endTime) {
Long startTimestamp = null;
Long endTimestamp = null;
try {
startTimestamp = simpleDateFormat.parse(startTime).getTime() / 1000;
endTimestamp = simpleDateFormat.parse(endTime).getTime() / 1000;
} catch (ParseException e) {
e.printStackTrace();
}
CloseableHttpClient client = HttpClients.createDefault();
List<BasicNameValuePair> formparams = new ArrayList<>();
......@@ -311,7 +323,6 @@ public class RTDmpFetch {
try {
uri = new URIBuilder(serverUrl)
.addParameter("update_time_start", startTime)
.addParameter("update_time_end", endTime)
.addParameter("audience_type", "3")
.addParameter("is_offline", "1");
} catch (URISyntaxException e) {
......@@ -338,7 +349,7 @@ public class RTDmpFetch {
httpGet.setHeader("key", key);
httpGet.setHeader("token", token);
JSONArray jsonArray = new JSONArray();
JSONArray jsonArrayReturn = new JSONArray();
CloseableHttpResponse response;
try {
response = client.execute(httpGet);
......@@ -349,19 +360,28 @@ public class RTDmpFetch {
while ((line = rd.readLine()) != null) {
result.append(line);
}
JSONObject jsonObject = Constants.String2JSONObject(result.toString());
if (jsonObject.getInteger("code") == 200 && jsonObject.containsKey("data")) {
jsonArray = jsonObject.getJSONArray("data");
/*
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject json = jsonArray.getJSONObject(i);
if (json.getLongValue("audience_data_utime") >= startTimestamp && json.getLongValue("audience_data_utime") < endTimestamp) {
jsonArrayReturn.add(json);
}
}
*/
jsonArrayReturn = jsonObject.getJSONArray("data");
}
} catch (IOException e) {
LOGGER.info(e.getMessage());
jsonArray = new JSONArray();
jsonArrayReturn = new JSONArray();
} finally {
httpGet.abort();
}
LOGGER.info("ruleAudienceInfo.result: " + jsonArray);
LOGGER.info("ruleAudienceInfo.result: " + jsonArrayReturn);
return jsonArray;
return jsonArrayReturn;
}
private static void update(String requestBody) {
......@@ -405,22 +425,23 @@ public class RTDmpFetch {
private static Tuple checkRules(JSONObject jsonObject, long startTime, long endTime) {
JSONArray audienceIds = new JSONArray();
if (jsonObject.containsKey("audience_rules")) {
if (jsonObject.containsKey("group_rules")) {
JSONArray group_rules = jsonObject.getJSONArray("group_rules");
for (int i = 0; i < group_rules.size(); i++) {
audienceIds.addAll(group_rules.getJSONObject(i).getJSONArray("audiences"));
}
}
if (jsonObject.containsKey("audience_rules") && audienceIds.isEmpty()) {
JSONObject audience_rules = jsonObject.getJSONObject("audience_rules");
if (audience_rules.containsKey("intersections")) {
if (audience_rules.containsKey("intersections") && !audience_rules.getJSONArray("intersections").isEmpty()) {
audienceIds.addAll(audience_rules.getJSONArray("intersections"));
}
if (audience_rules.containsKey("union")) {
if (audience_rules.containsKey("union") && !audience_rules.getJSONArray("union").isEmpty()) {
audienceIds.addAll(audience_rules.getJSONArray("union"));
}
if (audience_rules.containsKey("subtraction")) {
if (audience_rules.containsKey("subtraction") && !audience_rules.getJSONArray("subtraction").isEmpty()) {
audienceIds.addAll(audience_rules.getJSONArray("subtraction"));
}
} else if (jsonObject.containsKey("group_rules")) {
JSONArray group_rules = jsonObject.getJSONArray("group_rules");
for (int i = 0; i < group_rules.size(); i++) {
audienceIds.addAll(group_rules.getJSONObject(i).getJSONArray("audiences"));
}
}
if (audienceIds.size() > 0) {
long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
......@@ -448,31 +469,7 @@ public class RTDmpFetch {
StringBuilder sql = new StringBuilder();
sql.append("SELECT @key FROM dwh.@table WHERE dt = '").append(dt).append("' AND hour = '").append(hour).append("' AND device_type != 'unknown' ");
if (jsonObject.containsKey("audience_rules")) {
JSONObject audience_rules = jsonObject.getJSONObject("audience_rules");
// String[] rules = jsonObject.getString("audience_rules_str").split(";", -1);
StringBuilder ruleSql = new StringBuilder();
// hasAll
if (audience_rules.containsKey("intersections") && !audience_rules.getJSONArray("intersections").isEmpty()) {
ruleSql.append("hasAll(audience_id,").append(audience_rules.getJSONArray("intersections")).append(")");
}
// hasAny
if (audience_rules.containsKey("union") && !audience_rules.getJSONArray("union").isEmpty()) {
if (StringUtils.isNotBlank(ruleSql)) {
ruleSql.append(" OR hasAny(audience_id,").append(audience_rules.getJSONArray("union")).append(")");
} else {
ruleSql.append("hasAny(audience_id,").append(audience_rules.getJSONArray("union")).append(")");
}
}
if (StringUtils.isNotBlank(ruleSql)) {
sql.append("AND (").append(ruleSql).append(")");
}
// not hasAny
if (audience_rules.containsKey("subtraction") && !audience_rules.getJSONArray("subtraction").isEmpty()) {
sql.append(" AND NOT hasAny(audience_id,").append(audience_rules.getJSONArray("subtraction")).append(")");
}
} else if (jsonObject.containsKey("group_rules")) {
if (jsonObject.containsKey("group_rules")) {
StringBuilder ruleSql = new StringBuilder();
JSONArray group_rules = jsonObject.getJSONArray("group_rules");
for (int i = 0; i < group_rules.size(); i++) {
......@@ -497,6 +494,32 @@ public class RTDmpFetch {
sql.append("AND (").append(ruleSql).append(")");
}
}
if (jsonObject.containsKey("audience_rules")) {
JSONObject audience_rules = jsonObject.getJSONObject("audience_rules");
// String[] rules = jsonObject.getString("audience_rules_str").split(";", -1);
StringBuilder ruleSql = new StringBuilder();
// hasAll
if (audience_rules.containsKey("intersections") && audience_rules.getJSONArray("intersections") != null && !audience_rules.getJSONArray("intersections").isEmpty()) {
ruleSql.append("hasAll(audience_id,").append(audience_rules.getJSONArray("intersections")).append(")");
}
// hasAny
if (audience_rules.containsKey("union") && audience_rules.getJSONArray("union") != null && !audience_rules.getJSONArray("union").isEmpty()) {
if (StringUtils.isNotBlank(ruleSql)) {
ruleSql.append(" OR hasAny(audience_id,").append(audience_rules.getJSONArray("union")).append(")");
} else {
ruleSql.append("hasAny(audience_id,").append(audience_rules.getJSONArray("union")).append(")");
}
}
if (StringUtils.isNotBlank(ruleSql)) {
sql.append("AND (").append(ruleSql).append(")");
}
// not hasAny
if (audience_rules.containsKey("subtraction") && audience_rules.getJSONArray("subtraction") != null && !audience_rules.getJSONArray("subtraction").isEmpty()) {
sql.append(" AND NOT hasAny(audience_id,").append(audience_rules.getJSONArray("subtraction")).append(")");
}
}
return sql.toString();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment