TrackingFlowTask.java 28.9 KB
Newer Older
lzxry committed
1 2 3 4 5
package track.task;

import common.model.CalculationFlow;
import common.model.Contract;
import common.model.TkioFlow;
kangxiaoshan committed
6
import common.repository.*;
lzxry committed
7 8 9
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
kangxiaoshan committed
10
import org.springframework.stereotype.Service;
lzxry committed
11
import org.springframework.util.CollectionUtils;
kangxiaoshan committed
12
import org.springframework.util.StopWatch;
kangxiaoshan committed
13
import org.springframework.util.StringUtils;
lzxry committed
14 15 16
import tkio.model.Account;
import tkio.repository.AccountRepository;
import tkio.repository.AppRepository;
17
import tkio.service.FlowService;
kangxiaoshan committed
18 19
import common.model.RestTaskInfo;
import common.repository.RestTaskInfoRepository;
20
import util.Constant;
lzxry committed
21
import util.DateUtil;
22
import util.StringUtil;
lzxry committed
23

lzxry committed
24
import java.math.BigDecimal;
lzxry committed
25 26 27
import java.math.BigInteger;
import java.text.ParseException;
import java.util.ArrayList;
kangxiaoshan committed
28
import java.util.Arrays;
lzxry committed
29
import java.util.List;
kangxiaoshan committed
30
import java.util.concurrent.CompletableFuture;
kangxiaoshan committed
31 32
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
kangxiaoshan committed
33
import java.util.stream.Stream;
lzxry committed
34 35 36 37 38 39

/**
 * @author liyin
 * @description
 * @date
 */
kangxiaoshan committed
40
@Service
lzxry committed
41 42 43 44 45 46 47 48 49 50
public class TrackingFlowTask {

    private final Logger logger = LoggerFactory.getLogger(TrackingFlowTask.class);

    @Autowired
    private ContractRepository contractRepository;
    @Autowired
    private AccountRepository accountRepository;
    @Autowired
    private AppRepository appRepository;
51 52
    /*@Autowired
    private AccountFlowRestrictService accountFlowRestrictService;*/
lzxry committed
53
    @Autowired
54
    private FlowService flowService;
lzxry committed
55 56 57 58 59 60
    @Autowired
    private TkioFlowRepository tkioFlowRepository;
    @Autowired
    private ContractChangeRepository contractChangeRepository;
    @Autowired
    private CalculationFlowRepository calculationFlowRepository;
kangxiaoshan committed
61 62
    @Autowired
    private TkioFlowHistoryRepository tkioFlowHistoryRepository;
lzxry committed
63

kangxiaoshan committed
64 65 66
    @Autowired
    private RestTaskInfoRepository restTaskInfoRepository;

67 68 69
    /**
     * 功能描述:定时同步流量(每7分钟执行一次)
     */
70
    public void syncFlow() {
kangxiaoshan committed
71 72 73 74 75 76 77 78 79 80
        syncFlow(null);
    }

    public void syncFlow(Long id) {
        List<CalculationFlow> calculationFlows;
        if (id != null) {
            calculationFlows = Arrays.asList(calculationFlowRepository.findOne(id));
        } else {
            calculationFlows = calculationFlowRepository.findByStatus(0);
        }
lzxry committed
81 82 83 84 85 86
        for (CalculationFlow calculationFlow : calculationFlows) {
            List<TkioFlow> tkioFlowList = new ArrayList<>();
            calculationFlow.setStatus(1);
            calculationFlowRepository.save(calculationFlow);
            tkioFlowRepository.deleteByEmail(calculationFlow.getEmail());
            String email = calculationFlow.getEmail();
87

lzxry committed
88
            //查询用户下所有appkey
89
            try {
lzxry committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
                Account account = accountRepository.findByEmail(email);
                List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
                List<Long> idList = new ArrayList<>();
                for (Account ac : accountList) {
                    idList.add(ac.getId());
                }
                List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
                String appkeyStr = String.join("','", appkeys);
                appkeyStr = "'" + appkeyStr + "'";

                String ago = DateUtil.format(account.getCreateTime(), DateUtil.C_DATE_PATTON_DEFAULT);//查找最早一天的流量
                if (ago == null) {
                    calculationFlow.setStatus(2);
                    calculationFlowRepository.save(calculationFlow);
                    continue;
                }
lzxry committed
106

lzxry committed
107 108 109 110 111 112 113 114 115 116 117 118
                int between = 0;
                try {
                    between = DateUtil.daysBetween(ago, DateUtil.getBeforeDays(1)) + 1;
                } catch (ParseException e) {
                    logger.error("强转错误:", e);
                }
                int startInt = 0;
                if (Integer.valueOf(DateUtil.getHH()) > 10) {
                    startInt = -1;
                }
                for (int ii = between; ii > startInt; ii--) {
                    String yesterday = DateUtil.getBeforeDays(ii);//昨日
119
                    //BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
kangxiaoshan committed
120
                    BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
lzxry committed
121 122 123 124 125 126
                    if (clickNum != null && clickNum.longValue() > 0) {
                        List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
                        if (contracts.size() == 1) {//只有一个合同
                            TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
                            if (tkioFlow != null) {
                                tkioFlowList.add(tkioFlow);
lzxry committed
127
                                tkioFlowRepository.save(tkioFlow);
lzxry committed
128
                            }
lzxry committed
129 130 131 132 133 134 135 136 137 138
                        } else {
                            //多个合同
                            //看昨日被哪几个合同包含了
                            List<Contract> correlationContract = new ArrayList<>();
                            for (Contract contract : contracts) {
                                /*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) {
                                    //中止或作废合同处理结束时间,以方便昨日流量的归属计算
                                    ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode());
                                    if (contractChange != null) {
                                        contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd"));
lzxry committed
139
                                    }
lzxry committed
140 141 142 143 144
                                }*/
                                String startDate = contract.getStartDate();
                                String endDate = contract.getEndDate();
                                if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
                                    correlationContract.add(contract);
lzxry committed
145 146
                                }
                            }
lzxry committed
147 148 149 150 151 152
                            //多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下
                            if (correlationContract.size() > 1) {
                                //冒泡
                                //第一优先级:合同开始日期,第二优先级,合同编号大小
                                Contract[] contractsArray = new Contract[correlationContract.size()];
                                contractsArray = correlationContract.toArray(contractsArray);
lzxry committed
153

lzxry committed
154
                                contractsArray = orderByContract(contractsArray);
lzxry committed
155

lzxry committed
156 157
                                for (int i = 0; i < contractsArray.length; i++) {
                                    TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]);
158
                                    if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上
lzxry committed
159 160
                                        clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
                                        tkioFlow.setCostFlow(null);
161
                                        if (tkioFlow.getFlow() > 0) {
lzxry committed
162 163 164
                                            tkioFlowList.add(tkioFlow);
                                            tkioFlowRepository.save(tkioFlow);
                                        }
lzxry committed
165 166
                                    } else {
                                        if (tkioFlow != null) {
lzxry committed
167
                                            tkioFlowList.add(tkioFlow);
lzxry committed
168
                                            tkioFlowRepository.save(tkioFlow);
lzxry committed
169 170 171
                                        }
                                        break;
                                    }
lzxry committed
172

lzxry committed
173
                                }
lzxry committed
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
                            } else {
                                if (correlationContract.size() == 0) {//昨日不包含在所有合同中
                                    //排序
                                    Contract[] contractsArray = new Contract[contracts.size()];
                                    contractsArray = contracts.toArray(contractsArray);
                                    contractsArray = orderByContract(contractsArray);
                                    //如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
                                    for (int i = 0; i < contractsArray.length; i++) {
                                        if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
                                            int j = 0;

                                            if (i != 0) {
                                                j = i - 1;
                                            }

                                            if (i == contractsArray.length - 1) {
                                                j = i;
                                            }

                                            TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
                                            if (tkioFlow != null) {
                                                tkioFlowList.add(tkioFlow);
lzxry committed
196
                                                tkioFlowRepository.save(tkioFlow);
lzxry committed
197 198 199 200 201 202 203 204
                                            }
                                            break;
                                        }
                                    }
                                } else {
                                    TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
                                    if (tkioFlow != null) {
                                        tkioFlowList.add(tkioFlow);
lzxry committed
205
                                        tkioFlowRepository.save(tkioFlow);
lzxry committed
206
                                    }
lzxry committed
207 208
                                }

lzxry committed
209
                            }
lzxry committed
210 211 212
                        }
                    }
                }
lzxry committed
213
                if (tkioFlowList.size() > 0) {
lzxry committed
214
                    //tkioFlowRepository.save(tkioFlowList);
lzxry committed
215 216 217
                }
                calculationFlow.setStatus(2);
                calculationFlowRepository.save(calculationFlow);
218 219
            } catch (Exception e) {
                logger.error("CalculationFlow:Id::" + calculationFlow.getId() + ":全流量同步失败", e);
lzxry committed
220 221
                calculationFlow.setStatus(3);
                calculationFlowRepository.save(calculationFlow);
lzxry committed
222 223 224 225
            }
        }
    }

226 227 228
    /**
     * 功能描述:定时同步昨日流量(每天十点)
     */
229
    public void task() {
lzxry committed
230 231 232
        List<String> emails = contractRepository.findDistinctEmailByPlatform("tkio");

        List<TkioFlow> tkioFlowList = new ArrayList<>();
lzxry committed
233
        String yesterday = DateUtil.getBeforeDays(1);//昨日
234
        logger.info("昨日流量同步:" + yesterday);
lzxry committed
235 236 237 238
        for (String email : emails) {

            //查询用户下所有appkey
            Account account = accountRepository.findByEmail(email);
kangxiaoshan committed
239 240
            if (account == null || account.getRootParent() == null) {
                logger.warn("【单日流量同步】用户不存在:{}", email);
lzxry committed
241
                continue;
242
            }
lzxry committed
243 244 245 246 247 248 249
            List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
            List<Long> idList = new ArrayList<>();
            for (Account ac : accountList) {
                idList.add(ac.getId());
            }
            List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
            String appkeyStr = String.join("','", appkeys);
250
            appkeyStr = "'" + appkeyStr + "'";
kangxiaoshan committed
251 252
            if (CollectionUtils.isEmpty(appkeys)) {
                logger.warn("【单日流量同步】该用户没有appkey:{}", email);
lzxry committed
253 254
                continue;
            }
255
            //BigInteger clickNum = accountFlowRestrictService.getTotalNum(yesterday, yesterday, appkeyStr, "account_track_flow_restrict", "click_sum");
kangxiaoshan committed
256
            BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
257
            if (clickNum != null && clickNum.longValue() > 0) {
lzxry committed
258
                List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
259
                if (contracts.size() == 1) {//只有一个合同
lzxry committed
260
                    TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
261
                    if (tkioFlow != null) {
lzxry committed
262
                        tkioFlowList.add(tkioFlow);
lzxry committed
263
                        tkioFlowRepository.save(tkioFlow);
lzxry committed
264
                    }
265
                } else {//多个合同
lzxry committed
266
                    //看昨日被哪几个合同包含了
267
                    List<Contract> correlationContract = new ArrayList<>();
lzxry committed
268
                    for (Contract contract : contracts) {
lzxry committed
269
                        /*if (ContractStatusEnum.CANCEL.getKey().equals(contract.getStatus()) || ContractStatusEnum.SUSPEND.getKey().equals(contract.getStatus())) {
lzxry committed
270 271 272
                            //中止或作废合同处理结束时间,以方便昨日流量的归属计算
                            ContractChange contractChange = contractChangeRepository.findByContentCode(ContractStatusEnum.CANCEL.getValue(), contract.getContractCode());
                            if(contractChange!=null){
lzxry committed
273
                                contract.setEndDate(new DateTime(contractChange.getCreateTime()).toString("yyyy-MM-dd"));
lzxry committed
274
                            }
lzxry committed
275
                        }*/
lzxry committed
276 277
                        String startDate = contract.getStartDate();
                        String endDate = contract.getEndDate();
278
                        if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
lzxry committed
279 280 281 282
                            correlationContract.add(contract);
                        }
                    }
                    //多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下
283
                    if (correlationContract.size() > 1) {
lzxry committed
284 285 286 287 288 289 290 291 292
                        //冒泡
                        //第一优先级:合同开始日期,第二优先级,合同编号大小
                        Contract[] contractsArray = new Contract[correlationContract.size()];
                        contractsArray = correlationContract.toArray(contractsArray);

                        contractsArray = orderByContract(contractsArray);

                        for (int i = 0; i < contractsArray.length; i++) {
                            TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]);
293
                            if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上
lzxry committed
294 295
                                clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
                                tkioFlow.setCostFlow(null);
296
                                if (tkioFlow.getFlow() > 0) {
lzxry committed
297 298 299
                                    tkioFlowList.add(tkioFlow);
                                    tkioFlowRepository.save(tkioFlow);
                                }
300 301
                            } else {
                                if (tkioFlow != null) {
lzxry committed
302
                                    tkioFlowList.add(tkioFlow);
lzxry committed
303
                                    tkioFlowRepository.save(tkioFlow);
lzxry committed
304 305 306 307 308
                                }
                                break;
                            }

                        }
309 310
                    } else {
                        if (correlationContract.size() == 0) {//昨日不包含在所有合同中
lzxry committed
311 312 313 314 315 316
                            //排序
                            Contract[] contractsArray = new Contract[contracts.size()];
                            contractsArray = contracts.toArray(contractsArray);
                            contractsArray = orderByContract(contractsArray);
                            //如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
                            for (int i = 0; i < contractsArray.length; i++) {
317
                                if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
lzxry committed
318 319
                                    int j = 0;

320 321
                                    if (i != 0) {
                                        j = i - 1;
lzxry committed
322 323
                                    }

324
                                    if (i == contractsArray.length - 1) {
lzxry committed
325 326 327 328
                                        j = i;
                                    }

                                    TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
329
                                    if (tkioFlow != null) {
lzxry committed
330
                                        tkioFlowList.add(tkioFlow);
lzxry committed
331
                                        tkioFlowRepository.save(tkioFlow);
lzxry committed
332 333 334 335 336 337
                                    }
                                    break;
                                }
                            }


338
                        } else {
lzxry committed
339
                            TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
340
                            if (tkioFlow != null) {
lzxry committed
341
                                tkioFlowList.add(tkioFlow);
lzxry committed
342
                                tkioFlowRepository.save(tkioFlow);
lzxry committed
343 344 345 346 347 348 349 350 351
                            }
                        }

                    }
                }
            }


        }
352
        if (tkioFlowList.size() > 0) {
lzxry committed
353
            //tkioFlowRepository.save(tkioFlowList);
lzxry committed
354 355 356 357 358
        }

    }

    //排序
359 360 361
    public Contract[] orderByContract(Contract[] contractsArray) {
        for (int i = 0; i < contractsArray.length - 1; i++) {
            for (int j = 0; j < contractsArray.length - 1 - i; j++) {
lzxry committed
362 363
                String startDate1 = contractsArray[j].getSignedDate() == null ? contractsArray[j].getStartDate() : contractsArray[j].getSignedDate();
                String startDate2 = contractsArray[j + 1].getSignedDate() == null ? contractsArray[j + 1].getStartDate() : contractsArray[j + 1].getSignedDate();
364
                if (DateUtil.getDate(startDate1).getTime() > DateUtil.getDate(startDate2).getTime()) {
lzxry committed
365
                    Contract temp = contractsArray[j];
366
                    contractsArray[j] = contractsArray[j + 1];
lzxry committed
367
                    contractsArray[j + 1] = temp;
368
                } else if (DateUtil.getDate(startDate1).getTime() == DateUtil.getDate(startDate2).getTime()) {
369 370

                    String contractCode = StringUtil.matchNumber(contractsArray[j].getContractCode());
371 372
                    String contractCodeMin = StringUtil.matchNumber(contractsArray[j + 1].getContractCode());
                    if (Long.valueOf(contractCode) > Long.valueOf(contractCodeMin)) {
lzxry committed
373
                        Contract temp = contractsArray[j];
374
                        contractsArray[j] = contractsArray[j + 1];
lzxry committed
375 376 377 378 379 380 381 382
                        contractsArray[j + 1] = temp;
                    }
                }
            }
        }
        return contractsArray;
    }

383
    public TkioFlow getTkioFlow(BigInteger clickNum, String yesterday, Contract contract) {
lzxry committed
384
        TkioFlow tkioFlow = new TkioFlow();
385
        tkioFlow.setCreateTime(DateUtil.now());
lzxry committed
386 387 388 389
        tkioFlow.setDs(yesterday);
        tkioFlow.setEmail(contract.getEmail());
        tkioFlow.setContractCode(contract.getContractCode());
        try {
390
            if (DateUtil.daysBetween(contract.getStartDate(), yesterday) < 0 || DateUtil.daysBetween(contract.getEndDate(), yesterday) > 0) {//昨日日期早于合同开始日期
lzxry committed
391 392
                //设置为成本流量
                tkioFlow.setCostFlow(clickNum.longValue());
393
            } else {
lzxry committed
394
                //查看历史总消耗流量是否超出
395 396
                BigDecimal totalFlow = tkioFlowRepository.sumFlowByEmailAndContractCode(contract.getEmail(), contract.getContractCode());
                totalFlow = totalFlow == null ? new BigDecimal(0) : totalFlow;
397
                Double contractTrackFlow = contract.getTrackFlow() * 10000;
398
                if (contractTrackFlow.longValue() - totalFlow.longValue() - clickNum.longValue() >= 0 || contract.getPriceLevel() == Constant.tkioPriceLevelNotLimit) {
399
                    tkioFlow.setFlow(clickNum.longValue());
400
                } else {
401
                    tkioFlow.setFlow(contractTrackFlow.longValue() - totalFlow.longValue());
402
                    tkioFlow.setCostFlow(clickNum.longValue() - tkioFlow.getFlow());
403
                }
lzxry committed
404 405 406
            }
            return tkioFlow;
        } catch (ParseException e) {
407
            logger.error("合同编号-" + contract.getContractCode() + "-同步昨日流量错误:", e);
lzxry committed
408 409 410 411
        }
        return null;
    }

lzxry committed
412

413 414 415
    /**
     * 功能描述:临时跑一段时间内的流量任务
     */
kangxiaoshan committed
416
    public void reset() {
kangxiaoshan committed
417

kangxiaoshan committed
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
        StopWatch all = new StopWatch();
        all.start();
        List<RestTaskInfo> taskInfos = restTaskInfoRepository.findByStatus("init");
        if (taskInfos == null || taskInfos.size() > 1 || taskInfos.isEmpty()) {
            logger.info("重置流量配置错误");
            return;
        }
        RestTaskInfo info = taskInfos.get(0);
        if (!"init".equals(info.getStatus())) {
            logger.info("重置流量配置错误");
            return;
        }
        if (info.getPoolSize() <= 0 || info.getPoolSize() > 100) {
            info.setPoolSize(30);
        }

        ExecutorService executorService = Executors.newFixedThreadPool(info.getPoolSize());

        List<String> emails;
        if (StringUtils.isEmpty(info.getAccountEmails())) {
            emails = contractRepository.findDistinctEmailByPlatform("tkio");
        } else {
            emails = Arrays.asList(info.getAccountEmails().split(","));
        }

        List<String> dateInterval = DateUtil.getDateInterval(info.getStartDs(), info.getEndDs());
lzxry committed
444

kangxiaoshan committed
445
        for (String dsone : dateInterval) {
kangxiaoshan committed
446 447
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
kangxiaoshan committed
448
            String status = restTaskInfoRepository.findStatus(info.getId());
kangxiaoshan committed
449
            if (!status.equals("init")) {
kangxiaoshan committed
450 451 452 453
                logger.info("暂停重置流量 ......");
                break;
            }

kangxiaoshan committed
454
            CompletableFuture[] futures = emails.stream().map(em ->
kangxiaoshan committed
455
                    CompletableFuture.runAsync(() -> {
kangxiaoshan committed
456 457
                        reset(dsone, em);
                    }, executorService).exceptionally((t) -> {
kangxiaoshan committed
458
                        logger.error("重置流量错误 " + em + "-" + dsone, t);
kangxiaoshan committed
459
                        return null;
kangxiaoshan committed
460
                    })).toArray(size -> new CompletableFuture[size]);
kangxiaoshan committed
461
            CompletableFuture.allOf(futures).join();
kangxiaoshan committed
462
            stopWatch.stop();
kangxiaoshan committed
463
            logger.info("重置流量 {} accoount, {}s, ds {} ", emails.size(), stopWatch.getTotalTimeSeconds(), dsone);
kangxiaoshan committed
464
        }
kangxiaoshan committed
465 466

        logger.info("重置流量完成 ! ...");
kangxiaoshan committed
467
        info.setStatus("complate");
kangxiaoshan committed
468 469 470
        all.stop();
        info.setAllSeconds(all.getTotalTimeSeconds());
        restTaskInfoRepository.save(info);
kangxiaoshan committed
471
        executorService.shutdown();
kangxiaoshan committed
472 473 474 475
    }

    public void reset(String yesterday, String email) {

kangxiaoshan committed
476
        //删除历史数据
kangxiaoshan committed
477 478 479 480
        List<Long> delIdList = tkioFlowRepository.findIdByEmailDs(email, yesterday);
        if (delIdList != null && !delIdList.isEmpty()) {
            tkioFlowRepository.deleteByIds(delIdList);
        }
kangxiaoshan committed
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501

        //查询用户下所有appkey
        Account account = accountRepository.findByEmail(email);
        if (account == null || account.getRootParent() == null) {
            return;
        }
        List<Account> accountList = accountRepository.findByRootParent(account.getRootParent());
        List<Long> idList = new ArrayList<>();
        for (Account ac : accountList) {
            idList.add(ac.getId());
        }
        List<String> appkeys = appRepository.findAppkeysNotDebug(idList);
        if (CollectionUtils.isEmpty(appkeys)) {
            return;
        }
        BigInteger clickNum = flowService.getFlowByAccount(yesterday, yesterday, appkeys);
        if (clickNum != null && clickNum.longValue() > 0) {
            List<Contract> contracts = contractRepository.findByPlatformAndEmail("tkio", email);
            if (contracts.size() == 1) {//只有一个合同
                TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contracts.get(0));
                if (tkioFlow != null) {
kangxiaoshan committed
502
                    saveTkioFlowItem(tkioFlow);
lzxry committed
503
                }
kangxiaoshan committed
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
            } else {//多个合同
                //看昨日被哪几个合同包含了
                List<Contract> correlationContract = new ArrayList<>();
                for (Contract contract : contracts) {
                    String startDate = contract.getStartDate();
                    String endDate = contract.getEndDate();
                    if (DateUtil.getDate(yesterday).getTime() >= DateUtil.getDate(startDate).getTime() && DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(endDate).getTime()) {
                        correlationContract.add(contract);
                    }
                }
                //多个合同时,进行排序,如果第一个合同流量超出就要看第二个合同,以此往下
                if (correlationContract.size() > 1) {
                    //冒泡
                    //第一优先级:合同开始日期,第二优先级,合同编号大小
                    Contract[] contractsArray = new Contract[correlationContract.size()];
                    contractsArray = correlationContract.toArray(contractsArray);
                    contractsArray = orderByContract(contractsArray);
                    for (int i = 0; i < contractsArray.length; i++) {
                        TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[i]);
                        if (tkioFlow != null && i < contractsArray.length - 1 && tkioFlow.getCostFlow() != null && tkioFlow.getCostFlow() > 0L) {//处理成本流量,如果超出了,依次算在下一个合同上
                            clickNum = BigInteger.valueOf(tkioFlow.getCostFlow());
                            tkioFlow.setCostFlow(null);
                            if (tkioFlow.getFlow() > 0) {
kangxiaoshan committed
527
                                saveTkioFlowItem(tkioFlow);
lzxry committed
528
                            }
kangxiaoshan committed
529 530
                        } else {
                            if (tkioFlow != null) {
kangxiaoshan committed
531
                                saveTkioFlowItem(tkioFlow);
kangxiaoshan committed
532 533
                            }
                            break;
lzxry committed
534 535
                        }

kangxiaoshan committed
536 537 538 539 540 541 542 543 544 545 546
                    }
                } else {
                    if (correlationContract.size() == 0) {//昨日不包含在所有合同中
                        //排序
                        Contract[] contractsArray = new Contract[contracts.size()];
                        contractsArray = contracts.toArray(contractsArray);
                        contractsArray = orderByContract(contractsArray);
                        //如果昨日日期在第一个合同之前,则归属到第一个合同的成本,其余区间都归属到前一个合同的成本上
                        for (int i = 0; i < contractsArray.length; i++) {
                            if (DateUtil.getDate(yesterday).getTime() <= DateUtil.getDate(contractsArray[i].getStartDate()).getTime() || i == contractsArray.length - 1) {
                                int j = 0;
lzxry committed
547

kangxiaoshan committed
548 549
                                if (i != 0) {
                                    j = i - 1;
lzxry committed
550 551
                                }

kangxiaoshan committed
552 553
                                if (i == contractsArray.length - 1) {
                                    j = i;
lzxry committed
554 555
                                }

kangxiaoshan committed
556
                                TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, contractsArray[j]);
lzxry committed
557
                                if (tkioFlow != null) {
kangxiaoshan committed
558
                                    saveTkioFlowItem(tkioFlow);
lzxry committed
559
                                }
kangxiaoshan committed
560
                                break;
lzxry committed
561
                            }
kangxiaoshan committed
562 563 564 565 566
                        }

                    } else {
                        TkioFlow tkioFlow = getTkioFlow(clickNum, yesterday, correlationContract.get(0));
                        if (tkioFlow != null) {
kangxiaoshan committed
567
                            saveTkioFlowItem(tkioFlow);
lzxry committed
568 569 570
                        }
                    }

kangxiaoshan committed
571
                }
lzxry committed
572 573 574 575
            }
        }

    }
kangxiaoshan committed
576

kangxiaoshan committed
577 578 579 580
    private void saveTkioFlowItem(TkioFlow tkioFlow) {
        tkioFlowRepository.save(tkioFlow);
    }

lzxry committed
581
}