Commit 4bcf2879 by kangxiaoshan

跑流量

parent cff31efe
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -26,6 +26,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -408,7 +411,10 @@ public class TrackingFlowTask {
* 功能描述:临时跑一段时间内的流量任务
*/
public void reset(String lastEmail) {
ExecutorService executorService = Executors.newFixedThreadPool(30);
List<String> emails = contractRepository.findDistinctEmailByPlatform("tkio");
emails = emails.stream().limit(40).collect(Collectors.toList());
List<String> dateInterval = DateUtil.getDateInterval("2021-05-25", "2021-07-15");
// dateInterval.add("2021-06-30");
// emails = Arrays.asList("1291269883@qq.com");
......@@ -432,32 +438,33 @@ public class TrackingFlowTask {
}
for (String email : emails) {
for (String dsone : dateInterval) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
CompletableFuture[] futures = dateInterval.stream().map(dsone ->
CompletableFuture[] futures = emails.stream().map(em ->
CompletableFuture.supplyAsync(() -> {
reset(dsone, email);
return Thread.currentThread().getName();
reset(dsone, em);
return Thread.currentThread().getName() + em;
}
).exceptionally((t) -> {
logger.error("erro on " + email + "-" + dsone, t);
, executorService).exceptionally((t) -> {
logger.error("erro on " + em + "-" + dsone, t);
return "-1";
})).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures);
String names = Stream.of(futures).map(f -> f.join()).map(v -> ((String) v).split("-")[2]).collect(Collectors.joining(",-"));
long names = Stream.of(futures).map(f -> f.join()).map(v -> (String) v).count();
stopWatch.stop();
logger.info(" {}s, emmial {} ,ForkJoinPool.commonPool-worker-{}", stopWatch.getTotalTimeSeconds(), email, names);
logger.info(" {}s, ds {} ,ForkJoinPool.commonPool-worker-{}", stopWatch.getTotalTimeSeconds(), dsone, names);
}
logger.info("reset complate ! ...");
executorService.shutdown();
}
public void reset(String yesterday, String email) {
//logger.info("ds {} eamil {} running...", yesterday, email);
logger.info("ds {} eamil {} running...", yesterday, email);
tkioFlowRepository.deleteByEmailDs(email, yesterday);
//tkioFlowRepository.deleteByEmailDs(email, yesterday);
//查询用户下所有appkey
Account account = accountRepository.findByEmail(email);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment