Commit 2c3dc69a by WangJinfeng

fix dmp bug

parent b34b1db5
@@ -28,12 +28,12 @@ EXPIRE_OUTPUT_PATH="${DMP_INSTALL_LIST}/${expire_date_path}/${business}"
 spark-submit --class mobvista.dmp.common.InstallListLogic \
 --name "DmpInstallList.${business}.${LOG_TIME}" \
---conf spark.sql.shuffle.partitions=1000 \
---conf spark.default.parallelism=1000 \
+--conf spark.sql.shuffle.partitions=4000 \
+--conf spark.default.parallelism=4000 \
 --conf spark.kryoserializer.buffer.max=256m \
 --conf spark.executor.extraJavaOptions="-XX:+UseG1GC " \
---master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 4 --num-executors 50 \
-../../${JAR} -date ${LOG_TIME} -business ${business} -output ${OUTPUT} -coalesce 200
+--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 4g --executor-cores 5 --num-executors 100 \
+../../${JAR} -date ${LOG_TIME} -business ${business} -output ${OUTPUT} -coalesce 2000
 if [[ $? -ne 0 ]];then
 exit 255
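The two raised settings govern different shuffle paths: spark.sql.shuffle.partitions controls post-shuffle partition counts for DataFrame/SQL joins and aggregations, while spark.default.parallelism does the same for raw RDD operations. A minimal sketch of the equivalent programmatic configuration, assuming the resource numbers from the flags above:

    import org.apache.spark.sql.SparkSession

    // Sketch only; values mirror the spark-submit flags above.
    val spark = SparkSession.builder()
      .appName("DmpInstallList")
      .config("spark.sql.shuffle.partitions", "4000") // SQL/DataFrame shuffles
      .config("spark.default.parallelism", "4000")    // RDD shuffles
      .getOrCreate()

    // 100 executors x 5 cores = 500 concurrent task slots, so 4000 partitions
    // run in roughly 8 waves of smaller, easier-to-retry tasks.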
@@ -20,12 +20,12 @@ check_await "$OLD_INPUT_PATH/_SUCCESS"
 hadoop fs -rm -r "$OUTPUT_PATH/"
 spark-submit --class mobvista.dmp.datasource.reyun.ReyunInstallList \
---conf spark.yarn.executor.memoryOverhead=2048 \
---conf spark.network.timeout=720s \
---conf spark.default.parallelism=2000 \
---conf spark.sql.shuffle.partitions=2000 \
---master yarn --deploy-mode cluster --name ReyunInstallList --executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 30 \
-../../${JAR} -input ${INPUT_PATH} -oldInput ${OLD_INPUT_PATH} -output ${OUTPUT_PATH} -date ${dt} -parallelism 500 -coalesce 500
+--conf spark.yarn.executor.memoryOverhead=2048 \
+--conf spark.network.timeout=720s \
+--conf spark.default.parallelism=2000 \
+--conf spark.sql.shuffle.partitions=2000 \
+--master yarn --deploy-mode cluster --name ReyunInstallList --executor-memory 8g --driver-memory 4g --executor-cores 3 --num-executors 100 \
+../../${JAR} -input ${INPUT_PATH} -oldInput ${OLD_INPUT_PATH} -output ${OUTPUT_PATH} -date ${dt} -parallelism 500 -coalesce 500
 if [ $? -ne 0 ];then
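spark.yarn.executor.memoryOverhead is off-heap headroom that YARN adds on top of the JVM heap when sizing each executor container; a quick sketch of the per-container request under the new flags (values taken from the command line above):

    // Sketch only: YARN additionally rounds the request up to its
    // minimum allocation increment.
    def yarnContainerRequestMB(executorMemoryMB: Int, overheadMB: Int): Int =
      executorMemoryMB + overheadMB

    println(yarnContainerRequestMB(8 * 1024, 2048)) // 10240 MB each, x100 executors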
@@ -79,7 +79,7 @@ where year='$old_year'
 and update_time<='$SEVEN_DAYS_AGO'
 and update_time>='$FOURTEEN_DAYS_AGO'
 ) t
-GROUP BY t.package_name, t.platform LIMIT 100000
+GROUP BY t.package_name, t.platform LIMIT 5000
 " | grep -v '^[0-9]\{5,7\}\s\+android' > to_crawler_package_name.txt
 if [ $? -ne 0 ];then
@@ -89,7 +89,7 @@ fi
 crawl_app_info(){
-java -Xms6144m -Xmx6144m -cp ../${JAR} mobvista.dmp.datasource.apptag.crawler.AppInfoCrawler -p \
+java -Xms8192m -Xmx8192m -cp ../${JAR} mobvista.dmp.datasource.apptag.crawler.AppInfoCrawler -p \
 -f to_crawler_package_name.txt \
 -i ios.txt \
 -a adr.txt \
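Pinning -Xms to -Xmx makes the JVM claim its full heap at startup, so a long crawl never pauses to grow the heap mid-run. One way to confirm the heap a process was actually granted (standard Runtime API):

    // Prints roughly 8192 under -Xmx8192m (minus a small JVM-internal reservation).
    println(s"max heap MB: ${Runtime.getRuntime.maxMemory / (1024 * 1024)}")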
@@ -705,8 +705,8 @@ matchBundlePackageV2() {
 spark-submit --class ${class} \
 --conf spark.network.timeout=720s \
 --conf spark.yarn.executor.memoryOverhead=2048 \
---conf spark.sql.shuffle.partitions=4000 \
---conf spark.default.parallelism=1000 \
+--conf spark.sql.shuffle.partitions=5000 \
+--conf spark.default.parallelism=5000 \
 --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
 --conf spark.sql.files.maxPartitionBytes=134217728 \
 --conf spark.sql.adaptive.enabled=true \
@@ -715,8 +715,8 @@ matchBundlePackageV2() {
 --conf spark.storage.memoryFraction=0.4 \
 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
 --master yarn --deploy-mode cluster --name "BundleMatchMain.${business}" \
---executor-memory 10g --driver-memory 4g --executor-cores 5 --num-executors 60 \
-${jar} -business ${business} -date ${date} -input $input_path -output $output_path -bundlePkgPath $bundle_pkg_path -unmatchOutputPath $unmatch_output_path -coalesce 1000
+--executor-memory 12g --driver-memory 6g --executor-cores 5 --num-executors 100 \
+${jar} -business ${business} -date ${date} -input $input_path -output $output_path -bundlePkgPath $bundle_pkg_path -unmatchOutputPath $unmatch_output_path -coalesce 2000
 "
 hadoop fs -test -e ${unmatch_input_path}
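Since this job also sets spark.sql.adaptive.enabled=true, the 5000 above is only the initial partition count: adaptive query execution coalesces small post-shuffle partitions at runtime, which makes over-provisioning the static number cheap. A sketch of the combination, mirroring the flags above:

    import org.apache.spark.sql.SparkSession

    // With AQE on, shuffle.partitions acts as an upper bound; Spark merges
    // small partitions after each shuffle based on observed data sizes.
    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "5000")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()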
@@ -8,7 +8,7 @@ yes_bef1_slack=$(date +%Y/%m/%d -d "-1 day $ScheduleTime")
 yes_bef2_slack=$(date +%Y/%m/%d -d "-2 day $ScheduleTime")
 check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/adn_request_sdk/_SUCCESS
-check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/reyun/_SUCCESS
+# check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/reyun/_SUCCESS
 check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/dsp_req/_SUCCESS
 check_await ${DMP_INSTALL_LIST}/${yes_bef2_slack}/other/_SUCCESS
 check_await ${DMP_INSTALL_LIST}/${yes_bef1_slack}/adn_install/_SUCCESS
@@ -2,7 +2,9 @@ package mobvista.dmp.datasource.setting;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Maps;
+import mobvista.dmp.common.Constants;
 import mobvista.dmp.util.MRUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -21,7 +23,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Map;
 import java.util.regex.Pattern;
-
 /**
 * author: houying
@@ -97,8 +98,10 @@ public class NginxSettingMR extends Configured implements Tool {
 if (split.length < 2) {
 return;
 }
-JSONObject jsonObject = JSONObject.parseObject(split[1]);
-uri = jsonObject.getString("req_info");
+JSONObject jsonObject = Constants.String2JSONObject(split[1]);
+if (jsonObject.containsKey("req_info")) {
+uri = jsonObject.getString("req_info");
+}
 }else{
 int start = line.indexOf("GET ");
 if (start < 0) {
@@ -115,7 +118,7 @@
 }
 uri = line.substring(start, end);
 }
-if (!uri.startsWith("/setting?") && !uri.startsWith("/appwall/setting?")) {
+if (StringUtils.isBlank(uri) || (!uri.startsWith("/setting?") && !uri.startsWith("/appwall/setting?"))) {
 return;
 }
 uri = uri.substring(uri.indexOf("?") + 1);
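Constants.String2JSONObject (imported above) presumably wraps fastjson's parseObject so a malformed log line yields an empty object rather than an exception; combined with the containsKey and isBlank guards, the mapper now skips bad records instead of failing the task. A minimal sketch of that defensive pattern, where the helper below is illustrative, not the repo's actual implementation:

    import com.alibaba.fastjson.JSONObject

    // Hypothetical stand-in for Constants.String2JSONObject: never throws,
    // returns an empty JSONObject for null or garbled input.
    def string2JsonObject(s: String): JSONObject =
      try {
        Option(JSONObject.parseObject(s)).getOrElse(new JSONObject())
      } catch {
        case _: Exception => new JSONObject()
      }

    val obj = string2JsonObject("""{"req_info":"/setting?a=1"}""")
    val uri = if (obj.containsKey("req_info")) obj.getString("req_info") else ""
    // Blank or non-setting URIs are then rejected before parameter parsing.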
@@ -107,7 +107,7 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable {
 var sql: String =
 """
 |SELECT device_id, MAX(device_type) device_type, MAX(platform) platform, MAX(country) country, merge(COLLECT_SET(install_list)) install_list,
-| udf_mergeExtData(COLLECT_SET(CONCAT_WS('#', ext_data, update_date, business))) ext_data, MAX(update_date) update_date
+| udf_mergeExtData(COLLECT_SET(CONCAT_WS('#', ext_data, update_date, business))) ext_data, CONCAT_WS(',',COLLECT_SET(business)) merge_bus, MAX(update_date) update_date
 | FROM
 | (SELECT LOWER(device_id) device_id,device_type,platform,country,install_list,ext_data,update_date,business
 | FROM dwh.dmp_install_list d LEFT SEMI JOIN active_dev a ON LOWER(d.device_id) = a.device_id
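The new merge_bus column folds each device's distinct business tags into a single comma-joined string. A self-contained illustration of the COLLECT_SET + CONCAT_WS pattern on toy data (hypothetical values, not the production table):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").appName("merge_bus demo").getOrCreate()
    import spark.implicits._

    Seq(("dev1", "adn_install"), ("dev1", "dsp_req"), ("dev1", "adn_install"))
      .toDF("device_id", "business")
      .createOrReplaceTempView("t")

    // COLLECT_SET dedupes within each group; CONCAT_WS joins the set into one string.
    spark.sql(
      "SELECT device_id, CONCAT_WS(',', COLLECT_SET(business)) merge_bus FROM t GROUP BY device_id"
    ).show(false) // dev1 -> "adn_install,dsp_req" (set order is not guaranteed)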
@@ -29,7 +29,7 @@ class BundleMatchMain extends BundleMatchNewJob {
 */
 def processData(business: String, date: String, input: String, output: String, oldUnmatch: String, unMatchOutput: String, spark: SparkSession,
 bundleBC: Broadcast[scala.collection.Map[String, String]], coalesce: Int): DataFrame = {
-val expire_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -365), "yyyy-MM-dd")
+val expire_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -30), "yyyy-MM-dd")
 bmap = bundleBC
 import spark.implicits._
 val processedDf: DataFrame =
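Assuming DateUtil.getDay shifts a yyyyMMdd date by a day offset (its exact contract isn't shown in this diff), the new cutoff is simply thirty days before the run date. An equivalent computation with the standard java.time API:

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    val date = "20211015" // example run date in yyyyMMdd
    val expireDate = LocalDate
      .parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))
      .minusDays(30)
      .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    // expireDate == "2021-09-15"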