package com.reyun.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.reyun.dic.ConfigEnumType; import com.reyun.dic.KeywordsDataType; import com.reyun.model.ChannelAccount; import com.reyun.model.ConfigParam; import com.reyun.model.KeywordsCreative; import com.reyun.repository.*; import com.reyun.service.AuthService; import com.reyun.service.ConfigParamService; import com.reyun.service.SmSearchService; import com.reyun.util.*; import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.StringRequestEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.io.*; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.*; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; /** * description: * * @author nolan * @date 12/09/2017 */ @Service public class SmSearchServiceImpl implements SmSearchService { private static final Logger logger = LoggerFactory.getLogger(SmSearchServiceImpl.class); //获取token private static final String SM_GET_TOKEN_URL = "https://e.sm.cn/auth/OAuth2/accessTokenRequest"; //请求生成报表 private static final String SM_REQUEST_REPORT_URL = "https://e.sm.cn/api/bulkJob/getAllObjects"; //查看报表生成状态 private static final String SM_CHECK_REPORT_STATUS_RUL = "https://e.sm.cn/api/task/getTaskState"; //下载报表 private static final String SM_DOWNLOAD_REPORT_URL = "https://e.sm.cn/api/file/download"; //下载文件中创意KEY列名 private static final String SM_CREATIVE_KEY_COLUMN = "creativeId"; //下载文件中创意title列名 private static final String SM_CREATIVE_VALUE_COLUMN = "title"; //下载文件中关键字key列名 private static final String SM_KEYWORDS_KEY_COLUMN = "keywordId"; //下载文件中关键字name列名 private static final String SM_KEYWORDS_VALUE_COLUMN = "keyword"; //关键字文件前缀 private static final String SM_KEYWORDS_FILE_PREFIX = "Keywords_"; //创意文件前缀 private static final String SM_CREATIVE_FILE_PREFIX = "Creatives_"; //请求账户密码错误代码 private static final int REQUEST_PWD_ERROR_CODE = 8102; //下载后文件前缀 private static final String DOWNLOAD_FILE_NAME_PREFIX = "sm_download_"; //下载后文件后缀 private static final String DOWNLOAD_FILE_NAME_SUFFIX = ".zip"; @Autowired private ConfigParamService configParamService; @Autowired private ChannelAccountRepository channelAccountRepository; @Autowired private KeywordsCreativeRepository keywordsCreativeRepository; @Override public void task() { //1. 查询所有神马搜索用户 List<ChannelAccount> channelAccounts = channelAccountRepository.findValidEnableByUniqueName("smsearch"); channelAccounts.forEach(channelAccount -> { //2. 根据神马帐号请求生成文件 long taskId = requestForGenerateReport(channelAccount); if (taskId == -1l) { //retry auth channelAccountRepository.invalidAccount(channelAccount.getId()); return; } //3. 判断文件是否生成并下载 int n = 0; long fileId = 0l; do { if (n != 0) { logger.warn("失败重试次数: {}", n); try { Thread.sleep(10000); } catch (InterruptedException e) { } } } while (n++ < 10 && (fileId = checkReportGenerateStatus(taskId, channelAccount)) == 0); //4. 解析文件 String fileName = downloadReportFromSm(fileId, channelAccount); if (!Strings.isNullOrEmpty(fileName)) { parseKeywordAndCreativeFile(fileName, channelAccount); } }); } @Override public void patch(String fileName, String userName, Long account) { logger.info("start to patch shenma keywords and creative name data from file " + fileName+ "...."); ChannelAccount channelAccount = channelAccountRepository.findByNameAndAccount(userName, account); if (!Strings.isNullOrEmpty(fileName)) { parseKeywordAndCreativeFile(fileName, channelAccount); logger.info("end, filename : " + fileName); } else { //2. 根据神马帐号请求生成文件 long taskId = requestForGenerateReport(channelAccount); if (taskId == -1l) { //retry auth channelAccountRepository.invalidAccount(channelAccount.getId()); return; } //3. 判断文件是否生成并下载 int n = 0; long fileId = 0l; do { if (n != 0) { logger.warn("失败重试次数: {}", n); try { Thread.sleep(10000); } catch (InterruptedException e) { } } } while (n++ < 10 && (fileId = checkReportGenerateStatus(taskId, channelAccount)) == 0); //4. 解析文件 fileName = downloadReportFromSm(fileId, channelAccount); if (!Strings.isNullOrEmpty(fileName)) { parseKeywordAndCreativeFile(fileName, channelAccount); } } } /** * 检查文件生成状态 * @param taskId * @param channelAccount * @return */ public long checkReportGenerateStatus(Long taskId, ChannelAccount channelAccount) { HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()); PostMethod postMethod = new PostMethod(SM_CHECK_REPORT_STATUS_RUL); long fileId = 0L; try { //构建请求参数 StringRequestEntity requestEntity = new StringRequestEntity(buildPostRequestBody(channelAccount, "taskId", taskId), "application/json", "UTF-8"); postMethod.setRequestEntity(requestEntity); httpClient.executeMethod(postMethod); if (logger.isDebugEnabled()) { logger.error(postMethod.getResponseBodyAsString()); } //请求检查状态 logger.error("请求检查状态:postMethod.getStatusCode():"+postMethod.getStatusCode()); if (postMethod.getStatusCode() == HttpStatus.SC_OK) { fileId = getResponseBodyValue(postMethod.getResponseBodyAsString(), "fileId"); } logger.error("请求检查状态:postMethod.getResponseBodyAsString():"+postMethod.getResponseBodyAsString()); } catch (IOException e) { fileId = 0L; logger.error(e.getMessage()); } finally { postMethod.releaseConnection(); } return fileId; } /** * 下载报表文件 * @param fileId * @param thirdAccount * @return */ public String downloadReportFromSm(Long fileId, ChannelAccount thirdAccount) { String fileName = null; HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()); PostMethod postMethod = new PostMethod(SM_DOWNLOAD_REPORT_URL); try { //构建请求 StringRequestEntity requestEntity = new StringRequestEntity(buildPostRequestBody(thirdAccount, "fileId", fileId), "application/json", "UTF-8"); postMethod.setRequestEntity(requestEntity); httpClient.executeMethod(postMethod); //下载 if (postMethod.getStatusCode() == HttpStatus.SC_OK) { Header contentTypeHeader = postMethod.getResponseHeader("Content-Type"); if (contentTypeHeader.getValue().contains("json")) { //请求参数出错返回 if (getFailedCode(postMethod.getResponseBodyAsString()) == REQUEST_PWD_ERROR_CODE) { fileName = "passwordError"; } } else if (contentTypeHeader.getValue().contains("stream")) { //请求成功开始下载 fileName = generateDownloadFileName(thirdAccount.getName()); Header contentLengthHeader = postMethod.getResponseHeader("Content-Length"); long contentLength = Long.parseLong(contentLengthHeader.getValue()); File downloadFile = new File(Constant.SHENMA_DOWNLOAD_DIR + fileName); FileOutputStream fileOutputStream = new FileOutputStream(downloadFile); ReadableByteChannel rbc = Channels.newChannel(postMethod.getResponseBodyAsStream()); fileOutputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); fileOutputStream.flush(); fileOutputStream.close(); rbc.close(); fileName = downloadFile.length() == contentLength ? fileName : null; } } } catch (IOException e) { fileName = null; logger.error(e.getMessage()); } finally { postMethod.releaseConnection(); } return fileName; } /** * 解析报表文件 * @param fileName * @param channelAccount */ public void parseKeywordAndCreativeFile(String fileName, ChannelAccount channelAccount) { File file = new File(Constant.SHENMA_DOWNLOAD_DIR + fileName); //文件不存在返回 if (!file.exists()) { return; } try { List<KeywordsCreative> insertList = Lists.newArrayList(); Map<String, KeywordsCreative> keywordsCreativeMap = new HashMap<>(); try { keywordsCreativeMap = this.getKeywordCreativeMap(channelAccount.getChannel()); } catch (Exception innere) { logger.error("fail to get keywordsCreativeMap, " + innere.getMessage()); } ZipFile zipFile = new ZipFile(file); Enumeration<?> enumeration = zipFile.entries(); while (enumeration.hasMoreElements()) { ZipEntry zipEntry = (ZipEntry) enumeration.nextElement(); boolean isKeywordCreativeFileExists = !zipEntry.isDirectory() && (zipEntry.getName().startsWith(SM_KEYWORDS_FILE_PREFIX) || zipEntry.getName().startsWith(SM_CREATIVE_FILE_PREFIX)) && zipEntry.getName().endsWith(".csv"); if (isKeywordCreativeFileExists) { boolean isKeywords = zipEntry.getName().startsWith(SM_KEYWORDS_FILE_PREFIX); String keyName = isKeywords ? SM_KEYWORDS_KEY_COLUMN : SM_CREATIVE_KEY_COLUMN; String valueName = isKeywords ? SM_KEYWORDS_VALUE_COLUMN : SM_CREATIVE_VALUE_COLUMN; String dataType = isKeywords ? KeywordsDataType.KEYWORDS.getKey() : KeywordsDataType.CREATIVE.getKey(); InputStream inputStream = zipFile.getInputStream(zipEntry); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); String line; int keyIndex = 0; int valueIndex = 0; for (int i = 0; (line = br.readLine()) != null; i++) { String s=",(?=([^\"]*\"[^\"]*\")*[^\"]*$)"; String[] lineArray = line.split(s); if (lineArray.length <= 5) { continue; } //取到第一行关键字位置 if (i == 0) { List<String> keyList = Lists.newArrayList(lineArray); keyIndex = keyList.indexOf(keyName); valueIndex = keyList.indexOf(valueName); continue; } // System.out.println(line); //已经存在更新,不存在插入 KeywordsCreative keywordsCreative = keywordsCreativeMap.get(lineArray[keyIndex] + dataType); if (null != keywordsCreative && !keywordsCreative.getValue().equals(lineArray[valueIndex])) { keywordsCreative.setValue(lineArray[valueIndex]); insertList.add(keywordsCreative); } else if (null == keywordsCreative) { insertList.add(new KeywordsCreative(channelAccount.getCreateAccount(), channelAccount.getId(), channelAccount.getChannel(), dataType, lineArray[keyIndex], lineArray[valueIndex])); } if (insertList.size() == 1000) { try { keywordsCreativeRepository.batchInsert(insertList); logger.info(fileName + "成功" + insertList.size()); } catch (Exception e) { logger.error("1)批量写入关键字出错," + fileName, e); } insertList.clear(); } } br.close(); inputStream.close(); } } zipFile.close(); //入库 if(insertList.size()>0){ try { keywordsCreativeRepository.save(insertList); logger.info(fileName + "成功" + insertList.size()); } catch (Exception e) { logger.error("2)批量写入关键字出错," + fileName, e); } } } catch (IOException e) { logger.error(fileName + "从文件导入数据库失败:" + e.getMessage()); } } @Override public boolean callbackUpdateTokenByCode(Long channelAccountId, String code) { Map<String, String> paramMap = Maps.newHashMap(); paramMap.put("code", code); paramMap.put("grant_type", "authorization_code"); List<ConfigParam> configParams = configParamService.getConfigParamByKeys(ConfigEnumType.SM_CLIENT_ID.getKey(), ConfigEnumType.SM_CLIENT_SECRET.getKey(), ConfigEnumType.SM_REDIRECT_URI.getKey()); for (ConfigParam configParam : configParams) { paramMap.put(configParam.getKeyParam(), configParam.getValueParam()); } String responseJson = HttpClientUtil.doHttpPostRequest(SM_GET_TOKEN_URL, "trackingio", paramMap); Map<String, Object> responseMap = null; try { responseMap = new ObjectMapper().readValue(responseJson, Map.class); } catch (IOException e) { } if (responseMap == null || responseMap.get("error") != null) { logger.warn("回调出现错误: channelAccountid:{}, code:{}", channelAccountId, code); return false; } String token = responseMap.get("access_token").toString(); String refresh_token = responseMap.get("refresh_token").toString(); long invalidTime = Long.parseLong(String.valueOf(responseMap.get("expires_in"))); if (logger.isDebugEnabled()) { logger.debug("channelAccountId:{}, code:{} -> access_token:{}, refresh_token:{}, expires_in:{}", channelAccountId, code, token, refresh_token, invalidTime); } ChannelAccount channelAccount = channelAccountRepository.findOne(channelAccountId); if (logger.isDebugEnabled()) { logger.debug("channelAccount:{}", channelAccount); } if (channelAccount != null) { channelAccount.setAccessToken(token); channelAccount.setRefreshToken(refresh_token); channelAccount.setRefreshTokenExpire(invalidTime); channelAccount.setLastUpdateTime(new Date()); channelAccount.setTestSuccess(true); channelAccount.setPastStatus(false); channelAccountRepository.save(channelAccount); } return true; } public long requestForGenerateReport(ChannelAccount channelAccount) { long taskId = 0L; HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()); PostMethod postMethod = new PostMethod(SM_REQUEST_REPORT_URL); try { //构建请求 StringRequestEntity requestEntity = new StringRequestEntity(buildPostRequestBody(channelAccount, "bulkJobRequestType", "{\"campaignIds\":[],\"singleFile\":0,\"format\":0,\"variableColumns\":[],\"fileController\":0 }"), "application/json", "UTF-8"); postMethod.setRequestEntity(requestEntity); httpClient.executeMethod(postMethod); final String responseBodyAsString = postMethod.getResponseBodyAsString(); if (logger.isDebugEnabled()) { logger.debug(responseBodyAsString); } //得到返回 if (postMethod.getStatusCode() == HttpStatus.SC_OK) { taskId = getResponseBodyValue(responseBodyAsString, "taskId"); } if (taskId < 0) { logger.info("fail to generate report......" + channelAccount.getName() + "," + channelAccount.getPassword()); logger.info(responseBodyAsString); } } catch (IOException e) { taskId = 0L; logger.error(e.getMessage()); } finally { postMethod.releaseConnection(); } return taskId; } /** * 获取神马请求失败code */ private int getFailedCode(String responseJson) { int resultCode = 0; try { Map<String, Object> responseBodyMap = new ObjectMapper().readValue(responseJson, HashMap.class); Map<String, Object> headMap = (HashMap) responseBodyMap.get("header"); Map<String, Object> failureMap = (Map) ((List) headMap.get("failures")).get(0); resultCode = (Integer) failureMap.get("code"); } catch (IOException e) { resultCode = 0; } return resultCode; } /** * 获取神马请求返回body中的key对应的值,返回中有body表示成功,只有header表示请求失败 返回失败code */ private int getResponseBodyValue(String responseJson, String key) { int value = 0; Map<String, Object> bodyMap = null; Map<String, Object> responseBodyMap = null; try { responseBodyMap = new ObjectMapper().readValue(responseJson, Map.class); bodyMap = (HashMap) responseBodyMap.get("body"); } catch (IOException e) { logger.error("解析响应: ", e); } if (!CollectionUtils.isEmpty(bodyMap)) { if ("fileId".equals(key)) { value = bodyMap.get("status").equals("FINISHED") && (Boolean) bodyMap.get("success") ? ((Integer) bodyMap.get(key)) : 0; //查询状态为status = FINISHED,success=true 才算生成完成 } else { value = ((Integer) bodyMap.get(key)); } } else { Map<String, Object> headerMap = (HashMap) responseBodyMap.get("header"); Map<String, Object> failureMap = (Map) ((List) headerMap.get("failures")).get(0); value = REQUEST_PWD_ERROR_CODE == ((Integer) failureMap.get("code")) ? -1 : -2; } return value; } /** * 构建神马请求参数中的header JSON */ private String buildRequestRequestBodyHead(ChannelAccount channelAccount) { String resultJson = ""; if (!StringUtil.isEmpty(channelAccount.getAccessToken()) && !StringUtil.isEmpty(channelAccount.getName()) && !StringUtil.isEmpty(channelAccount.getPassword())) { resultJson = "{\"username\":\"" + channelAccount.getName() + "\",\"password\":\"" + channelAccount.getPassword() + "\",\"token\":\"" + channelAccount.getAccessToken() + "\"}"; } return resultJson; } /** * 构建神马请求参数JSON */ private String buildPostRequestBody(ChannelAccount channelAccount, String bodyKey, Object bodyValue) { return "{\"header\":" + buildRequestRequestBodyHead(channelAccount) + ",\"body\":{\"" + bodyKey + "\":" + bodyValue + "}}"; } /** * 生成神马下载文件名 */ private String generateDownloadFileName(String username) { return DOWNLOAD_FILE_NAME_PREFIX + username + "_" + DateUtil.format(new Date(), "YYYYMMddHHmmss") + DOWNLOAD_FILE_NAME_SUFFIX; } private Map<String, KeywordsCreative> getKeywordCreativeMap(Long channelId) { List<KeywordsCreative> keywordsCreativeList = keywordsCreativeRepository.listKeywordCreativeByChannel(channelId); Map<String, KeywordsCreative> result = new HashMap<>(); for (KeywordsCreative keywordsCreative : keywordsCreativeList) { result.put(keywordsCreative.getKeyId() + keywordsCreative.getDataType(), keywordsCreative); } return result; } }