Commit c219238a by kangxiaoshan

跑流量

parent 73730be0
......@@ -9,7 +9,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.ArrayList;
public interface TkioFlowRepository extends JpaRepository<TkioFlow, Long> {
public interface TkioFlowRepository extends JpaRepository<TkioFlow, Long> {
@Query(value = "select sum(flow) from tkio_flow where email = ?1 and contract_code = ?2", nativeQuery = true)
BigDecimal sumFlowByEmailAndContractCode(String email, String contractCode);
......@@ -24,4 +24,9 @@ public interface TkioFlowRepository extends JpaRepository<TkioFlow, Long> {
@Query(value = "select sum(flow) from tkio_flow where contract_code in ?3 and ds >= ?1 and ds <= ?2", nativeQuery = true)
BigDecimal sumFlowByDsAndContractCodes(String startDate, String endDate, ArrayList<String> contractCode);
@Transactional
@Modifying
@Query(value = "delete from tkio_flow where email = ?1 and ds = ?2", nativeQuery = true)
void deleteByEmailDs(String email, String yesterday);
}
......@@ -27,7 +27,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.util.Enumeration;
import java.util.Map;
import java.util.stream.Collectors;
......@@ -61,8 +60,8 @@ public class AuthorizationInterceptor extends HandlerInterceptorAdapter {
//放开登录
String requestURL = request.getRequestURL().toString();
if(requestURL.contains("findSaleInfo")){
System.out.println("-----"+requestURL.contains("findSaleInfo"));
if (requestURL.contains("findSaleInfo")) {
System.out.println("-----" + requestURL.contains("findSaleInfo"));
return true;
}
// System.out.println("============="+requestURL);
......@@ -72,15 +71,16 @@ public class AuthorizationInterceptor extends HandlerInterceptorAdapter {
|| requestURL.indexOf("user/forget") != -1
|| requestURL.indexOf("user/code") != -1
|| requestURL.indexOf("txt2db") != -1
|| requestURL.indexOf("/test/contract/flow") != -1
|| requestURL.indexOf("contract/build") != -1
|| requestURL.indexOf("accountmng/findSaleInfo") != -1){
|| requestURL.indexOf("accountmng/findSaleInfo") != -1) {
return true;
}
//从header中得到token
String token = request.getHeader(httpHeaderName);
//token = StringUtil.isEmpty(token) ? getCookieToken(request.getCookies(), "TOKEN") : token;
if(null != token){
if (null != token) {
token = token.substring(httpHeaderPrefix.length());
//验证token
String key = manager.getKeyFromToken(token);
......@@ -101,15 +101,15 @@ public class AuthorizationInterceptor extends HandlerInterceptorAdapter {
one.setAuthdataDic(authdata);
}
request.getSession().setAttribute(Constant.CURRENT_ACCOUNT,one);
request.getSession().setAttribute(Constant.CURRENT_ACCOUNT, one);
sessionAct = one;
}
if(!sessionAct.getRole().equals(RoleEnum.MANAGER.getKey())){
if (!sessionAct.getRole().equals(RoleEnum.MANAGER.getKey())) {
//权限控制
AuthKey authKey = handler.getClass().getAnnotation(AuthKey.class);
if(authKey!=null && sessionAct.getAuthdataDic().get(authKey.value().getIdKey())==null){
printJsonResponse(response,"权限不足!");
AuthKey authKey = handler.getClass().getAnnotation(AuthKey.class);
if (authKey != null && sessionAct.getAuthdataDic().get(authKey.value().getIdKey()) == null) {
printJsonResponse(response, "权限不足!");
return false;
}
......@@ -121,31 +121,31 @@ public class AuthorizationInterceptor extends HandlerInterceptorAdapter {
}
printJsonResponse(response,null);
printJsonResponse(response, null);
return false;
}
private void printJsonResponse(HttpServletResponse response,String message){
private void printJsonResponse(HttpServletResponse response, String message) {
try {
//如果验证token失败,并且方法注明了Authorization,返回401错误
response.setStatus(HttpStatus.UNAUTHORIZED.value());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(response.getOutputStream()));
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
if(StringUtils.isEmpty(message)){
if (StringUtils.isEmpty(message)) {
message = new ObjectMapper().writeValueAsString(ResultModel.ERROR(ResultStatus.USERNAME_LOGIN_EXPIRE));
}
writer.write(message);
writer.close();
} catch (Exception e){
} catch (Exception e) {
e.printStackTrace();
}
}
private String getCookieToken(Cookie[] cookies, String name){
private String getCookieToken(Cookie[] cookies, String name) {
String token = null;
......
package track.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import track.task.TrackingFlowTask;
@RestController
public class TestFlowController {
@Autowired
TrackingFlowTask trackingFlowTask;
@GetMapping("/test/contract/flow")
public void testFlow(String email) {
trackingFlowTask.reset(email);
}
}
......@@ -7,7 +7,9 @@ import common.repository.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StopWatch;
import tkio.model.Account;
import tkio.repository.AccountRepository;
import tkio.repository.AppRepository;
......@@ -21,13 +23,18 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author liyin
* @description
* @date
*/
@Service
public class TrackingFlowTask {
private final Logger logger = LoggerFactory.getLogger(TrackingFlowTask.class);
......@@ -55,7 +62,16 @@ public class TrackingFlowTask {
* 功能描述:定时同步流量(每7分钟执行一次)
*/
public void syncFlow() {
List<CalculationFlow> calculationFlows = calculationFlowRepository.findByStatus(0);
syncFlow(null);
}
public void syncFlow(Long id) {
List<CalculationFlow> calculationFlows;
if (id != null) {
calculationFlows = Arrays.asList(calculationFlowRepository.findOne(id));
} else {
calculationFlows = calculationFlowRepository.findByStatus(0);
}
for (CalculationFlow calculationFlow : calculationFlows) {
List<TkioFlow> tkioFlowList = new ArrayList<>();
calculationFlow.setStatus(1);
......@@ -95,7 +111,7 @@ public class TrackingFlowTask {
for (int ii = between; ii > startInt; ii--) {
String yesterday = DateUtil.getBeforeDays(ii);//昨日
//BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
BigInteger clickNum = flowService.getFlowByAccount(yesterday,yesterday,appkeys);
BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
if (clickNum != null && clickNum.longValue() > 0) {
List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
if (contracts.size() == 1) {//只有一个合同
......@@ -214,8 +230,8 @@ public class TrackingFlowTask {
//查询用户下所有appkey
Account account = accountRepository.findByEmail(email);
if(account==null||account.getRootParent()==null){
logger.warn("【单日流量同步】用户不存在:{}",email);
if (account == null || account.getRootParent() == null) {
logger.warn("【单日流量同步】用户不存在:{}", email);
continue;
}
List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
......@@ -226,12 +242,12 @@ public class TrackingFlowTask {
List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
String appkeyStr = String.join("','", appkeys);
appkeyStr = "'" + appkeyStr + "'";
if(CollectionUtils.isEmpty(appkeys)){
logger.warn("【单日流量同步】该用户没有appkey:{}",email);
if (CollectionUtils.isEmpty(appkeys)) {
logger.warn("【单日流量同步】该用户没有appkey:{}", email);
continue;
}
//BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
BigInteger clickNum = flowService.getFlowByAccount(yesterday,yesterday,appkeys);
BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
if (clickNum != null && clickNum.longValue() > 0) {
List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
if (contracts.size() == 1) {//只有一个合同
......@@ -391,48 +407,90 @@ public class TrackingFlowTask {
/**
* 功能描述:临时跑一段时间内的流量任务
*/
public void reset(){
public void reset(String lastEmail) {
List<String> emails = contractRepository.findDistinctEmailByPlatform("tkio");
List<String> dateInterval = DateUtil.getDateInterval("2021-05-25", "2021-07-15");
// dateInterval.add("2021-06-30");
// emails = Arrays.asList("1291269883@qq.com");
List<TkioFlow> tkioFlowList = new ArrayList<>();
//String yesterday = DateUtil.getBeforeDays(1);//昨日
List<String> dateInterval = DateUtil.getDateInterval("2021-05-25", "2021-06-06");
for (String yesterday : dateInterval) {
logger.info("昨日流量同步:" + yesterday);
if (lastEmail != null) {
List<String> unSyncEmails = new ArrayList<>();
boolean addflag = false;
for (String email : emails) {
//查询用户下所有appkey
Account account = accountRepository.findByEmail(email);
if(account==null||account.getRootParent()==null){
logger.warn("【单日流量同步】用户不存在:{}",email);
continue;
if (email != null && email.equals(lastEmail)) {
addflag = true;
}
List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
List<Long> idList = new ArrayList<>();
for (Account ac : accountList) {
idList.add(ac.getId());
if (addflag) {
unSyncEmails.add(email);
}
List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
String appkeyStr = String.join("','", appkeys);
appkeyStr = "'" + appkeyStr + "'";
if(CollectionUtils.isEmpty(appkeys)){
logger.warn("【单日流量同步】该用户没有appkey:{}",email);
continue;
}
if (!unSyncEmails.isEmpty()) {
emails = unSyncEmails;
}
}
for (String email : emails) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
CompletableFuture[] futures = dateInterval.stream().map(dsone ->
CompletableFuture.supplyAsync(() -> {
reset(dsone, email);
return Thread.currentThread().getName();
}
).exceptionally((t) -> {
logger.error("erro on " + email + "-" + dsone, t);
return "-1";
})).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures);
String names = Stream.of(futures).map(f -> f.join()).map(v -> ((String) v).split("-")[2]).collect(Collectors.joining(",-"));
stopWatch.stop();
logger.info(" {}s, emmial {} ,ForkJoinPool.commonPool-worker-{}", stopWatch.getTotalTimeSeconds(), email, names);
}
logger.info("reset complate ! ...");
}
public void reset(String yesterday, String email) {
//logger.info("ds {} eamil {} running...", yesterday, email);
tkioFlowRepository.deleteByEmailDs(email, yesterday);
//查询用户下所有appkey
Account account = accountRepository.findByEmail(email);
if (account == null || account.getRootParent() == null) {
logger.warn("【单日流量同步】用户不存在:{}", email);
return;
}
List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
List<Long> idList = new ArrayList<>();
for (Account ac : accountList) {
idList.add(ac.getId());
}
List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
String appkeyStr = String.join("','", appkeys);
appkeyStr = "'" + appkeyStr + "'";
if (CollectionUtils.isEmpty(appkeys)) {
logger.warn("【单日流量同步】该用户没有appkey:{}", email);
return;
}
//BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
if (clickNum != null && clickNum.longValue() > 0) {
List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
if (contracts.size() == 1) {//只有一个合同
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
//BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
BigInteger clickNum = flowService.getFlowByAccount(yesterday,yesterday,appkeys);
if (clickNum != null && clickNum.longValue() > 0) {
List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
if (contracts.size() == 1) {//只有一个合同
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
if (tkioFlow != null) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
} else {//多个合同
//看昨日被哪几个合同包含了
List<Contract> correlationContract = new ArrayList<>();
for (Contract contract : contracts) {
} else {//多个合同
//看昨日被哪几个合同包含了
List<Contract> correlationContract = new ArrayList<>();
for (Contract contract : contracts) {
/*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) {
//中止或作废合同处理结束时间,以方便昨日流量的归属计算
ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode());
......@@ -440,92 +498,86 @@ public class TrackingFlowTask {
contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd"));
}
}*/
String startDate = contract.getStartDate();
String endDate = contract.getEndDate();
if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
correlationContract.add(contract);
String startDate = contract.getStartDate();
String endDate = contract.getEndDate();
if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
correlationContract.add(contract);
}
}
//多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下
if (correlationContract.size() > 1) {
//冒泡
//第一优先级:合同开始日期,第二优先级,合同编号大小
Contract[] contractsArray = new Contract[correlationContract.size()];
contractsArray = correlationContract.toArray(contractsArray);
contractsArray = orderByContract(contractsArray);
for (int i = 0; i < contractsArray.length; i++) {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]);
if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上
clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
tkioFlow.setCostFlow(null);
if (tkioFlow.getFlow() > 0) {
// tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
} else {
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
break;
}
//多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下
if (correlationContract.size() > 1) {
//冒泡
//第一优先级:合同开始日期,第二优先级,合同编号大小
Contract[] contractsArray = new Contract[correlationContract.size()];
contractsArray = correlationContract.toArray(contractsArray);
contractsArray = orderByContract(contractsArray);
}
} else {
if (correlationContract.size() == 0) {//昨日不包含在所有合同中
//排序
Contract[] contractsArray = new Contract[contracts.size()];
contractsArray = contracts.toArray(contractsArray);
contractsArray = orderByContract(contractsArray);
//如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
for (int i = 0; i < contractsArray.length; i++) {
if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
int j = 0;
for (int i = 0; i < contractsArray.length; i++) {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]);
if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上
clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
tkioFlow.setCostFlow(null);
if (tkioFlow.getFlow() > 0) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
} else {
if (tkioFlow != null) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
break;
if (i != 0) {
j = i - 1;
}
}
} else {
if (correlationContract.size() == 0) {//昨日不包含在所有合同中
//排序
Contract[] contractsArray = new Contract[contracts.size()];
contractsArray = contracts.toArray(contractsArray);
contractsArray = orderByContract(contractsArray);
//如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
for (int i = 0; i < contractsArray.length; i++) {
if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
int j = 0;
if (i != 0) {
j = i - 1;
}
if (i == contractsArray.length - 1) {
j = i;
}
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
if (tkioFlow != null) {
tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
break;
}
if (i == contractsArray.length - 1) {
j = i;
}
} else {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
if (tkioFlow != null) {
tkioFlowList.add(tkioFlow);
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
break;
}
}
} else {
TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
if (tkioFlow != null) {
//tkioFlowList.add(tkioFlow);
tkioFlowRepository.save(tkioFlow);
}
}
}
}
}
}
}
public static void main(String[] args) throws ParseException {
/* public static void main(String[] args) throws ParseException {
String ago = "2020-10-01";
int between = DateUtil.daysBetween(ago, DateUtil.getBeforeDays(1)) + 1;
System.out.println(between);
for (int i = between; i > 0; i--) {
System.out.println(DateUtil.getBeforeDays(i));
}
}
}*/
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment