UploadDeviceId.java 5.26 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
package mobvista.dmp.datasource.personagraph;

import mobvista.dmp.thread.TaskPool;
import mobvista.prd.datasource.util.GsonUtil;
import mobvista.prd.datasource.util.HttpUtil;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import java.io.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;

public class UploadDeviceId implements Serializable {
    private BlockingQueue blockingQueue;
    public DataFileReader fileReader;
    public List<MessageSender> senderList;


    public int run(String[] args) throws Exception {
        BufferedReader reader = null;
        try {
            Options options = buildOptions();
            BasicParser parser = new BasicParser();
            CommandLine commandLine = parser.parse(options, args);
            if (!checkMustOption(commandLine, options)) {
                printUsage(options);
                return -1;
            } else {
                printOptions(commandLine, options);
            }

            String didDataPath = commandLine.getOptionValue("didDataPath");
            String finishPath = commandLine.getOptionValue("finishPath");
            String senderNum = commandLine.getOptionValue("senderNum");
            String queueSize = commandLine.getOptionValue("queueSize");

            senderList = new ArrayList<>();
            blockingQueue = new LinkedBlockingDeque(Integer.parseInt(queueSize));

            // 初始化fileReader线程
            reader = buildFileReader(didDataPath);
            fileReader = new DataFileReader(blockingQueue, reader);
            TaskPool.getThreadPool().execute(fileReader);

            // 初始化messageSender线程
            String token = getToken();
            Integer senderNumber = Integer.parseInt(senderNum);
            CountDownLatch downLatch = new CountDownLatch(senderNumber);
            for (int i = 0; i < senderNumber; i++) {
                MessageSender sender = new MessageSender(token, finishPath + "_" + i, blockingQueue, downLatch);
                TaskPool.getThreadPool().execute(sender);
                senderList.add(sender);
            }
            downLatch.wait();
        } finally {
            if (reader != null) {
                reader.close();
            }
            TaskPool.shutdownAndAwaitTermination(TaskPool.getThreadPool());
        }
        return 0;
    }


    public String getToken() throws Exception {
        String url = "https://api.personagraph.com/pgedge/ingestion/accesstoken/1.0";
        Map<String, String> header = new HashMap<String, String>();
        header.put("partnerId", "138");
        header.put("partnerKey", "aiZQLc3hMJZLM19T");
        String tokenStr = HttpUtil.getRequest(url,null, header, Logger.getLogger(UploadDeviceId.class));
        if (tokenStr.contains("accessToken")) {
            return GsonUtil.String2JsonObject(tokenStr).get("accessToken").getAsString();
        }
        return null;
    }


    public BufferedReader buildFileReader(String path) throws Exception {
        InputStream in = new FileInputStream(path);
        Reader reader = new InputStreamReader(in);
        return new BufferedReader(reader);
    }


    public Options buildOptions() {
        Options options = new Options();
        options.addOption("didDataPath", true, "[must] deviceId data path");
        options.addOption("finishPath", true, "[must] upload successly did output path");
        options.addOption("senderNum", true, "[must] message sender thread number");
        options.addOption("queueSize", true, "[must] queue size");
        return options;
    }


    public boolean checkMustOption(CommandLine commands, Options options){
        Collection<Option> collect = options.getOptions();
        for (Option opt : collect) {
            String desc = opt.getDescription();
            if (desc.contains("[must]")) {
                String optValue = commands.getOptionValue(opt.getOpt());
                if (StringUtils.isEmpty(optValue)) {
                    System.out.println("Please set paramter " + opt.getArgName());
                    return false;
                }
            }
        }
        return true;
    }


    public void printOptions (CommandLine commands, Options options) {
        Collection<Option> collect = options.getOptions();
        System.out.println("****************************************");
        for (Option opt : collect) {
            String desc = opt.getDescription();
            if (desc.contains("[must]")) {
                String optValue = commands.getOptionValue(opt.getOpt());
                System.out.println("* " + opt.getOpt() + " = " + optValue);
            }
        }
        System.out.println("****************************************");
    }

    public void printUsage(Options options){
        HelpFormatter help = new HelpFormatter();
        help.printHelp(this.getClass().getSimpleName(), options);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = 0;
        try {
            exitCode = new UploadDeviceId().run(args);
        } catch (Exception e) {
            exitCode = -1;
            e.printStackTrace();
        } finally {
            System.exit(exitCode);
        }
    }
}