package mobvista.dmp.datasource.bundle.service; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; import com.alibaba.fastjson.JSONObject; import com.google.common.util.concurrent.*; import mobvista.dmp.common.Constants; import mobvista.dmp.util.DateUtil; import mobvista.dmp.util.MRUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import java.io.*; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; /** * @package: mobvista.dmp.datasource.bundle.service * @author: wangjf * @date: 2020-11-02 * @time: 11:22 * @email: jinfeng.wang@mobvista.com */ public class BundleMatchServer { static Logger logger; static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(20), new CustomizableThreadFactory("BundleMatchMain"), new ThreadPoolExecutor.CallerRunsPolicy()); private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd"); public static void main(String[] args) throws JoranException, IOException { String input = ""; String output = ""; if (args.length >= 2) { input = args[0]; output = args[1]; } BufferedWriter out = null; try { out = new BufferedWriter(new FileWriter(output)); } catch (IOException e) { logger.info("IOException -->> " + e.getMessage()); } LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(context); context.reset(); configurator.doConfigure(BundleMatchServer.class.getClassLoader().getResourceAsStream("logback-syslog.xml")); logger = context.getLogger("BundleMatchMain"); long start = System.currentTimeMillis(); Set<String> pkgs = readFile(input); logger.info("Correct PKS size -->> " + pkgs.size()); ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor); MoreExecutors.addDelayedShutdownHook(listeningExecutor, 2, TimeUnit.SECONDS); List<ListenableFuture<BundleClass>> futures = new CopyOnWriteArrayList<>(); List<BundleClass> resultList = new CopyOnWriteArrayList<>(); for (String pkg : pkgs) { ListenableFuture listenableFuture = listeningExecutor.submit(() -> { String packageName = request(pkg); BundleClass bundleClass = new BundleClass(); bundleClass.setBundleId(pkg); bundleClass.setPackageName(packageName); return bundleClass; }); Futures.addCallback(listenableFuture, new FutureCallback<BundleClass>() { @Override public void onSuccess(BundleClass bundleClass) { if (StringUtils.isNotBlank(bundleClass.getPackageName())) { logger.info("PKG -->> " + bundleClass.getBundleId() + ", ID -->> " + bundleClass.getPackageName()); resultList.add(bundleClass); } } @Override public void onFailure(Throwable t) { // NO DO! } }); futures.add(listenableFuture); } for (ListenableFuture<BundleClass> future : futures) { try { BundleClass bundleClass = future.get(); if (StringUtils.isNotBlank(bundleClass.getPackageName())) { assert out != null; out.write(MRUtils.JOINER.join(bundleClass.getBundleId(), bundleClass.getPackageName())); out.newLine(); } } catch (InterruptedException | ExecutionException | IOException e) { logger.info(e.getMessage()); } } assert out != null; out.flush(); out.close(); long end = System.currentTimeMillis(); logger.info("Runtime -->> " + (end - start)); poolExecutor.shutdown(); if (poolExecutor.isShutdown()) { System.exit(0); } } public static Set<String> readFile(String filePath) { Set<String> pkgSet = new HashSet<>(); try { File file = new File(filePath); if (file.isFile() && file.exists()) { // 考虑到编码格式 InputStreamReader read = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8); BufferedReader bufferedReader = new BufferedReader(read); String lineTxt; while ((lineTxt = bufferedReader.readLine()) != null) { if (Constants.adrPkgPtn.matcher(lineTxt).matches()) { pkgSet.add(lineTxt); } } bufferedReader.close(); read.close(); } else { logger.info("NOT FOUND !"); } } catch (Exception e) { logger.info("Read File Error !Exception -->> " + e.getMessage()); } return pkgSet; } public static String request(String packageName) { CloseableHttpClient client = HttpClients.createDefault(); List<BasicNameValuePair> formparams = new ArrayList<>(); final String serverUrl = "http://itunes.apple.com/lookup"; URIBuilder uri = new URIBuilder(); try { uri = new URIBuilder(serverUrl).addParameter("bundleId", packageName); } catch (URISyntaxException e) { e.printStackTrace(); } RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(10000).setConnectionRequestTimeout(10000) .setSocketTimeout(10000).build(); HttpGet httpGet = new HttpGet(); String trackId = ""; CloseableHttpResponse response; try { httpGet = new HttpGet(uri.build()); httpGet.setConfig(requestConfig); response = client.execute(httpGet); BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity().getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } JSONObject jsonObject = Constants.String2JSONObject(result.toString()); if (jsonObject.containsKey("resultCount") && jsonObject.containsKey("results") && jsonObject.getInteger("resultCount") > 0 && jsonObject.getJSONArray("results") != null && jsonObject.getJSONArray("results").size() > 0) { JSONObject json = jsonObject.getJSONArray("results").getJSONObject(0); if (json.containsKey("trackId")) { trackId = json.getString("trackId"); } } } catch (URISyntaxException | IOException e) { // logger.info("URISyntaxException | IOException -->> " + e.getMessage()); } finally { httpGet.abort(); } return trackId; } }