Commit e995cc03 by kangxiaoshan

优化重置流量

parent f70ca17f
......@@ -67,9 +67,6 @@ public class Contract {
private Integer shareSign;//标记是否计算分摊收入 1 计算 0 不计算
private String searchName;
private List<String> customBodyNames;
......@@ -108,6 +105,9 @@ public class Contract {
private Double oneTimeRecognizedRevenue;//一次性确认收入
private Boolean oneTime;//是否计算一次性收入,0不计算
private int businessType; //业务类型 1 VIP 2 共管
private int agreementType; //协议类型 1 普通协议 2 框架协议
@Id
@GeneratedValue
public Long getId() {
......@@ -589,6 +589,7 @@ public class Contract {
public void setTrackFlow(Double trackFlow) {
this.trackFlow = trackFlow;
}
@Transient
public Double getUnitPrice() {
return unitPrice;
......@@ -648,6 +649,7 @@ public class Contract {
public void setSignedDate(String signedDate) {
this.signedDate = signedDate;
}
@Transient
public double getHistoryTkioFlow() {
return historyTkioFlow;
......@@ -773,4 +775,20 @@ public class Contract {
", validEndDate='" + validEndDate + '\'' +
'}';
}
public int getBusinessType() {
return businessType;
}
public void setBusinessType(int businessType) {
this.businessType = businessType;
}
public int getAgreementType() {
return agreementType;
}
public void setAgreementType(int agreementType) {
this.agreementType = agreementType;
}
}
package common.model;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
@Entity
public class RestTaskInfo {
//重置流量信息类
private Long id;
private String startDs;
private String endDs;
private String status;
private int poolSize;
private Double allSeconds;
private String accountEmails;
@Id
@GeneratedValue
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getStartDs() {
return startDs;
}
public void setStartDs(String startDs) {
this.startDs = startDs;
}
public String getEndDs() {
return endDs;
}
public void setEndDs(String endDs) {
this.endDs = endDs;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public int getPoolSize() {
return poolSize;
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
public Double getAllSeconds() {
return allSeconds;
}
public void setAllSeconds(Double allSeconds) {
this.allSeconds = allSeconds;
}
public String getAccountEmails() {
return accountEmails;
}
public void setAccountEmails(String accountEmails) {
this.accountEmails = accountEmails;
}
}
package common.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import common.model.RestTaskInfo;
import java.util.List;
public interface RestTaskInfoRepository extends JpaRepository<RestTaskInfo, Long> {
@Query(value = "select * from rest_task_info where status = ?1", nativeQuery = true)
List<RestTaskInfo> findByStatus(String init);
@Query(value = "select status from rest_task_info where id = ?1", nativeQuery = true)
String findStatus(Long id);
}
......@@ -27,6 +27,6 @@ public interface TkioFlowRepository extends JpaRepository<TkioFlow, Long> {
@Transactional
@Modifying
@Query(value = "delete from tkio_flow where email = ?1 and ds = ?2", nativeQuery = true)
@Query(value = "delete from tkio_flow where contract_code in ( select contract_code from tkio_flow where email=?1 ) and ds = ?2", nativeQuery = true)
void deleteByEmailDs(String email, String yesterday);
}
......@@ -10,12 +10,11 @@ import track.task.TrackingFlowTask;
@RestController
public class TestFlowController {
@Autowired
TrackingFlowTask trackingFlowTask;
@GetMapping("/test/contract/flow")
public void testFlow(String email) {
trackingFlowTask.reset(email);
public void testFlow() {
trackingFlowTask.reset();
}
}
......@@ -10,11 +10,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StopWatch;
import org.springframework.util.StringUtils;
import tkio.model.Account;
import tkio.repository.AccountRepository;
import tkio.repository.AppRepository;
import tkio.service.AccountFlowRestrictService;
import tkio.service.FlowService;
import common.model.RestTaskInfo;
import common.repository.RestTaskInfoRepository;
import util.Constant;
import util.DateUtil;
import util.StringUtil;
......@@ -26,10 +28,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
......@@ -61,6 +61,9 @@ public class TrackingFlowTask {
@Autowired
private TkioFlowHistoryRepository tkioFlowHistoryRepository;
@Autowired
private RestTaskInfoRepository restTaskInfoRepository;
/**
* 功能描述:定时同步流量(每7分钟执行一次)
*/
......@@ -410,45 +413,74 @@ public class TrackingFlowTask {
/**
* 功能描述:临时跑一段时间内的流量任务
*/
public void reset(String lastEmail) {
ExecutorService executorService = Executors.newFixedThreadPool(30);
public void reset() {
List<String> emails = contractRepository.findDistinctEmailByPlatform("tkio");
// emails = emails.stream().limit(40).collect(Collectors.toList());
List<String> dateInterval = DateUtil.getDateInterval("2021-05-25", "2021-07-15");
// dateInterval.add("2021-06-30");
// emails = Arrays.asList("1291269883@qq.com");
StopWatch all = new StopWatch();
all.start();
List<RestTaskInfo> taskInfos = restTaskInfoRepository.findByStatus("init");
if (taskInfos == null || taskInfos.size() > 1 || taskInfos.isEmpty()) {
logger.info("重置流量配置错误");
return;
}
RestTaskInfo info = taskInfos.get(0);
if (!"init".equals(info.getStatus())) {
logger.info("重置流量配置错误");
return;
}
if (info.getPoolSize() <= 0 || info.getPoolSize() > 100) {
info.setPoolSize(30);
}
ExecutorService executorService = Executors.newFixedThreadPool(info.getPoolSize());
List<String> emails;
if (StringUtils.isEmpty(info.getAccountEmails())) {
emails = contractRepository.findDistinctEmailByPlatform("tkio");
} else {
emails = Arrays.asList(info.getAccountEmails().split(","));
}
List<String> dateInterval = DateUtil.getDateInterval(info.getStartDs(), info.getEndDs());
for (String dsone : dateInterval) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String status = restTaskInfoRepository.findStatus(info.getId());
if (status.equals("init")) {
logger.info("暂停重置流量 ......");
break;
}
CompletableFuture[] futures = emails.stream().map(em ->
CompletableFuture.supplyAsync(() -> {
reset(dsone, em);
return Thread.currentThread().getName() + em;
}, executorService).exceptionally((t) -> {
logger.error("erro on " + em + "-" + dsone, t);
logger.error("重置流量错误 " + em + "-" + dsone, t);
return "-1";
})).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures);
long names = Stream.of(futures).map(f -> f.join()).map(v -> (String) v).count();
stopWatch.stop();
logger.info(" {}s, ds {} ,ForkJoinPool.commonPool-worker-{}", stopWatch.getTotalTimeSeconds(), dsone, names);
logger.info("重置流量 {} accoount, {}s, ds {} ", names, stopWatch.getTotalTimeSeconds(), dsone);
}
logger.info("reset complate ! ...");
logger.info("重置流量完成 ! ...");
info.setStartDs("complate");
all.stop();
info.setAllSeconds(all.getTotalTimeSeconds());
restTaskInfoRepository.save(info);
executorService.shutdown();
}
public void reset(String yesterday, String email) {
logger.info("ds {} eamil {} running...", yesterday, email);
//tkioFlowRepository.deleteByEmailDs(email, yesterday);
//删除历史数据
tkioFlowRepository.deleteByEmailDs(email, yesterday);
//查询用户下所有appkey
Account account = accountRepository.findByEmail(email);
if (account == null || account.getRootParent() == null) {
logger.warn("【单日流量同步】用户不存在:{}", email);
return;
}
List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
......@@ -457,33 +489,21 @@ public class TrackingFlowTask {
idList.add(ac.getId());
}
List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
String appkeyStr = String.join("','", appkeys);
appkeyStr = "'" + appkeyStr + "'";
if (CollectionUtils.isEmpty(appkeys)) {
logger.warn("【单日流量同步】该用户没有appkey:{}", email);
return;
}
//BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
if (clickNum != null && clickNum.longValue() > 0) {
List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
if (contracts.size() == 1) {//只有一个合同
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
saveTkioFlowItem(tkioFlow);
}
} else {//多个合同
//看昨日被哪几个合同包含了
List<Contract> correlationContract = new ArrayList<>();
for (Contract contract : contracts) {
/*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) {
//中止或作废合同处理结束时间,以方便昨日流量的归属计算
ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode());
if(contractChange!=null){
contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd"));
}
}*/
String startDate = contract.getStartDate();
String endDate = contract.getEndDate();
if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
......@@ -503,13 +523,11 @@ public class TrackingFlowTask {
clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
tkioFlow.setCostFlow(null);
if (tkioFlow.getFlow() > 0) {
// tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
saveTkioFlowItem(tkioFlow);
}
} else {
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
saveTkioFlowItem(tkioFlow);
}
break;
}
......@@ -536,19 +554,16 @@ public class TrackingFlowTask {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
saveTkioFlowItem(tkioFlow);
}
break;
}
}
} else {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
saveTkioFlowItem(tkioFlow);
}
}
......@@ -558,12 +573,8 @@ public class TrackingFlowTask {
}
/* public static void main(String[] args) throws ParseException {
String ago = "2020-10-01";
int between = DateUtil.daysBetween(ago, DateUtil.getBeforeDays(1)) + 1;
System.out.println(between);
for (int i = between; i > 0; i--) {
System.out.println(DateUtil.getBeforeDays(i));
}
}*/
private void saveTkioFlowItem(TkioFlow tkioFlow) {
tkioFlowRepository.save(tkioFlow);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment