Commit 5a91cf65 by WangJinfeng

fix RTDmpFetch bug

parent 45302635
...@@ -3,6 +3,7 @@ package mobvista.dmp.datasource.rtdmp; ...@@ -3,6 +3,7 @@ package mobvista.dmp.datasource.rtdmp;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.common.Constants; import mobvista.dmp.common.Constants;
import mobvista.dmp.datasource.rtdmp.entity.Tuple; import mobvista.dmp.datasource.rtdmp.entity.Tuple;
import mobvista.dmp.util.*; import mobvista.dmp.util.*;
...@@ -30,6 +31,7 @@ import java.sql.ResultSet; ...@@ -30,6 +31,7 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.*;
/** /**
* @package: mobvista.dmp.datasource.rtdmp * @package: mobvista.dmp.datasource.rtdmp
...@@ -80,10 +82,23 @@ public class RTDmpFetch { ...@@ -80,10 +82,23 @@ public class RTDmpFetch {
String part = DateUtil.format(DateUtil.parse(update_time_end, "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH"); String part = DateUtil.format(DateUtil.parse(update_time_end, "yyyy-MM-dd HH:mm:ss"), "yyyyMMddHH");
JSONArray jsonArray = ruleAudienceInfo(update_time_start, update_time_end); JSONArray jsonArray = ruleAudienceInfo(update_time_start, update_time_end);
jsonArray.sort(Comparator.comparing(obj -> ((JSONObject) obj).getIntValue("id"))); jsonArray.sort(Comparator.comparing(obj -> ((JSONObject) obj).getIntValue("id")));
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RTDmpFetch-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 10,
120L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(64), threadFactory, new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < jsonArray.size(); i++) { for (int i = 0; i < jsonArray.size(); i++) {
JSONObject audienceObject = jsonArray.getJSONObject(i); int finalI = i;
pool.execute(() -> {
JSONObject audienceObject = jsonArray.getJSONObject(finalI);
doFetch(audienceObject, part); doFetch(audienceObject, part);
});
} }
pool.shutdown();
} }
private static void doFetch(JSONObject audienceObject, String part) { private static void doFetch(JSONObject audienceObject, String part) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment