package track.task; import common.model.CalculationFlow; import common.model.Contract; import common.model.TkioFlow; import common.repository.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StopWatch; import org.springframework.util.StringUtils; import tkio.model.Account; import tkio.repository.AccountRepository; import tkio.repository.AppRepository; import tkio.service.FlowService; import common.model.RestTaskInfo; import common.repository.RestTaskInfoRepository; import util.Constant; import util.DateUtil; import util.StringUtil; import java.math.BigDecimal; import java.math.BigInteger; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Stream; /** * @author liyin * @description * @date */ @Service public class TrackingFlowTask { private final Logger logger = LoggerFactory.getLogger(TrackingFlowTask.class); @Autowired private ContractRepository contractRepository; @Autowired private AccountRepository accountRepository; @Autowired private AppRepository appRepository; /*@Autowired private AccountFlowRestrictService accountFlowRestrictService;*/ @Autowired private FlowService flowService; @Autowired private TkioFlowRepository tkioFlowRepository; @Autowired private ContractChangeRepository contractChangeRepository; @Autowired private CalculationFlowRepository calculationFlowRepository; @Autowired private TkioFlowHistoryRepository tkioFlowHistoryRepository; @Autowired private RestTaskInfoRepository restTaskInfoRepository; /** * 功能描述:定时同步流量(每7分钟执行一次) */ public void syncFlow() { syncFlow(null); } public void syncFlow(Long id) { List<CalculationFlow> calculationFlows; if (id != null) { calculationFlows = Arrays.asList(calculationFlowRepository.findOne(id)); } else { calculationFlows = calculationFlowRepository.findByStatus(0); } for (CalculationFlow calculationFlow : calculationFlows) { List<TkioFlow> tkioFlowList = new ArrayList<>(); calculationFlow.setStatus(1); calculationFlowRepository.save(calculationFlow); tkioFlowRepository.deleteByEmail(calculationFlow.getEmail()); String email = calculationFlow.getEmail(); //查询用户下所有appkey try { Account account = accountRepository.findByEmail(email); List<Account> accountList = accountRepository.findByRootParent(account.getRootParent()); List<Long> idList = new ArrayList<>(); for (Account ac : accountList) { idList.add(ac.getId()); } List<String> appkeys = appRepository.findAppkeysNotDebug(idList); String appkeyStr = String.join("','", appkeys); appkeyStr = "'" + appkeyStr + "'"; String ago = DateUtil.format(account.getCreateTime(), DateUtil.C_DATE_PATTON_DEFAULT);//查找最早一天的流量 if (ago == null) { calculationFlow.setStatus(2); calculationFlowRepository.save(calculationFlow); continue; } int between = 0; try { between = DateUtil.daysBetween(ago, DateUtil.getBeforeDays(1)) + 1; } catch (ParseException e) { logger.error("强转错误:", e); } int startInt = 0; if (Integer.valueOf(DateUtil.getHH()) > 10) { startInt = -1; } for (int ii = between; ii > startInt; ii--) { String yesterday = DateUtil.getBeforeDays(ii);//昨日 //BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum"); BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys); if (clickNum != null && clickNum.longValue() > 0) { List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email); if (contracts.size() == 1) {//只有一个合同 TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0)); if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } } else { //多个合同 //看昨日被哪几个合同包含了 List<Contract> correlationContract = new ArrayList<>(); for (Contract contract : contracts) { /*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) { //中止或作废合同处理结束时间,以方便昨日流量的归属计算 ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode()); if (contractChange != null) { contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd")); } }*/ String startDate = contract.getStartDate(); String endDate = contract.getEndDate(); if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) { correlationContract.add(contract); } } //多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下 if (correlationContract.size() > 1) { //冒泡 //第一优先级:合同开始日期,第二优先级,合同编号大小 Contract[] contractsArray = new Contract[correlationContract.size()]; contractsArray = correlationContract.toArray(contractsArray); contractsArray = orderByContract(contractsArray); for (int i = 0; i < contractsArray.length; i++) { TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]); if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上 clickNum = BigInteger.valueOf(tkioFlow.getCostFlow()); tkioFlow.setCostFlow(null); if (tkioFlow.getFlow() > 0) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } } else { if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } break; } } } else { if (correlationContract.size() == 0) {//昨日不包含在所有合同中 //排序 Contract[] contractsArray = new Contract[contracts.size()]; contractsArray = contracts.toArray(contractsArray); contractsArray = orderByContract(contractsArray); //如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上 for (int i = 0; i < contractsArray.length; i++) { if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) { int j = 0; if (i != 0) { j = i - 1; } if (i == contractsArray.length - 1) { j = i; } TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]); if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } break; } } } else { TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0)); if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } } } } } } if (tkioFlowList.size() > 0) { //tkioFlowRepository.save(tkioFlowList); } calculationFlow.setStatus(2); calculationFlowRepository.save(calculationFlow); } catch (Exception e) { logger.error("CalculationFlow:Id::" + calculationFlow.getId() + ":全流量同步失败", e); calculationFlow.setStatus(3); calculationFlowRepository.save(calculationFlow); } } } /** * 功能描述:定时同步昨日流量(每天十点) */ public void task() { List<String> emails = contractRepository.findDistinctEmailByPlatform("tkio"); List<TkioFlow> tkioFlowList = new ArrayList<>(); String yesterday = DateUtil.getBeforeDays(1);//昨日 logger.info("昨日流量同步:" + yesterday); for (String email : emails) { //查询用户下所有appkey Account account = accountRepository.findByEmail(email); if (account == null || account.getRootParent() == null) { logger.warn("【单日流量同步】用户不存在:{}", email); continue; } List<Account> accountList = accountRepository.findByRootParent(account.getRootParent()); List<Long> idList = new ArrayList<>(); for (Account ac : accountList) { idList.add(ac.getId()); } List<String> appkeys = appRepository.findAppkeysNotDebug(idList); String appkeyStr = String.join("','", appkeys); appkeyStr = "'" + appkeyStr + "'"; if (CollectionUtils.isEmpty(appkeys)) { logger.warn("【单日流量同步】该用户没有appkey:{}", email); continue; } //BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum"); BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys); if (clickNum != null && clickNum.longValue() > 0) { List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email); if (contracts.size() == 1) {//只有一个合同 TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0)); if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } } else {//多个合同 //看昨日被哪几个合同包含了 List<Contract> correlationContract = new ArrayList<>(); for (Contract contract : contracts) { /*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) { //中止或作废合同处理结束时间,以方便昨日流量的归属计算 ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode()); if(contractChange!=null){ contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd")); } }*/ String startDate = contract.getStartDate(); String endDate = contract.getEndDate(); if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) { correlationContract.add(contract); } } //多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下 if (correlationContract.size() > 1) { //冒泡 //第一优先级:合同开始日期,第二优先级,合同编号大小 Contract[] contractsArray = new Contract[correlationContract.size()]; contractsArray = correlationContract.toArray(contractsArray); contractsArray = orderByContract(contractsArray); for (int i = 0; i < contractsArray.length; i++) { TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]); if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上 clickNum = BigInteger.valueOf(tkioFlow.getCostFlow()); tkioFlow.setCostFlow(null); if (tkioFlow.getFlow() > 0) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } } else { if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } break; } } } else { if (correlationContract.size() == 0) {//昨日不包含在所有合同中 //排序 Contract[] contractsArray = new Contract[contracts.size()]; contractsArray = contracts.toArray(contractsArray); contractsArray = orderByContract(contractsArray); //如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上 for (int i = 0; i < contractsArray.length; i++) { if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) { int j = 0; if (i != 0) { j = i - 1; } if (i == contractsArray.length - 1) { j = i; } TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]); if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } break; } } } else { TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0)); if (tkioFlow != null) { tkioFlowList.add(tkioFlow); tkioFlowRepository.save(tkioFlow); } } } } } } if (tkioFlowList.size() > 0) { //tkioFlowRepository.save(tkioFlowList); } } //排序 public Contract[] orderByContract(Contract[] contractsArray) { for (int i = 0; i < contractsArray.length - 1; i++) { for (int j = 0; j < contractsArray.length - 1 - i; j++) { String startDate1 = contractsArray[j].getSignedDate() == null ? contractsArray[j].getStartDate() : contractsArray[j].getSignedDate(); String startDate2 = contractsArray[j + 1].getSignedDate() == null ? contractsArray[j + 1].getStartDate() : contractsArray[j + 1].getSignedDate(); if (DateUtil.getDate(startDate1).getTime() > DateUtil.getDate(startDate2).getTime()) { Contract temp = contractsArray[j]; contractsArray[j] = contractsArray[j + 1]; contractsArray[j + 1] = temp; } else if (DateUtil.getDate(startDate1).getTime() == DateUtil.getDate(startDate2).getTime()) { String contractCode = StringUtil.matchNumber(contractsArray[j].getContractCode()); String contractCodeMin = StringUtil.matchNumber(contractsArray[j + 1].getContractCode()); if (Long.valueOf(contractCode) > Long.valueOf(contractCodeMin)) { Contract temp = contractsArray[j]; contractsArray[j] = contractsArray[j + 1]; contractsArray[j + 1] = temp; } } } } return contractsArray; } public TkioFlow getTkioFlow(BigInteger clickNum, String yesterday, Contract contract) { TkioFlow tkioFlow = new TkioFlow(); tkioFlow.setCreateTime(DateUtil.now()); tkioFlow.setDs(yesterday); tkioFlow.setEmail(contract.getEmail()); tkioFlow.setContractCode(contract.getContractCode()); try { if (DateUtil.daysBetween(contract.getStartDate(), yesterday) < 0 || DateUtil.daysBetween(contract.getEndDate(), yesterday) > 0) {//昨日日期早于合同开始日期 //设置为成本流量 tkioFlow.setCostFlow(clickNum.longValue()); } else { //查看历史总消耗流量是否超出 BigDecimal totalFlow = tkioFlowRepository.sumFlowByEmailAndContractCode(contract.getEmail(), contract.getContractCode()); totalFlow = totalFlow == null ? new BigDecimal(0) : totalFlow; Double contractTrackFlow = contract.getTrackFlow() * 10000; if (contractTrackFlow.longValue() - totalFlow.longValue() - clickNum.longValue() >= 0 || contract.getPriceLevel() == Constant.tkioPriceLevelNotLimit) { tkioFlow.setFlow(clickNum.longValue()); } else { tkioFlow.setFlow(contractTrackFlow.longValue() - totalFlow.longValue()); tkioFlow.setCostFlow(clickNum.longValue() - tkioFlow.getFlow()); } } return tkioFlow; } catch (ParseException e) { logger.error("合同编号-" + contract.getContractCode() + "-同步昨日流量错误:", e); } return null; } /** * 功能描述:临时跑一段时间内的流量任务 */ public void reset() { StopWatch all = new StopWatch(); all.start(); List<RestTaskInfo> taskInfos = restTaskInfoRepository.findByStatus("init"); if (taskInfos == null || taskInfos.size() > 1 || taskInfos.isEmpty()) { logger.info("重置流量配置错误"); return; } RestTaskInfo info = taskInfos.get(0); if (!"init".equals(info.getStatus())) { logger.info("重置流量配置错误"); return; } if (info.getPoolSize() <= 0 || info.getPoolSize() > 100) { info.setPoolSize(30); } info.setStatus("tasking"); restTaskInfoRepository.save(info); ExecutorService executorService = Executors.newFixedThreadPool(info.getPoolSize()); List<String> emails; if (StringUtils.isEmpty(info.getAccountEmails())) { emails = contractRepository.findDistinctEmailByPlatform("tkio"); } else { emails = Arrays.asList(info.getAccountEmails().split(",")); } List<String> dateInterval = DateUtil.getDateInterval(info.getStartDs(), info.getEndDs()); for (String dsone : dateInterval) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); String status = restTaskInfoRepository.findStatus(info.getId()); if (!status.equals("tasking")) { logger.info("暂停重置流量 ......"); break; } CompletableFuture[] futures = emails.stream().map(em -> CompletableFuture.runAsync(() -> { reset(dsone, em); }, executorService).exceptionally((t) -> { logger.error("重置流量错误 " + em + "-" + dsone, t); return null; })).toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join(); stopWatch.stop(); logger.info("重置流量 {} accoount, {}s, ds {} ", emails.size(), stopWatch.getTotalTimeSeconds(), dsone); } logger.info("重置流量完成 ! ..."); info.setStatus("complate"); all.stop(); info.setAllSeconds(all.getTotalTimeSeconds()); restTaskInfoRepository.save(info); executorService.shutdown(); } public void reset(String yesterday, String email) { if(StringUtil.isEmpty(email)){ return; } //删除历史数据 List<Long> delIdList = tkioFlowRepository.findIdByEmailDs(email, yesterday); if (delIdList != null && !delIdList.isEmpty()) { tkioFlowRepository.deleteByIds(delIdList); } //查询用户下所有appkey Account account = accountRepository.findByEmail(email); if (account == null || account.getRootParent() == null) { return; } List<Account> accountList = accountRepository.findByRootParent(account.getRootParent()); List<Long> idList = new ArrayList<>(); for (Account ac : accountList) { idList.add(ac.getId()); } List<String> appkeys = appRepository.findAppkeysNotDebug(idList); if (CollectionUtils.isEmpty(appkeys)) { return; } BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys); if (clickNum != null && clickNum.longValue() > 0) { List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email); if (contracts.size() == 1) {//只有一个合同 TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0)); if (tkioFlow != null) { saveTkioFlowItem(tkioFlow); } } else {//多个合同 //看昨日被哪几个合同包含了 List<Contract> correlationContract = new ArrayList<>(); for (Contract contract : contracts) { String startDate = contract.getStartDate(); String endDate = contract.getEndDate(); if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) { correlationContract.add(contract); } } //多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下 if (correlationContract.size() > 1) { //冒泡 //第一优先级:合同开始日期,第二优先级,合同编号大小 Contract[] contractsArray = new Contract[correlationContract.size()]; contractsArray = correlationContract.toArray(contractsArray); contractsArray = orderByContract(contractsArray); for (int i = 0; i < contractsArray.length; i++) { TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]); if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上 clickNum = BigInteger.valueOf(tkioFlow.getCostFlow()); tkioFlow.setCostFlow(null); if (tkioFlow.getFlow() > 0) { saveTkioFlowItem(tkioFlow); } } else { if (tkioFlow != null) { saveTkioFlowItem(tkioFlow); } break; } } } else { if (correlationContract.size() == 0) {//昨日不包含在所有合同中 //排序 Contract[] contractsArray = new Contract[contracts.size()]; contractsArray = contracts.toArray(contractsArray); contractsArray = orderByContract(contractsArray); //如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上 for (int i = 0; i < contractsArray.length; i++) { if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) { int j = 0; if (i != 0) { j = i - 1; } if (i == contractsArray.length - 1) { j = i; } TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]); if (tkioFlow != null) { saveTkioFlowItem(tkioFlow); } break; } } } else { TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0)); if (tkioFlow != null) { saveTkioFlowItem(tkioFlow); } } } } } } private void saveTkioFlowItem(TkioFlow tkioFlow) { tkioFlowRepository.save(tkioFlow); } }