package mobvista.dmp.datasource.personagraph; import com.google.gson.JsonObject; import mobvista.dmp.util.DelayUtil; import mobvista.prd.datasource.util.GsonUtil; import mobvista.prd.datasource.util.HttpUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.log4j.Logger; import org.json.JSONObject; import java.io.*; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; /** * */ public class MessageSender implements Runnable { private static final Logger logger = Logger.getLogger(MessageSender.class); private static final String reqURL = "https://api.personagraph.com/pgedge/ingestion/events/1.0"; private static final String requestBody = "{\"source\":{\"name\":\"com.mobvista.dataplatform\",\"type\":\"UNKNOWN\"},\"identifiers\":{\"primary\":{\"id\":\"%s\",\"type\":\"DEVICE_ID\",\"pii\":\"FALSE\"}}}"; private String token; private String filePath; private Boolean stop = false; private Boolean pause = false; private BufferedWriter writer; private CountDownLatch downLatch; private BlockingQueue blockingQueue; public MessageSender(String token, String filePath, BlockingQueue blockingQueue, CountDownLatch downLatch) throws Exception { this.token = token; this.filePath = filePath; this.downLatch = downLatch; this.blockingQueue = blockingQueue; buildWriter(); } @Override public void run() { DelayUtil du = new DelayUtil(); Map header = new HashMap<String, String>(); header.put("Content-Type", "application/json; charset=utf-8;"); header.put("Authorization", " Bearer " + token); try { while (true) { if (pause || stop) { if (pause) { Thread.sleep(2000); } else { break; } } else { String deviceId = String.valueOf(blockingQueue.take()); du.setStart(System.currentTimeMillis()); JsonObject reqBoday = GsonUtil.String2JsonObject(String.format(requestBody, deviceId)); String response = HttpUtil.postRequestJson(reqURL, reqBoday, header, logger); writer.write(deviceId); writer.newLine(); du.setEnd(System.currentTimeMillis()); du.printEnd("Finish upload " + deviceId); } } } catch (Exception e) { e.printStackTrace(); } finally { if (writer != null) { try { writer.flush(); writer.close(); } catch (Exception e) { e.printStackTrace(); } } if (downLatch != null) { downLatch.countDown(); } } } public void buildWriter() throws Exception { Path path = new Path(filePath); FileSystem fs = FileSystem.get(URI.create(filePath), new Configuration()); OutputStream out = fs.create(path); Writer writer = new OutputStreamWriter(out); BufferedWriter fileWriter = new BufferedWriter(writer); this.writer = fileWriter; } }