Commit c120c07a by kangxiaoshan

修复重置流量

parent c3a87d12
......@@ -8,6 +8,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
public interface TkioFlowRepository extends JpaRepository<TkioFlow, Long> {
......@@ -25,8 +26,12 @@ public interface TkioFlowRepository extends JpaRepository<TkioFlow, Long> {
@Query(value = "select sum(flow) from tkio_flow where contract_code in ?3 and ds >= ?1 and ds <= ?2", nativeQuery = true)
BigDecimal sumFlowByDsAndContractCodes(String startDate, String endDate, ArrayList<String> contractCode);
@Query(value = "select id from tkio_flow where contract_code in (select contract_code from tkio_flow where email = ?1) and ds = ?2", nativeQuery = true)
List<Long> findIdByEmailDs(String email, String yesterday);
@Transactional
@Modifying
@Query(value = "delete from tkio_flow where contract_code in ( select contract_code from tkio_flow where email=?1 ) and ds = ?2", nativeQuery = true)
void deleteByEmailDs(String email, String yesterday);
@Query(value = "delete from tkio_flow where id in ?1", nativeQuery = true)
void deleteByIds(List<Long> delIdList);
}
......@@ -446,27 +446,25 @@ public class TrackingFlowTask {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String status = restTaskInfoRepository.findStatus(info.getId());
if (status.equals("init")) {
if (!status.equals("init")) {
logger.info("暂停重置流量 ......");
break;
}
CompletableFuture[] futures = emails.stream().map(em ->
CompletableFuture.supplyAsync(() -> {
CompletableFuture.runAsync(() -> {
reset(dsone, em);
return Thread.currentThread().getName() + em;
}, executorService).exceptionally((t) -> {
logger.error("重置流量错误 " + em + "-" + dsone, t);
return "-1";
return null;
})).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures);
long names = Stream.of(futures).map(f -> f.join()).map(v -> (String) v).count();
CompletableFuture.allOf(futures).join();
stopWatch.stop();
logger.info("重置流量 {} accoount, {}s, ds {} ", names, stopWatch.getTotalTimeSeconds(), dsone);
logger.info("重置流量 {} accoount, {}s, ds {} ", emails.size(), stopWatch.getTotalTimeSeconds(), dsone);
}
logger.info("重置流量完成 ! ...");
info.setStartDs("complate");
info.setStatus("complate");
all.stop();
info.setAllSeconds(all.getTotalTimeSeconds());
restTaskInfoRepository.save(info);
......@@ -476,7 +474,10 @@ public class TrackingFlowTask {
public void reset(String yesterday, String email) {
//删除历史数据
tkioFlowRepository.deleteByEmailDs(email, yesterday);
List<Long> delIdList = tkioFlowRepository.findIdByEmailDs(email, yesterday);
if (delIdList != null && !delIdList.isEmpty()) {
tkioFlowRepository.deleteByIds(delIdList);
}
//查询用户下所有appkey
Account account = accountRepository.findByEmail(email);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment