// MessageSender.java 3.44 KB
package mobvista.dmp.datasource.personagraph;

import com.google.gson.JsonObject;
import mobvista.dmp.util.DelayUtil;
import mobvista.prd.datasource.util.GsonUtil;
import mobvista.prd.datasource.util.HttpUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

/**
 * Worker that takes device ids from a shared queue, posts each one to the
 * Personagraph ingestion endpoint and appends the id to an output file.
 *
 * <p>The output file is opened in the constructor (see {@link #buildWriter()}) and
 * closed when {@link #run()} exits. On exit the worker also counts down the
 * {@link CountDownLatch} it was given, so a coordinating thread can wait for all
 * workers to finish.
 *
 * <p>The {@code stop} and {@code pause} flags are never modified by this class; as
 * written, the loop only ends when the worker thread is interrupted or when an
 * exception is thrown while processing an id.
 */
public class MessageSender implements Runnable {
    private static final Logger logger = Logger.getLogger(MessageSender.class);
    private static final String REQ_URL = "https://api.personagraph.com/pgedge/ingestion/events/1.0";
    /** JSON request template; the single {@code %s} placeholder receives the device id. */
    private static final String REQUEST_BODY_TEMPLATE = "{\"source\":{\"name\":\"com.mobvista.dataplatform\",\"type\":\"UNKNOWN\"},\"identifiers\":{\"primary\":{\"id\":\"%s\",\"type\":\"DEVICE_ID\",\"pii\":\"FALSE\"}}}";
    /** How long the worker sleeps between checks while paused. */
    private static final long PAUSE_SLEEP_MILLIS = 2000L;

    private final String token;
    private final String filePath;
    // volatile so that a change made by another thread is seen by the polling loop in run().
    private volatile boolean stop = false;
    private volatile boolean pause = false;
    private BufferedWriter writer;
    private final CountDownLatch downLatch;
    private final BlockingQueue<?> blockingQueue;

    /**
     * Creates a worker and immediately opens its output file.
     *
     * @param token         bearer token sent in the {@code Authorization} header
     * @param filePath      location of the output file; also used to select the Hadoop file system
     * @param blockingQueue queue supplying device ids; each element is converted with
     *                      {@link String#valueOf(Object)}
     * @param downLatch     latch counted down once when {@link #run()} exits; may be {@code null}
     * @throws Exception if the output file cannot be opened
     */
    public MessageSender(String token, String filePath, BlockingQueue<?> blockingQueue, CountDownLatch downLatch) throws Exception {
        this.token = token;
        this.filePath = filePath;
        this.downLatch = downLatch;
        this.blockingQueue = blockingQueue;
        buildWriter();
    }

    /**
     * Processes device ids until the worker is stopped, interrupted, or an error occurs.
     *
     * <p>While {@code pause} is set the worker sleeps and re-checks; {@code pause} takes
     * precedence over {@code stop}. Any exception ends the loop. In every case the output
     * file is closed and the latch is counted down before returning.
     */
    @Override
    public void run() {
        DelayUtil du = new DelayUtil();
        // NOTE(review): left as a raw Map because the parameter type expected by
        // HttpUtil.postRequestJson is not visible from this file.
        Map header = new HashMap<String, String>();
        header.put("Content-Type", "application/json; charset=utf-8;");
        header.put("Authorization", " Bearer " + token);

        try {
            while (true) {
                if (pause) {
                    Thread.sleep(PAUSE_SLEEP_MILLIS);
                    continue;
                }
                if (stop) {
                    break;
                }

                // Blocks until an id is available; throws InterruptedException on interrupt.
                String deviceId = String.valueOf(blockingQueue.take());
                du.setStart(System.currentTimeMillis());
                JsonObject reqBody = GsonUtil.String2JsonObject(String.format(REQUEST_BODY_TEMPLATE, deviceId));
                HttpUtil.postRequestJson(REQ_URL, reqBody, header, logger);
                writer.write(deviceId);
                writer.newLine();
                du.setEnd(System.currentTimeMillis());
                du.printEnd("Finish upload " + deviceId);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            logger.warn("MessageSender interrupted, stopping", e);
        } catch (Exception e) {
            logger.error("MessageSender stopped by an unexpected error", e);
        } finally {
            closeWriter();
            if (downLatch != null) {
                downLatch.countDown();
            }
        }
    }

    /**
     * Opens the output file at {@code filePath} through the Hadoop {@link FileSystem}
     * API and stores a buffered UTF-8 writer for it in {@code writer}.
     *
     * @throws Exception if the file system cannot be obtained or the file cannot be created
     */
    public void buildWriter() throws Exception {
        Path path = new Path(filePath);
        FileSystem fs = FileSystem.get(URI.create(filePath), new Configuration());
        OutputStream out = fs.create(path);
        // Explicit charset: the output must not depend on the JVM's platform default.
        this.writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
    }

    /** Closes the output writer (which flushes it first); failures are logged, never thrown. */
    private void closeWriter() {
        if (writer == null) {
            return;
        }
        try {
            // close() flushes first; a separate flush() that failed would skip close() and leak the stream.
            writer.close();
        } catch (Exception e) {
            // Broad catch on purpose: nothing here may prevent the latch count-down in run().
            logger.error("Failed to close output file " + filePath, e);
        }
    }
}