Commit 872d6594 by WangJinfeng

init RTDmpRequestDaily

parent 783d0238
package mobvista.dmp.datasource.rtdmp;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import mobvista.dmp.common.Constants;
import mobvista.dmp.util.MD5Util;
import mobvista.dmp.util.PropertyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* @package: mobvista.dmp.datasource.rtdmp
* @author: wangjf
* @date: 2020/7/20
* @time: 4:44 下午
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
public class RTDmpServer {
public static final Logger logger = LoggerFactory.getLogger(RTDmpServer.class);
private final static String BASE_URL = PropertyUtil.getProperty("config.properties", "rtdmp.url");
// private final static String BASE_URL = "http://ip-172-31-29-117:8688/";
// private final static String BASE_URL = "http://107.21.162.31:8688/";
public static JSONObject query(String updateTimeStart, String updateTimeEnd, Integer minAudienceDataStatus,
Integer maxAudienceDataStatus, Long audienceDataUtimeStart, Long audienceDataUtimeEnd, String audience_type) throws URISyntaxException {
logger.info("rtdmp/query -->> update_time_start -->>" + updateTimeStart + ",update_time_end -->> " + updateTimeEnd +
",audience_date_utime_start -->> " + audienceDataUtimeStart + ",audience_date_utime_end -->> " + audienceDataUtimeEnd);
JSONArray jsonArray = commonRequest(1, 10000, updateTimeStart, updateTimeEnd, audience_type);
JSONObject result = new JSONObject();
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.getInteger("status") == 1
&& jsonObject.getLong("audience_data_utime") >= audienceDataUtimeStart
&& jsonObject.getLong("audience_data_utime") <= audienceDataUtimeEnd) {
JSONObject audience = new JSONObject();
int audienceId = jsonObject.getInteger("id");
int data_update_method = jsonObject.getInteger("data_update_method");
int audienceType = jsonObject.getInteger("audience_type");
int isSyncDmpServer = jsonObject.getInteger("is_sync_dmpserver");
JSONObject audienceRules = jsonObject.getJSONObject("audience_rules");
String installedPackageName = jsonObject.getString("installed_package_name");
JSONArray groupRules = jsonObject.getJSONArray("group_rules");
if ((audienceRules.get("subtraction") != null && !audienceRules.getJSONArray("subtraction").isEmpty())
|| (audienceRules.get("union") != null && !audienceRules.getJSONArray("union").isEmpty())
|| (audienceRules.get("intersections") != null && !audienceRules.getJSONArray("intersections").isEmpty())
|| (groupRules != null && !groupRules.isEmpty() && !groupRules.getJSONObject(0).isEmpty())
|| StringUtils.isNotBlank(installedPackageName)) {
audience.put("data_update_method", data_update_method);
audience.put("audience_type", audienceType);
audience.put("is_sync_dmpserver", isSyncDmpServer);
audience.put("audience_rules", audienceRules);
audience.put("group_rules", groupRules);
audience.put("installed_package_name", installedPackageName);
result.put(String.valueOf(audienceId), audience);
}
}
}
return result;
}
public static JSONArray commonRequest(int page, int size, String updateTimeStart, String updateTimeEnd, String audience_type) throws URISyntaxException {
CloseableHttpClient client = HttpClients.createDefault();
List<BasicNameValuePair> formparams = new ArrayList<>();
final String serverUrl = BASE_URL + "rtdmp/audience/query";
URIBuilder uri = new URIBuilder();
try {
uri = new URIBuilder(serverUrl)
.addParameter("page", String.valueOf(page))
.addParameter("size", String.valueOf(size))
.addParameter("update_time_start", updateTimeStart)
.addParameter("update_time_end", updateTimeEnd)
.addParameter("audience_type", audience_type);
} catch (URISyntaxException e) {
e.printStackTrace();
}
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(1000).setConnectionRequestTimeout(1000)
.setSocketTimeout(1000).build();
final HttpGet httpGet = new HttpGet(uri.build());
String key = UUID.randomUUID().toString();
String token = MD5Util.getMD5Str(key.toString());
httpGet.setHeader("Auth-System", "dmp");
httpGet.setHeader("Content-Type", "text/plain");
httpGet.setHeader("key", key);
httpGet.setHeader("token", token);
JSONArray jsonArray = new JSONArray();
CloseableHttpResponse response;
try {
response = client.execute(httpGet);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
StringBuilder result = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
JSONObject jsonObject = Constants.String2JSONObject(result.toString());
if (jsonObject.getInteger("code") == 200 && jsonObject.containsKey("data")) {
jsonArray = jsonObject.getJSONArray("data");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
httpGet.abort();
}
return jsonArray;
}
public static void main(String[] args) {
try {
JSONObject json = query("2021-11-22 00:00:00", "2021-11-23 14:00:00",
0, 2, 1637510400L, 1637647200L, "6");
System.out.println(json);
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
}
package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel
import java.net.URI
import scala.collection.JavaConverters.asScalaSetConverter
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
* @package: mobvista.dmp.datasource.rtdmp
* @author: wangjf
* @date: 2020/7/13
* @time: 11:25 上午
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
class RTDmpRequestDaily extends CommonSparkJob with Serializable {
def commandOptions(): Options = {
val options = new Options()
options.addOption("date", true, "date")
options.addOption("output", true, "output")
options.addOption("coalesce", true, "coalesce")
options.addOption("hh", true, "hh")
options
}
override protected def run(args: Array[String]): Int = {
val parser = new BasicParser()
val options = commandOptions()
val commandLine = parser.parse(options, args)
val date = commandLine.getOptionValue("date")
val output = commandLine.getOptionValue("output")
val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
val spark = MobvistaConstant.createSparkSession(s"RTDmpRequestDaily.$date")
val sc = spark.sparkContext
try {
val conf = spark.sparkContext.hadoopConfiguration
conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
spark.udf.register("changeDeviceId", Logic.changeDeviceId _)
spark.udf.register("changeDeviceType", Logic.changeDeviceType _)
val sql =
s"""
|SELECT changeDeviceId(device_type,device_id) device_id, changeDeviceType(device_type) device_type, install_list
| FROM dwh.dmp_install_list WHERE dt = '${date}' AND business = '14days'
|""".stripMargin
// 默认计算上个小时的数据
val update_time_start = DateUtil.format(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss")
val update_time_end = DateUtil.format(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss")
val audience_date_utime_start = DateUtil.parse(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
val audience_date_utime_end = DateUtil.parse(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
val json: JSONObject =
RTDmpServer.query(update_time_start, update_time_end, 0,
2, audience_date_utime_start, audience_date_utime_end, "6")
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
val map = new mutable.HashMap[Integer, mutable.HashSet[String]]()
for (id: String <- json.keySet().toArray()) {
val pkgSet = json.getJSONObject(id).getString("installed_package_name").split(",", -1)
map.put(Integer.parseInt(id), pkgSet)
}
val rdd = spark.sql(sql).rdd.map(row => {
val deviceId = row.getAs[String]("device_id")
val deviceType = row.getAs[String]("device_type")
val install_list = MobvistaConstant.String2JSONObject(row.getAs[String]("install_list")).keySet()
val array = new ArrayBuffer[(String, String, Integer)]()
for (key <- map.keySet) {
if (map(key).intersect(install_list.asScala).size > 0) {
array += (deviceId, deviceType, key)
}
}
array
}).flatMap(l => l).persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.repartition(coalesce).map(t => {
(new Text(s"$output/${t._3}/${t._2}"), new Text(t._1))
}).saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
val mRdd = rdd.map(r => {
(r._3, r._2)
}).distinct(10)
.cache()
val pkgTypeCount = mRdd.countByKey()
val portalMap = mRdd.collectAsMap()
portalMap.foreach(m => {
val updateJsonArray = new JSONArray()
val jsonObject = new JSONObject()
jsonObject.put("id", m._1)
if (pkgTypeCount(m._1) == 1) {
jsonObject.put("s3_path", s"$output/${m._1}/${m._2}/")
} else {
jsonObject.put("s3_path", s"$output/${m._1}/*/")
}
updateJsonArray.add(jsonObject)
ServerUtil.update(updateJsonArray)
})
} finally {
if (sc != null) {
sc.stop()
}
if (spark != null) {
spark.stop()
}
}
0
}
}
object RTDmpRequestDaily {
def main(args: Array[String]): Unit = {
new RTDmpRequestDaily().run(args)
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment