Commit 62af79db by xioa

同步数据增加线程池

parent 5dc5cf7c
...@@ -21,6 +21,7 @@ public class TkioFlowHistory { ...@@ -21,6 +21,7 @@ public class TkioFlowHistory {
private String yMonth; private String yMonth;
private String yMonthNext; private String yMonthNext;
private String nextCode; private String nextCode;
private Integer firstRecode;
...@@ -108,4 +109,12 @@ public class TkioFlowHistory { ...@@ -108,4 +109,12 @@ public class TkioFlowHistory {
public void setNextCode(String nextCode) { public void setNextCode(String nextCode) {
this.nextCode = nextCode; this.nextCode = nextCode;
} }
public Integer getFirstRecode() {
return firstRecode;
}
public void setFirstRecode(Integer firstRecode) {
this.firstRecode = firstRecode;
}
} }
...@@ -2,14 +2,20 @@ package common.repository; ...@@ -2,14 +2,20 @@ package common.repository;
import common.model.CalculationFlow; import common.model.CalculationFlow;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
@Transactional @Transactional
public interface CalculationFlowRepository extends JpaRepository<CalculationFlow,Long> { public interface CalculationFlowRepository extends JpaRepository<CalculationFlow, Long> {
@Query(value = "select * from calculation_flow where status = ?1",nativeQuery = true) @Query(value = "select * from calculation_flow where status = ?1", nativeQuery = true)
List<CalculationFlow> findByStatus(int status); List<CalculationFlow> findByStatus(int status);
@Query(value = "update calculation_flow set status = ?2 where id = ?1 ", nativeQuery = true)
@Transactional
@Modifying
void updateStatus(Long id, int status);
} }
...@@ -12,7 +12,7 @@ public interface TkioFlowHistoryRepository extends JpaRepository<TkioFlowHistory ...@@ -12,7 +12,7 @@ public interface TkioFlowHistoryRepository extends JpaRepository<TkioFlowHistory
@Query(value = "select contract_code from tkio_flow_history where email = ?1 order by ds limit 1", nativeQuery = true) @Query(value = "select contract_code from tkio_flow_history where email = ?1 order by ds limit 1", nativeQuery = true)
String findLastRecodeContract(String email); String findLastRecodeContract(String email);
@Query(value = "select sum(flow) from tkio_flow_history where email = ?1 and contract_code = ?2", nativeQuery = true) @Query(value = "select sum(flow) from tkio_flow_history where email = ?1 and contract_code = ?2", nativeQuery = true)
......
...@@ -24,6 +24,9 @@ import java.util.ArrayList; ...@@ -24,6 +24,9 @@ import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -52,160 +55,187 @@ public class TrackingFlowTask { ...@@ -52,160 +55,187 @@ public class TrackingFlowTask {
@Autowired @Autowired
private TkioFlowHistoryRepository tkioFlowHistoryRepository; private TkioFlowHistoryRepository tkioFlowHistoryRepository;
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void syncFlow() { public void syncFlow() {
List<CalculationFlow> calculationFlows = calculationFlowRepository.findByStatus(0); List<CalculationFlow> calculationFlows = calculationFlowRepository.findByStatus(0);
for (CalculationFlow calculationFlow : calculationFlows) { for (CalculationFlow calculationFlow : calculationFlows) {
List<TkioFlow> tkioFlowList = new ArrayList<>(); //List<TkioFlow> tkioFlowList = new ArrayList<>();
calculationFlow.setStatus(1); calculationFlow.setStatus(1);
calculationFlowRepository.save(calculationFlow); calculationFlowRepository.save(calculationFlow);
tkioFlowRepository.deleteByEmail(calculationFlow.getEmail()); tkioFlowRepository.deleteByEmail(calculationFlow.getEmail());
tkioFlowHistoryRepository.deleteByEmail(calculationFlow.getEmail()); tkioFlowHistoryRepository.deleteByEmail(calculationFlow.getEmail());
String email = calculationFlow.getEmail(); String email = calculationFlow.getEmail();
//查询用户下所有appkey //查询用户下所有appkey
//每个用户数据在单独线程处理
executor.execute(new RunableTask(email, calculationFlow));
}
}
private class RunableTask implements Runnable {
private String email;
private CalculationFlow calculationFlow;
public RunableTask(String email, CalculationFlow calculationFlow) {
this.email = email;
this.calculationFlow = calculationFlow;
}
@Override
public void run() {
calculationFlowByAccount(calculationFlow, email);
}
}
private void calculationFlowByAccount(CalculationFlow calculationFlow, String email) {
try {
Account account = accountRepository.findByEmail(email);
List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
List<Long> idList = new ArrayList<>();
for (Account ac : accountList) {
idList.add(ac.getId());
}
List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
String appkeyStr = String.join("','", appkeys);
appkeyStr = "'" + appkeyStr + "'";
String ago = DateUtil.format(account.getCreateTime(), DateUtil.C_DATE_PATTON_DEFAULT);//查找最早一天的流量
if (ago == null) {
/*calculationFlow.setStatus(2);
calculationFlowRepository.save(calculationFlow);*/
calculationFlowRepository.updateStatus(calculationFlow.getId(), 2);
return;
}
int between = 0;
try { try {
Account account = accountRepository.findByEmail(email); between = DateUtil.daysBetween(ago, DateUtil.getBeforeDays(1)) + 1;
List<Account> accountList = accountRepository.findByRootParent(account.getRootParent()); } catch (ParseException e) {
List<Long> idList = new ArrayList<>(); logger.error("强转错误:", e);
for (Account ac : accountList) { }
idList.add(ac.getId()); int startInt = 0;
} if (Integer.valueOf(DateUtil.getHH()) > 10) {
List<String> appkeys = appRepository.findAppkeysNotDebug(idList); startInt = -1;
String appkeyStr = String.join("','", appkeys); }
appkeyStr = "'" + appkeyStr + "'"; for (int ii = between; ii > startInt; ii--) {
String yesterday = DateUtil.getBeforeDays(ii);//昨日
String ago = DateUtil.format(account.getCreateTime(), DateUtil.C_DATE_PATTON_DEFAULT);//查找最早一天的流量 BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
if (ago == null) { if (clickNum != null && clickNum.longValue() > 0) {
calculationFlow.setStatus(2); List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
calculationFlowRepository.save(calculationFlow); if (contracts.size() == 1) {//只有一个合同
continue; TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
} if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlow.setContractCount(1);
tkioFlowRepository.save(tkioFlow);
}
} else {
int between = 0; getFlowOnMonths(email, yesterday, clickNum, contracts);//月流量报表逻辑
try {
between = DateUtil.daysBetween(ago, DateUtil.getBeforeDays(1)) + 1; //多个合同
} catch (ParseException e) { //看昨日被哪几个合同包含了
logger.error("强转错误:", e); //昨日有几个合同开始生效
} List<Contract> correlationContract = new ArrayList<>();
int startInt = 0; for (Contract contract : contracts) {
if (Integer.valueOf(DateUtil.getHH()) > 10) { /*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) {
startInt = -1; //中止或作废合同处理结束时间,以方便昨日流量的归属计算
} ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode());
for (int ii = between; ii > startInt; ii--) { if (contractChange != null) {
String yesterday = DateUtil.getBeforeDays(ii);//昨日 contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd"));
BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum"); }
if (clickNum != null && clickNum.longValue() > 0) { }*/
List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email); String startDate = contract.getStartDate();
if (contracts.size() == 1) {//只有一个合同 String endDate = contract.getEndDate();
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0)); if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
if (tkioFlow != null) { correlationContract.add(contract);
tkioFlowList.add(tkioFlow);
tkioFlow.setContractCount(1);
tkioFlowRepository.save(tkioFlow);
} }
} else { }
//多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下(昨日多个合同生效)
getFlowOnMonths(email, yesterday, clickNum, contracts);//月流量报表逻辑 if (correlationContract.size() > 1) {
//冒泡
//第一优先级:合同开始日期,第二优先级,合同编号大小
Contract[] contractsArray = new Contract[correlationContract.size()];
contractsArray = correlationContract.toArray(contractsArray);
contractsArray = orderByContract(contractsArray);
//多个合同 for (int i = 0; i < contractsArray.length; i++) {
//看昨日被哪几个合同包含了 TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]);
//昨日有几个合同开始生效 tkioFlow.setContractCount(contracts.size());
List<Contract> correlationContract = new ArrayList<>(); if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {
for (Contract contract : contracts) { //处理成本流量,如果超出了,依次算在下一个合同上
/*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) { //1、超出的流量 记为下一合同的当日流量 2、该合同自身当日流量为 剩余流量 或者为0
//中止或作废合同处理结束时间,以方便昨日流量的归属计算 clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode()); tkioFlow.setCostFlow(null);
if (contractChange != null) { if (tkioFlow.getFlow() > 0) {
contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd")); //tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
} else {
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
} }
}*/ break;
String startDate = contract.getStartDate();
String endDate = contract.getEndDate();
if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
correlationContract.add(contract);
} }
}
//多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下(昨日多个合同生效)
if (correlationContract.size() > 1) {
//冒泡
//第一优先级:合同开始日期,第二优先级,合同编号大小
Contract[] contractsArray = new Contract[correlationContract.size()];
contractsArray = correlationContract.toArray(contractsArray);
}
} else {
if (correlationContract.size() == 0) {//昨日不包含在所有合同中
//排序
Contract[] contractsArray = new Contract[contracts.size()];
contractsArray = contracts.toArray(contractsArray);
contractsArray = orderByContract(contractsArray); contractsArray = orderByContract(contractsArray);
//如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
for (int i = 0; i < contractsArray.length; i++) { for (int i = 0; i < contractsArray.length; i++) {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]); if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
tkioFlow.setContractCount(contracts.size()); int j = 0;
if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {
//处理成本流量,如果超出了,依次算在下一个合同上 if (i != 0) {
//1、超出的流量 记为下一合同的当日流量 2、该合同自身当日流量为 剩余流量 或者为0 j = i - 1;
clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
tkioFlow.setCostFlow(null);
if (tkioFlow.getFlow() > 0) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
} }
} else {
if (i == contractsArray.length - 1) {
j = i;
}
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
tkioFlow.setContractCount(contracts.size());
if (tkioFlow != null) { if (tkioFlow != null) {
tkioFlowList.add(tkioFlow); //tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow); tkioFlowRepository.save(tkioFlow);
} }
break; break;
} }
} }
} else { } else {
if (correlationContract.size() == 0) {//昨日不包含在所有合同中 TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
//排序 tkioFlow.setContractCount(contracts.size());
Contract[] contractsArray = new Contract[contracts.size()]; if (tkioFlow != null) {
contractsArray = contracts.toArray(contractsArray); //tkioFlowList.add(tkioFlow);
contractsArray = orderByContract(contractsArray); tkioFlowRepository.save(tkioFlow);
//如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
for (int i = 0; i < contractsArray.length; i++) {
if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
int j = 0;
if (i != 0) {
j = i - 1;
}
if (i == contractsArray.length - 1) {
j = i;
}
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
tkioFlow.setContractCount(contracts.size());
if (tkioFlow != null) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
break;
}
}
} else {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
tkioFlow.setContractCount(contracts.size());
if (tkioFlow != null) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
} }
} }
} }
} }
} }
if (tkioFlowList.size() > 0) {
//tkioFlowRepository.save(tkioFlowList);
}
calculationFlow.setStatus(2);
calculationFlowRepository.save(calculationFlow);
} catch (Exception e) {
logger.error("CalculationFlow:Id::" + calculationFlow.getId() + ":全流量同步失败", e);
calculationFlow.setStatus(3);
calculationFlowRepository.save(calculationFlow);
} }
/*if (tkioFlowList.size() > 0) {
tkioFlowRepository.save(tkioFlowList);
}*/
/* calculationFlow.setStatus(2);
calculationFlowRepository.save(calculationFlow);*/
calculationFlowRepository.updateStatus(calculationFlow.getId(), 2);
} catch (Exception e) {
logger.error("CalculationFlow:Id::" + calculationFlow.getId() + ":全流量同步失败", e);
/*calculationFlow.setStatus(3);
calculationFlowRepository.save(calculationFlow);*/
calculationFlowRepository.updateStatus(calculationFlow.getId(), 3);
} }
} }
...@@ -221,17 +251,22 @@ public class TrackingFlowTask { ...@@ -221,17 +251,22 @@ public class TrackingFlowTask {
private BigInteger getFlowOnMonths(String email, String yesterday, BigInteger clickNum, List<Contract> contracts) { private BigInteger getFlowOnMonths(String email, String yesterday, BigInteger clickNum, List<Contract> contracts) {
String code = tkioFlowHistoryRepository.findLastRecodeContract(email); String code = tkioFlowHistoryRepository.findLastRecodeContract(email);
List<Contract> sortedContract = sortContracts(contracts); List<Contract> sortedContract = sortContracts(contracts);
BigInteger clickFlow = clickNum;
boolean skip = true; boolean skip = true;
boolean fistRecode = code == null ? true : false;
for (int i = 0; i < sortedContract.size(); i++) { for (int i = 0; i < sortedContract.size(); i++) {
if (skip && !sortedContract.get(i).getContractCode().equals(code)) { if (skip && code != null && !sortedContract.get(i).getContractCode().equals(code)) {
continue;//跳过已记录的合同 continue;//从记录的最后一份合同开始处理
} else { } else {
skip = false; skip = false;
} }
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, sortedContract.get(i), true);
TkioFlow tkioFlow = getTkioFlow(clickFlow, yesterday, sortedContract.get(i), true);
Long costFlow = tkioFlow.getCostFlow() == null ? 0L : tkioFlow.getCostFlow(); Long costFlow = tkioFlow.getCostFlow() == null ? 0L : tkioFlow.getCostFlow();
TkioFlowHistory flowHistory = new TkioFlowHistory(tkioFlow); TkioFlowHistory flowHistory = new TkioFlowHistory(tkioFlow);
flowHistory.setFirstRecode(fistRecode ? 1 : 0);//标记合同的第一次流量记录
fistRecode = false;
flowHistory.setyMonth(DateTime.parse(yesterday).toString("yyyy-MM")); flowHistory.setyMonth(DateTime.parse(yesterday).toString("yyyy-MM"));
flowHistory.setyMonthNext(DateTime.parse(yesterday).plusMonths(1).toString("yyyy-MM")); flowHistory.setyMonthNext(DateTime.parse(yesterday).plusMonths(1).toString("yyyy-MM"));
if (costFlow <= 0) { if (costFlow <= 0) {
...@@ -247,23 +282,26 @@ public class TrackingFlowTask { ...@@ -247,23 +282,26 @@ public class TrackingFlowTask {
if (DateTime.parse(next.getStartDate()).isBefore(DateTime.parse(yesterday)) if (DateTime.parse(next.getStartDate()).isBefore(DateTime.parse(yesterday))
|| DateTime.parse(next.getStartDate()).isEqual(DateTime.parse(yesterday))) { || DateTime.parse(next.getStartDate()).isEqual(DateTime.parse(yesterday))) {
// 生效
if (DateTime.parse(next.getStartDate()).monthOfYear().get() != new DateTime(next.getCreateTime()).monthOfYear().get() if (DateTime.parse(next.getStartDate()).monthOfYear().get() != new DateTime(next.getCreateTime()).monthOfYear().get()
&& new DateTime(next.getCreateTime()).dayOfMonth().get() > 10) { && new DateTime(next.getCreateTime()).dayOfMonth().get() > 10) {
} else { } else {
//2、生效时 切为非晚录合同 超出流量计入下一合同 //2、生效时 且非晚录合同 超出流量计入下一合同
clickNum = new BigInteger(costFlow.toString()); clickFlow = new BigInteger(costFlow.toString());
fistRecode = true;
continue; continue;
} }
} }
//3、未生效或晚录合同 则关联流量信息到当前合同 //3、未生效或晚录合同 则关联流量信息到当前合同
if (DateTime.parse(yesterday).monthOfYear().get() == DateTime.parse(next.getStartDate()).monthOfYear().get()) { if (DateTime.parse(yesterday).monthOfYear().get() == DateTime.parse(next.getStartDate()).monthOfYear().get()) {
//生效当月 超出的流量关联到 下一份合同 //生效当月 超出的流量关联到 该合同
clickNum = new BigInteger(costFlow.toString()); clickFlow = new BigInteger(costFlow.toString());
fistRecode = true;
continue; continue;
} else { } else {
//生效当月之前 超出的流量计入当前合同 //生效当月之前 超出的流量计入上一合同
flowHistory.setNextCode(next.getContractCode());//标记用于那份合同的调整流量 flowHistory.setNextCode(next.getContractCode());//标记用于那份合同的调整流量
tkioFlowHistoryRepository.save(flowHistory); tkioFlowHistoryRepository.save(flowHistory);
} }
...@@ -273,7 +311,7 @@ public class TrackingFlowTask { ...@@ -273,7 +311,7 @@ public class TrackingFlowTask {
tkioFlowHistoryRepository.save(flowHistory); tkioFlowHistoryRepository.save(flowHistory);
} }
} }
return clickNum; return clickFlow;
} }
public void task() { public void task() {
...@@ -489,7 +527,7 @@ public class TrackingFlowTask { ...@@ -489,7 +527,7 @@ public class TrackingFlowTask {
List<Contract> contracts = new ArrayList<>(); List<Contract> contracts = new ArrayList<>();
Contract c1 = new Contract(); Contract c1 = new Contract();
c1.setStartDate("2020-10-03"); c1.setStartDate("2020-10-02");
c1.setContractCode("QWWE-20201101-03"); c1.setContractCode("QWWE-20201101-03");
Contract c2 = new Contract(); Contract c2 = new Contract();
c2.setStartDate("2020-11-02"); c2.setStartDate("2020-11-02");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment