Commit d26c25ff by WangJinfeng

update dmp install_list storage

parent 59a91aaf
......@@ -27,8 +27,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
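# GNU date resolves "-7 day $LOG_TIME" to LOG_TIME minus 7 days, so EXPIRE_OUTPUT_PATH
# names the week-old partition that is presumably dropped further down in the script.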
# --conf spark.shuffle.memoryFraction=0.4 \
......
......@@ -27,8 +27,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
# --conf spark.shuffle.memoryFraction=0.4 \
......
......@@ -25,16 +25,16 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
--name "DmpInstallList.${business}.${LOG_TIME}" \
--conf spark.sql.shuffle.partitions=400 \
--conf spark.default.parallelism=400 \
--conf spark.sql.shuffle.partitions=1000 \
--conf spark.default.parallelism=1000 \
--conf spark.kryoserializer.buffer.max=256m \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 4 --num-executors 5 \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 4 --num-executors 20 \
../../${JAR} -date ${LOG_TIME} -business ${business} -output ${OUTPUT} -coalesce 100
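# Sizing note: 20 executors x 4 cores gives 80 concurrent task slots, so 1000 shuffle
# partitions finish in roughly 13 waves; "-coalesce 100" presumably compacts the final
# output to about 100 files.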
if [[ $? -ne 0 ]];then
......
......@@ -25,16 +25,19 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
--name "DmpInstallList.${business}.${LOG_TIME}" \
--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--conf spark.kryoserializer.buffer.max=256m \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 6g --executor-cores 5 --num-executors 100 \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 3 --num-executors 200 \
../../${JAR} -date ${LOG_TIME} -business ${business} -output ${OUTPUT} -coalesce 2000
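# 536870912 bytes = 512 MB. With spark.sql.adaptive.enabled=true, AQE coalesces small
# shuffle partitions toward the 512 MB advisory size at runtime, so the static 5000-partition
# setting acts as an upper bound rather than a fixed count; spark.sql.files.maxPartitionBytes
# caps input splits at 512 MB on the read side.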
if [[ $? -ne 0 ]];then
......
......@@ -25,8 +25,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......
......@@ -22,8 +22,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......
......@@ -25,8 +25,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......
......@@ -25,8 +25,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......
......@@ -26,8 +26,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......
......@@ -27,8 +27,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......
......@@ -47,8 +47,8 @@ OUTPUT_PATH="${DMP_INSTALL_LIST}/${date_path}/${BUSINESS}"
mount_partition "dmp_install_list" "dt='$date', business='$BUSINESS'" "$OUTPUT_PATH"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${BUSINESS}"
spark-submit --class mobvista.dmp.common.InstallListDailyV2 \
......
......@@ -711,9 +711,8 @@ matchBundlePackageV2() {
spark-submit --class ${class} \
--conf spark.network.timeout=720s \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.sql.shuffle.partitions=5000 \
--conf spark.default.parallelism=5000 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.sql.shuffle.partitions=10000 \
--conf spark.default.parallelism=10000 \
--conf spark.sql.files.maxPartitionBytes=134217728 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728 \
......
......@@ -26,8 +26,8 @@ OUTPUT="${DMP_INSTALL_LIST}/${year}/${month}/${day}/${business}"
mount_partition "dmp_install_list" "dt='$LOG_TIME', business='$business'" "$OUTPUT"
expire_date=$(date +%Y%m%d -d "-4 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-4 day ${LOG_TIME}")
expire_date=$(date +%Y%m%d -d "-7 day $LOG_TIME")
expire_date_path=$(date +"%Y/%m/%d" -d "-7 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
spark-submit --class mobvista.dmp.common.InstallListLogic \
......@@ -38,7 +38,7 @@ spark-submit --class mobvista.dmp.common.InstallListLogic \
--conf spark.sql.files.maxPartitionBytes=536870912 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--master yarn --deploy-mode cluster --executor-memory 18g --driver-memory 6g --executor-cores 5 --num-executors 100 \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 3 --num-executors 300 \
../${JAR} -date ${LOG_TIME} -business ${business} -output ${OUTPUT} -coalesce 4000
if [[ $? -ne 0 ]];then
......
package mobvista.dmp.datasource.setting;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.common.CommonReducer;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
......@@ -18,7 +16,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
import java.net.URLDecoder;
import java.util.regex.Pattern;
/**
......@@ -64,7 +62,7 @@ public class AppidPackageDictMR extends CommonMapReduce {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String line = URLDecoder.decode(value.toString().replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
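// The replaceAll above rewrites any '%' not followed by two hexadecimal digits to "%25"
// (a literal percent sign), so URLDecoder.decode does not throw IllegalArgumentException
// on malformed percent-escapes in the raw log line.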
String[] array = line.split(",", 5);
if (array.length < 3) {
CommonMapReduce.setMetrics(context, "DMP", "column_num_error", 1);
......@@ -72,14 +70,14 @@ public class AppidPackageDictMR extends CommonMapReduce {
}
if (array.length == 3 && !"null".equals(array[1]) && appidPtn.matcher(array[0]).matches()
&& platformPtn.matcher(array[2]).matches()) { // original order: appid, package_name, platform
outKey.set(array[0]);
outValue.set(joiner.join("A", array[1], array[2]));
outKey.set(array[0].trim());
outValue.set(joiner.join("A", array[1].trim(), array[2].trim()));
context.write(outKey, outValue);
} else {
if (appidPtn.matcher(array[0]).matches() && platformPtn.matcher(array[1]).matches()
&& !"null".equals(array[3])) {
outKey.set(array[0]);
outValue.set(joiner.join("B", array[3], array[1])); //package_name, platform
outKey.set(array[0].trim());
outValue.set(joiner.join("B", array[3].trim(), array[1].trim())); //package_name, platform
context.write(outKey, outValue);
}
}
......
......@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.Map;
/**
......@@ -91,8 +92,8 @@ public class NginxSettingMR extends Configured implements Tool {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String uri="";
String line = URLDecoder.decode(value.toString().replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
String uri = "";
if(line.startsWith("[") && line.contains("req_info")) {
String[] split = line.split("\t");
if (split.length < 2) {
......@@ -118,7 +119,7 @@ public class NginxSettingMR extends Configured implements Tool {
}
uri = line.substring(start, end);
}
if (StringUtils.isBlank(uri) && !uri.startsWith("/setting?") && !uri.startsWith("/appwall/setting?")) {
if (StringUtils.isBlank(uri) && !uri.startsWith("/setting?") && !uri.startsWith("/appwall/setting?") && !uri.startsWith("/rewardsetting?")) {
return;
}
uri = uri.substring(uri.indexOf("?") + 1);
......
......@@ -2,7 +2,7 @@ package mobvista.dmp.common
import com.alibaba.fastjson.JSON
import mobvista.dmp.common.MobvistaConstant.deviceTypeSet
import mobvista.dmp.util.{DateUtil, MRUtils}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
......@@ -74,7 +74,7 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
}
val ext_data = row.getAs[String]("ext_data")
val install_list = row.getAs("install_list").toString
(MRUtils.JOINER.join(device_id, device_type, platform), (install_list, ext_data, country))
((device_id, device_type, platform), (install_list, ext_data, country))
})
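// Keying by the (device_id, device_type, platform) tuple rather than an MRUtils.JOINER-joined
// string avoids a join/split per record; the fullOuterJoin further down compares the tuple keys directly.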
val last_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -1), "yyyyMMdd")
......@@ -99,30 +99,31 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
val ext_data = row.getAs("ext_data").toString
val update_date = row.getAs("update_date").toString
// new table
(MRUtils.JOINER.join(device_id, device_type, platform), MRUtils.JOINER.join(install_list, ext_data, update_date, country))
((device_id, device_type, platform), (install_list, ext_data, update_date, country))
})
import spark.implicits._
// .mapPartitions(v => new CustomInteratorList(dateTime, v))
val df = if (business.endsWith("unmatch")) {
val df = if (business.equalsIgnoreCase("dsp_req_unmatch")) {
dailyRDD.map(tp => {
val keys = MRUtils.SPLITTER.split(tp._1)
val dailyK = tp._1
// val keys = MRUtils.SPLITTER.split(tp._1)
val pkgs = tp._2._1
val ext_data = tp._2._2
val country = tp._2._3
val updateDate = dateTime
var deviceType = keys(1)
var deviceType = dailyK._2
if (deviceType.equalsIgnoreCase("android_id") || deviceType.equalsIgnoreCase("androidid")) {
deviceType = "androidid"
}
var platform = keys(2)
var platform = dailyK._3
if (platform.equalsIgnoreCase("android") || platform.equalsIgnoreCase("android2") || platform.equalsIgnoreCase("adr")) {
platform = "android"
} else if (platform.equalsIgnoreCase("ios") || platform.equalsIgnoreCase("ios2")) {
platform = "ios"
}
DmpInstallList(keys(0), deviceType, platform, country, pkgs, ext_data, updateDate)
DmpInstallList(dailyK._1, deviceType, platform, country, pkgs, ext_data, updateDate)
})
} else {
dailyRDD.fullOuterJoin(installRDD)
......@@ -136,9 +137,10 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
var country = ""
var updateDate = ""
if (dailyOpt.isEmpty && totalOpt.isDefined) {
val installListDate = MRUtils.SPLITTER.split(totalOpt.get, -1)
pkgs = installListDate(0)
val old_ext_data_json = JSON.parseObject(installListDate(1))
val total = totalOpt.get
// val installListDate = MRUtils.SPLITTER.split(totalOpt.get, -1)
pkgs = total._1
val old_ext_data_json = JSON.parseObject(total._2)
val region_list = if (old_ext_data_json.containsKey("region")) {
JSON.parseArray(old_ext_data_json.getString("region"), classOf[String])
} else {
......@@ -146,8 +148,8 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
}
old_ext_data_json.put("region", new util.HashSet(region_list))
ext_data = old_ext_data_json.toJSONString
updateDate = installListDate(2)
country = installListDate(3)
updateDate = total._3
country = total._4
} else if (dailyOpt.isDefined && totalOpt.isEmpty) {
pkgs = dailyOpt.get._1
ext_data = dailyOpt.get._2
......@@ -157,8 +159,9 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
// Add the most recently active package_name entries first
val installJson = JSON.parseObject(dailyOpt.get._1).asInstanceOf[java.util.Map[String, String]]
// Remove expired install records, sort by install time, and store the most recently active packages first
val array = MRUtils.SPLITTER.split(totalOpt.get, -1)
val installMap = JSON.parseObject(array(0)).asInstanceOf[java.util.Map[String, String]].asScala.retain((k, _) => !installJson.contains(k))
// val array = MRUtils.SPLITTER.split(totalOpt.get, -1)
val total = totalOpt.get
val installMap = JSON.parseObject(total._1).asInstanceOf[java.util.Map[String, String]].asScala.retain((k, _) => !installJson.contains(k))
if (v2_flag) {
// New install-package handling logic for the adn reporting business; it does not affect other business lines' data processing
installJson.keySet().foreach(k => {
......@@ -195,7 +198,7 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
installJson.putAll(installMap.asJava)
}
pkgs = installJson.toString
val old_ext_data_json = JSON.parseObject(array(1))
val old_ext_data_json = JSON.parseObject(total._2)
val daily_ext_data_json = JSON.parseObject(dailyOpt.get._2)
if (daily_ext_data_json.containsKey("dev_tag") && daily_ext_data_json.getInteger("dev_tag") == 1) {
old_ext_data_json.put("dev_tag", daily_ext_data_json.getInteger("dev_tag"))
......@@ -224,22 +227,21 @@ abstract class CommonInstallListOrc extends CommonSparkJob with Serializable {
country = if (StringUtils.isNotBlank(dailyOpt.get._3)) {
dailyOpt.get._3
} else {
array(3)
total._4
}
updateDate = dateTime
}
val keys = MRUtils.SPLITTER.split(key, -1)
var deviceType = keys(1)
var deviceType = key._2
if (deviceType.equalsIgnoreCase("android_id") || deviceType.equalsIgnoreCase("androidid")) {
deviceType = "androidid"
}
var platform = keys(2)
var platform = key._3
if (platform.equalsIgnoreCase("android") || platform.equalsIgnoreCase("android2") || platform.equalsIgnoreCase("adr")) {
platform = "android"
} else if (platform.equalsIgnoreCase("ios") || platform.equalsIgnoreCase("ios2")) {
platform = "ios"
}
DmpInstallList(keys(0), deviceType, platform, country, pkgs, ext_data, updateDate)
DmpInstallList(key._1, deviceType, platform, country, pkgs, ext_data, updateDate)
})
}
......
......@@ -3,6 +3,7 @@ package mobvista.dmp.common
import org.apache.commons.cli.{BasicParser, CommandLine, HelpFormatter, Option, Options}
import org.apache.commons.lang.StringUtils
import java.util.regex.Pattern
import scala.collection.JavaConversions._
/**
......@@ -40,6 +41,11 @@ abstract class CommonSparkJob {
// Date
val datePtn = """^\d{4}-\d{2}-\d{2}"""
val iOSPkgPtn = Pattern.compile("^\\d+$")
val iOSIDPkgPtn = Pattern.compile("^id\\d+$")
val androidPkgPtn = Pattern.compile("^(?=^.{3,255}$)[a-zA-Z0-9_][-a-zA-Z0-9_]{0,62}(\\.[a-zA-Z0-9_][-a-zA-Z0-9_]{0,62})+$")
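// iOSPkgPtn: purely numeric iOS app ids; iOSIDPkgPtn: the same ids with an "id" prefix;
// androidPkgPtn: dotted package-style names, 3-255 chars overall, segments of at most 63 chars, at least one '.'.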
val options = buildOptions()
val commParser = new BasicParser
......
......@@ -4,6 +4,7 @@ import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel
......@@ -518,7 +519,11 @@ class OdsDmpUserInfoDailyV2 extends CommonSparkJob with Serializable {
installJSON.keySet().iterator().foreach(k => {
installStr.append(",").append(k).append("|").append(installJSON.getString(k))
})
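// Each entry above is appended with a leading ",", so substring(1) strips it; when installJSON
// is empty, return "" instead of letting substring(1) throw StringIndexOutOfBoundsException.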
installStr.substring(1)
if (StringUtils.isNotBlank(installStr)) {
installStr.substring(1)
} else {
""
}
}
def parseAge(row: Row): Tuple3[String, Int, Double] = {
......
......@@ -9,8 +9,6 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.net.URI
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* @package: mobvista.dmp.datasource.dm
......@@ -66,7 +64,6 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
.createOrReplaceTempView("t_app_two_tags")
spark.udf.register("str2Json", str2Json _)
spark.udf.register("mergeInstallList", mergeInstallList _)
val new_tag_sql = InterestTagConstant.tag_sql_v2
......@@ -78,29 +75,8 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
packageName = packageName.replace("id", "")
}
(packageName, r.getAs("tags").toString)
// (r.getAs("package_name").toString.toLowerCase(), r.getAs[mutable.WrappedArray[String]]("tag_ids").asJava)
}).collectAsMap())
// newMap.value.foreach(println)
// val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -15), "yyyy-MM-dd")
// val before_date = DateUtil.getDayByString(date, "yyyyMMdd", -1)
// val before_update_date = DateUtil.format(DateUtil.getDay(before_date, "yyyyMMdd", -15), "yyyy-MM-dd")
// val udf_mergeInstallList = udf(mergeInstallList _)
// val udf_mergeExtData = udf(Constant.mergeExtData _)
/*
val input = "s3://mob-emr-test/wangjf/data/install_list_orc/2020/04/23/adn_request_sdk"
val df = spark.read.orc(input).groupBy(lower(col("device_id")).alias("device_id"))
.agg(max(col("device_type")).alias("device_type"),
max(col("platform")).alias("platform"),
udf_mergeInstallList(collect_set(col("install_list"))).alias("install_list"),
udf_mergeExtData(collect_set(concat_ws("#", col("ext_data"), col("update_date"), lit("m")))).alias("ext_data"),
max(col("update_date")).alias("update_date")
)
*/
val rdd = spark.sql(Constant.install_sql.replace("@date", date)).rdd
.map(row => {
val device_id = row.getAs("device_id").toString
......@@ -132,13 +108,6 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
jsonArray.add(json)
}
})
/*
if (jsonArray.size() > 0) {
DmInterestTagV2(device_id, device_type, platform, jsonArray.toString, ext_data, update_date)
} else {
null
}
*/
DmInterestTagV2(device_id = device_id, device_type = device_type, platform = platform, install = install_list.asJava.toString,
tags = jsonArray.toString, ext_data = ext_data, update_date = update_date)
})
......@@ -149,42 +118,6 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
.option("orc.compress", "snappy")
.orc(output)
// .mapPartitions(v => new CustomIterator(v, mapper = mapper, oldMap = app_tag_map, newMap = bMap))
/*
.map(row => {
val device_id = row.getAs("device_id").toString
val device_type = row.getAs("device_type").toString
val platform = row.getAs("platform").toString
val install_list = row.getAs("install_list").toString
val jsonArray = new JsonArray
val jsonNode = mapper.readTree(install_list).elements()
while (jsonNode.hasNext) {
val node = jsonNode.next()
val date = node.path("date").asText()
val package_name =
if (platform.equals("ios") && node.path("package_name").asText().startsWith("id")) {
node.path("package_name").asText().substring(2)
} else {
node.path("package_name").asText()
}
if (app_tag_map.value.keySet.contains(package_name)) {
val tags = app_tag_map.value(package_name)
val json = new JsonObject
json.addProperty("package_name", package_name)
json.addProperty("date", date)
json.add("tag", GsonUtil.String2JsonArray(tags))
jsonArray.add(json)
}
}
val dmInterestTag = if (jsonArray.size() > 0) {
DmInterestTag(device_id, device_type, platform, jsonArray.toString)
} else {
null
}
dmInterestTag
}).filter(_ != null)
*/
} finally {
if (spark != null) {
sc.stop()
......@@ -204,23 +137,6 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
jsonObject.toString
}
def mergeInstallList(installList: mutable.WrappedArray[String]) = {
val installJSONObject = new JSONObject()
installList.iterator.foreach(install => {
val installMap = JSON.parse(install).asInstanceOf[java.util.Map[String, String]].asScala
installMap.foreach(packageInfo => {
if (installJSONObject.containsKey(packageInfo._1)) {
if (installJSONObject.getString(packageInfo._1).compareTo(packageInfo._2) < 0) {
installJSONObject.put(packageInfo._1, packageInfo._2)
}
} else {
installJSONObject.put(packageInfo._1, packageInfo._2)
}
})
})
installJSONObject.toJSONString
}
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("date", true, "[must] date")
......
......@@ -29,7 +29,7 @@ class BundleMatchMain extends BundleMatchNewJob {
*/
def processData(business: String, date: String, input: String, output: String, oldUnmatch: String, unMatchOutput: String, spark: SparkSession,
bundleBC: Broadcast[scala.collection.Map[String, String]], coalesce: Int): DataFrame = {
val expire_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -30), "yyyy-MM-dd")
val expire_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -7), "yyyy-MM-dd")
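// 7-day cutoff: rows in the old unmatch input whose update_date is not after expire_date
// are dropped by the filter below.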
bmap = bundleBC
import spark.implicits._
val processedDf: DataFrame =
......@@ -49,11 +49,13 @@ class BundleMatchMain extends BundleMatchNewJob {
case "adn_request_sdk" =>
val df = spark.read.orc(oldUnmatch)
.filter(r => {
// val packageName = r.getAs[String]("package_name")
r.getAs[String]("update_date").compareTo(expire_date) > 0
r.getAs[String]("update_date").compareTo(expire_date) > 0 && r.getAs[String]("strategy").contains("MNormalAlphaModelRanker") &&
r.getAs[Int]("dev_tag") == 1
}).toDF()
.union(spark.read.orc(input)
.withColumn("package_name", getPkgName(col("platform"), col("package_name")))
.filter(r => {
r.getAs[String]("strategy").contains("MNormalAlphaModelRanker") && r.getAs[Int]("dev_tag") == 1
}).withColumn("package_name", getPkgName(col("platform"), col("package_name")))
).repartition(coalesce)
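// Both the carried-over unmatch rows and the day's new input are limited to rows whose
// strategy contains "MNormalAlphaModelRanker" and whose dev_tag is 1 before matching.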
df.withColumn("flag", checkBundlePkgName(col("platform"), col("package_name")))
.withColumn("package_name", when(checkBundlePkgName(col("platform"), col("package_name")),
......@@ -179,9 +181,13 @@ class BundleMatchMain extends BundleMatchNewJob {
val checkBundlePkgName = udf((platform: String, package_name: String) => {
var packageName = ""
if (platform.equals("ios") && !package_name.matches(packageRegex)) {
val tmp = package_name.replace("id", "")
if (tmp.matches(packageRegex)) {
if (platform.equals("ios") && !iOSPkgPtn.matcher(package_name).matches()) {
val tmp = if (iOSIDPkgPtn.matcher(package_name).matches()) {
package_name.replace("id", "")
} else {
package_name
}
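// replace("id", "") is applied only when the whole value matches id<digits>; values that
// merely contain "id" fall through to the bmap bundle lookup below.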
if (iOSPkgPtn.matcher(tmp).matches()) {
packageName = tmp
} else {
val matchPackage = bmap.value.get(package_name)
......@@ -197,9 +203,13 @@ class BundleMatchMain extends BundleMatchNewJob {
val getBundlePkgName = udf((platform: String, package_name: String) => {
var packageName = package_name
if (platform.equals("ios") && !package_name.matches(packageRegex)) {
val tmp = package_name.replace("id", "")
if (tmp.matches(packageRegex)) {
if (platform.equals("ios") && !iOSPkgPtn.matcher(package_name).matches()) {
val tmp = if (iOSIDPkgPtn.matcher(package_name).matches()) {
package_name.replace("id", "")
} else {
package_name
}
if (iOSPkgPtn.matcher(tmp).matches()) {
packageName = tmp
} else {
val matchPackage = bmap.value.get(package_name)
......
......@@ -23,7 +23,6 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
*/
abstract class BundleMatchNewJob extends CommonSparkJob with Serializable {
val dataSplit = "\t"
val packageRegex = "^[0-9]+$"
def run(args: Array[String]): Int = {
val options = buildOptions()
......