Commit 096777fe by wang-jinfeng

optimize dmp

parent c2b77e2e
type=command
dependencies=mapping_filter
command=echo "MappingServer BackFlow Success !!!"
@@ -12,28 +12,27 @@ schedule_time=$(date +"%Y-%m-%d %H:%M:%S" -d "${hour} hour $ScheduleTime")
final_schedule_time=$(date +%s -d "$schedule_time")
currtime=$(date '+%s')
while true; do
    if [ ${currtime} -gt ${final_schedule_time} ]; then
        break
    fi
    sleep 300
    currtime=$(date '+%s')
done
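The loop above gates the rest of the pipeline: it re-reads the clock every five minutes until the wall clock passes final_schedule_time. A minimal sketch of the same gate in Scala, assuming an epoch-seconds deadline:

object ScheduleGate {
  // Same poll-every-300-seconds gate as the shell loop above (sketch only).
  def waitUntil(deadlineEpochSec: Long): Unit = {
    while (System.currentTimeMillis() / 1000 <= deadlineEpochSec) {
      Thread.sleep(300 * 1000L) // sleep 300 s, then re-check the clock
    }
  }
}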
# BackFlow backup start date
backflow_start_time=$(date +%Y%m%d -d "-1 day $ScheduleTime")
# start-of-backup timestamp * 1000000: writeTime in Cassandra is 16 digits (microseconds)
writetime_start=$(expr $(date +%s -d "$backflow_start_time") \* 1000000)
# BackFlow backup end date
backflow_end_time=$(date +%Y%m%d -d "$ScheduleTime")
# end-of-backup timestamp * 1000000
writetime_end=$(expr $(date +%s -d "$backflow_end_time") \* 1000000)
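For concreteness, a sketch of the writetime arithmetic with a hypothetical date (the script derives the date from ScheduleTime, and GNU date uses the machine's local timezone where this sketch uses UTC):

import java.time.{LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter

object WritetimeSketch {
  def main(args: Array[String]): Unit = {
    val day = LocalDate.parse("20210601", DateTimeFormatter.BASIC_ISO_DATE)
    // epoch seconds (10 digits) * 1,000,000 = microseconds (16 digits),
    // the resolution Cassandra uses for writeTime
    val writetimeStart = day.atStartOfDay(ZoneOffset.UTC).toEpochSecond * 1000000L
    println(writetimeStart) // 1622505600000000
  }
}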
# package name
system="mapping"
@@ -49,6 +48,12 @@ value_column="devid_value"
date_path=$(date +"%Y/%m/%d" -d "-1 day ${ScheduleTime}")
date=$(date +%Y%m%d -d "-1 day ${ScheduleTime}")
year=${date:0:4}
month=${date:4:2}
day=${date:6:2}
# backup S3 path
output="${BACKFLOW_OUTPUT}/${keyspace}/${table}/${date_path}/${region}/"
@@ -67,3 +72,9 @@ spark-submit --class mobvista.dmp.datasource.backflow.BackFlow \
if [[ $? -ne 0 ]]; then
exit 255
fi
common_mount_partition "default" "mapping_dump_table_shulun" "year='${year}', month='${month}', day='${day}', region='${region}'" "${output}"
if [[ $? -ne 0 ]]; then
exit 255
fi
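common_mount_partition comes from dmp_env.sh and is not shown in this commit; judging by its arguments it registers the freshly written S3 directory as a Hive partition. A hypothetical equivalent in Spark SQL, with illustrative names and values only:

import org.apache.spark.sql.SparkSession

object MountPartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val output = "s3://bucket/mapping/mapping_server/2021/06/01/vg/" // hypothetical
    spark.sql(
      s"""ALTER TABLE default.mapping_dump_table_shulun
         |ADD IF NOT EXISTS PARTITION (year='2021', month='06', day='01', region='vg')
         |LOCATION '$output'""".stripMargin)
    spark.stop()
  }
}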
type=command
dependencies=mapping_aliyun,mapping_fk,mapping_se,mapping_sg
command=bash -x mapping_filter.sh
#!/bin/bash
source ../../dmp_env.sh
date_path=$(date +"%Y/%m/%d" -d "-1 day ${ScheduleTime}")
date=$(date +%Y%m%d -d "-1 day ${ScheduleTime}")
# keyspace name
keyspace="mapping"
# table name
table="mapping_server"
output="${BACKFLOW_OUTPUT}/${keyspace}/${table}_filter/${date_path}"
spark-submit --class mobvista.dmp.datasource.backflow.BackFlowFilter \
--name "BackFlowFilter.${keyspace}.${table}" \
--conf spark.sql.shuffle.partitions=2000 \
--conf spark.default.parallelism=2000 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \
--master yarn --deploy-mode cluster \
--executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 20 \
../.././DMP.jar \
-output ${output} -date ${date}
if [[ $? -ne 0 ]]; then
exit 255
fi
@@ -22,7 +22,7 @@ DSP_REQ_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/$date_path/dsp_req"
CLEVER_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/$date_path/clever"
BYTEDANCE_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/$date_path/bytedance"
FACEBOOK_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/$date_path/facebook"
# JOYPAC_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/$date_path/joypacios"
MP_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/$date_path/mp"
check_await ${SSS_INSTALL_LIST_PATH}/_SUCCESS
@@ -34,7 +34,7 @@ check_await ${DSP_REQ_INSTALL_LIST_PATH}/_SUCCESS
check_await ${CLEVER_INSTALL_LIST_PATH}/_SUCCESS
check_await ${BYTEDANCE_INSTALL_LIST_PATH}/_SUCCESS
check_await ${FACEBOOK_INSTALL_LIST_PATH}/_SUCCESS
# check_await ${JOYPAC_INSTALL_LIST_PATH}/_SUCCESS
check_await ${MP_INSTALL_LIST_PATH}/_SUCCESS
BUSINESS="14days"
...
type=command
command=sh -x resource_monitor_ck.sh
#!/bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author :wangjf
# @revision:2021-05-28 15:31:20
# # # # # # # # # # # # # # # # # # # # # #
source ../dmp_env.sh
today=${ScheduleTime:-$1}
date=$(date +"%Y%m%d" -d "-1 hour $today")
java -cp ../${JAR} mobvista.dmp.datasource.monitor.ResourceMonitor ${date}
if [[ $? -ne 0 ]]; then
exit 255
fi
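The -1 hour offset means a run scheduled just after midnight still reports on the previous day. The same date math in Scala, with a hypothetical ScheduleTime:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object MonitorDate {
  def main(args: Array[String]): Unit = {
    val in  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val out = DateTimeFormatter.ofPattern("yyyyMMdd")
    // A run shortly after midnight still targets the previous day:
    val part = LocalDateTime.parse("2021-05-28 00:30:00", in).minusHours(1).format(out)
    println(part) // 20210527, the partition the monitor rebuilds
  }
}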
@@ -67,8 +67,6 @@ public class ClickHouseJdbc {
properties.setSocketTimeout(timeout);
properties.setConnectionTimeout(timeout);
// Random random = new Random();
// int shard = random.nextInt(3);
int shard = times % 3;
String[] ips = set_values[shard].split(":");
ClickHouseDataSource dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties);
@@ -77,7 +75,6 @@ public class ClickHouseJdbc {
connection = dataSource.getConnection();
} catch (ClickHouseException e) {
Thread.sleep(100);
// ipId = random.nextInt(3);
ips = set_values[shard].split(":");
dataSource = new ClickHouseDataSource(url.replace("host", ips[new Random().nextInt(2)]), properties);
connection = dataSource.getConnection();
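The replacement logic is deterministic across shards and random across replicas: times % 3 pins one of the three shards for this attempt, then a replica is drawn at random from that shard's "host1:host2" pair (two replicas per shard, as the nextInt(2) implies). The same selection in Scala terms:

import scala.util.Random

object PickReplica {
  // setValues holds one "replica1:replica2" entry per shard (three shards).
  def pick(setValues: Array[String], times: Int): String = {
    val shard = times % 3                 // deterministic shard per attempt
    val ips = setValues(shard).split(":") // the shard's two replicas
    ips(Random.nextInt(2))                // random replica, as in the Java code
  }
}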
...
package mobvista.dmp.datasource.monitor;
import ch.qos.logback.core.joran.spi.JoranException;
import mobvista.dmp.common.ClickHouseJdbc;
import mobvista.dmp.util.DateUtil;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHousePreparedStatement;
import ru.yandex.clickhouse.except.ClickHouseException;
import java.sql.SQLException;
import java.util.Random;
/**
* @package: mobvista.dmp.datasource.monitor
* @author: wangjf
* @date: 2019-08-31
* @time: 07:49
* @email: jinfeng.wang@mobvista.com
* @phone: 152-1062-7698
*/
public class ResourceMonitor {
public static void main(String[] args) throws JoranException {
String date = "";
String part = "";
if (args.length >= 1) {
part = args[0];
date = DateUtil.format(DateUtil.parse(part, "yyyyMMdd"), "yyyy-MM-dd");
} else {
System.exit(1);
}
long start = System.currentTimeMillis();
Random random = new Random();
int selectIp = random.nextInt(3);
ClickHouseConnection connection = null;
ClickHousePreparedStatement preparedStatement = null;
ClickHousePreparedStatement preparedStatementDrop = null;
try {
try {
connection = ClickHouseJdbc.connectionByTime(selectIp);
} catch (ClickHouseException e) {
System.exit(255);
}
assert connection != null;
// drop partition
preparedStatementDrop = (ClickHousePreparedStatement) connection.prepareStatement(dropPartitionSql(part));
preparedStatementDrop.execute();
Thread.sleep(120000);
// insert data
preparedStatement = (ClickHousePreparedStatement) connection.prepareStatement(insertDataSql(date));
preparedStatement.execute();
} catch (SQLException | InterruptedException e) {
System.exit(255);
} finally {
try {
// guard against failures before assignment; close statements before the connection
if (preparedStatement != null) preparedStatement.close();
if (preparedStatementDrop != null) preparedStatementDrop.close();
if (connection != null) connection.close();
System.exit(0);
} catch (SQLException throwables) {
System.exit(255);
}
}
}
private static String insertDataSql(String date) {
String insertSql = "INSERT INTO dataplatform_job_meta.yarn_application_metric_all(application_time, master_name, master_address, user_name,\n" +
" queue_name, application_id,\n" +
" application_name, application_type, application_status,\n" +
" application_core, application_memory,\n" +
" application_start_time, application_stop_time,\n" +
" application_elapsed_time, application_read_byte,\n" +
" application_write_byte, application_input_path,\n" +
" application_output_path, email, direct_leader, department)\n" +
"SELECT toUInt64(application_time) application_time,\n" +
" master_name,\n" +
" master_address,\n" +
" user_name,\n" +
" queue_name,\n" +
" toUInt64(replaceAll(replaceAll(application_id, 'application', ''), '_', '')) application_id,\n" +
" application_name,\n" +
" application_type,\n" +
" application_status,\n" +
" application_core,\n" +
" application_memory,\n" +
" application_start_time,\n" +
" application_stop_time,\n" +
" application_elapsed_time,\n" +
" application_read_byte,\n" +
" application_write_byte,\n" +
" application_input_path,\n" +
" application_output_path,\n" +
" email,\n" +
" direct_leader,\n" +
" department\n" +
"FROM dataplatform_job_meta.yarn_application_msg\n" +
"WHERE toDate(application_time / 1000) = '@date'";
return insertSql.replace("@date", date);
}
private static String dropPartitionSql(String part) {
String dropSql = "ALTER TABLE dataplatform_job_meta.yarn_application_metric ON CLUSTER cluster_1st DROP PARTITION @part";
return dropSql.replace("@part", part);
}
}
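Two details worth noting in this job: it drops the partition from yarn_application_metric ON CLUSTER (presumably the local table behind the Distributed yarn_application_metric_all it inserts through), and the drop-then-insert order makes re-runs idempotent, since the day's rows are removed before being rebuilt. A toy JDBC sketch of that pattern, with a hypothetical DSN and table rather than the production schema:

import java.sql.DriverManager

object RebuildPartition {
  def main(args: Array[String]): Unit = {
    // Hypothetical local ClickHouse; the real job picks a shard/replica first.
    val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default")
    try {
      val stmt = conn.createStatement()
      stmt.execute("ALTER TABLE metrics DROP PARTITION '20210528'")
      Thread.sleep(120000) // as above: give the distributed drop time to settle
      stmt.execute(
        "INSERT INTO metrics SELECT * FROM metrics_staging WHERE toDate(ts) = '2021-05-28'")
    } finally conn.close()
  }
}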
@@ -17,6 +17,7 @@ http.server.appSecret=fe7d464abab8c4e57ea426916b974f22
# http.server.ip=192.168.17.124,192.168.17.123,192.168.17.122,192.168.17.145,192.168.17.146,192.168.17.147
http.private.server.ip=ip-172-31-20-35.ec2.internal:ip-172-31-29-16.ec2.internal,ip-172-31-31-111.ec2.internal:ip-172-31-23-17.ec2.internal,ip-172-31-24-21.ec2.internal:ip-172-31-18-160.ec2.internal
http.private.host.server.ip=172.31.20.35:172.31.29.16,172.31.31.111:172.31.23.17,172.31.24.21:172.31.18.160
# http.private.host.server.ip=107.21.133.155:3.84.137.245,34.228.113.34:34.230.17.80,52.91.138.142:54.164.57.219
http.server.ip=107.21.133.155:3.84.137.245,34.228.113.34:34.230.17.80,52.91.138.142:54.164.57.219
http.local.server.ip=localhost:127.0.0.1,localhost:127.0.0.1,localhost:127.0.0.1
dsp.taobao.server.url=https://bombonera-dsp.taobao.com/cpm/ask
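Each comma-separated entry in the server-ip lists above is a "replica1:replica2" pair, one pair per shard, which appears to be the shape ClickHouseJdbc expects when it indexes set_values[shard] and splits on ':'. A parsing sketch:

object ParseShards {
  def main(args: Array[String]): Unit = {
    val raw = "172.31.20.35:172.31.29.16,172.31.31.111:172.31.23.17,172.31.24.21:172.31.18.160"
    val shards: Array[Array[String]] = raw.split(",").map(_.split(":"))
    println(shards.length)    // 3 shards
    println(shards(0).length) // 2 replicas each
  }
}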
@@ -116,4 +117,4 @@ tencent.package_name=com.tencent.news_bes,com.tencent.news_bes_7,com.tencent.new
youku_acquisition.package_name=com.youku.foracquisition_imei,com.youku.foracquisition_oaid
rtdmp.stop.audience=433,406,405,407,390,395,389,123,238,388,1133,1134,1135
package mobvista.dmp.datasource.backflow
/**
* @package: mobvista.dmp.datasource.backflow
* @author: wangjf
* @date: 2021/6/1
* @time: 5:47 PM
* @email: jinfeng.wang@mobvista.com
*/
object BackFlowConstant {
val mapping_sql =
"""
|SELECT region, devid_key, devid_type, devid_value
| FROM
| (SELECT region, devid_key, devid_type, devid_value1 AS devid_value
| FROM
| (SELECT t1.region as region, devid_key, devid_type, t1.devid_value AS devid_value1, t2.devid_value AS devid_value2
| FROM
| (SELECT region, devid_key, devid_type, devid_value
| FROM default.mapping_dump_table_shulun
| WHERE concat(year,month,day) = '@dt'
| ) t1
| LEFT OUTER JOIN
| (SELECT region, devid_value
| FROM
| (SELECT region, devid_value, count(1) as num
| FROM default.mapping_dump_table_shulun
| WHERE concat(year,month,day) = '@dt'
| GROUP BY region, devid_value
| ) tmp
| WHERE num > 100
| ) t2
| ON t1.region = t2.region AND t1.devid_value = t2.devid_value
| ) t3
| WHERE devid_value2 IS NOT NULL
| UNION ALL
| SELECT region, devid_key, devid_type, devid_value
| FROM default.mapping_dump_table_shulun
| WHERE concat(year,month,day) = '@dt' AND (LENGTH(devid_value) < 16 OR LENGTH(devid_key) < 15)
| ) t
|""".stripMargin
}
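Reading mapping_sql inside out: t2 collects the (region, devid_value) pairs seen more than 100 times in the day's dump, t3 keeps the t1 rows that hit such a pair (devid_value2 IS NOT NULL after the left join), and the UNION ALL adds rows whose ids look truncated (devid_value under 16 characters or devid_key under 15). So the query appears to select the suspicious rows for the filter step to screen out. The same intent in DataFrame form (my reading, not the shipped BackFlowFilter code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object SuspiciousRows {
  // df: region, devid_key, devid_type, devid_value for one day's dump
  def apply(df: DataFrame): DataFrame = {
    val hot = df.groupBy("region", "devid_value").count()
      .filter(col("count") > 100).select("region", "devid_value")
    df.join(hot, Seq("region", "devid_value"))   // values shared by >100 rows
      .unionByName(df.filter(length(col("devid_value")) < 16 ||
        length(col("devid_key")) < 15))          // truncated-looking ids
      .select("region", "devid_key", "devid_type", "devid_value")
  }
}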
package mobvista.dmp.datasource.joypac
import org.apache.spark.sql.types._
import org.apache.spark.sql._
/**
* Liu Kai 2019-02-18 15:20
* Ingest joypc_sdk fluentd data into the etl_joypc_sdk_daily table
@@ -8,7 +10,7 @@ import org.apache.spark.sql._
object JoypcSdkDaily extends Serializable {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate()
val loadTime = spark.conf.get("spark.app.loadTime")
var year = loadTime.substring(0, 4)
@@ -22,21 +24,36 @@ object JoypcSdkDaily extends Serializable {
val filter_rdd = log_rdd.filter(_.length != 1).map(p => {
val etl_json = JoypcSdkTools.getEtlJSON(p)
val id = etl_json.get("id");
val idfa = etl_json.get("idfa");;
val app_version = etl_json.get("app_version");;
val brand = etl_json.get("brand");;
val network_type = etl_json.get("network_type");;
val package_name = etl_json.get("package_name");;
val platform = etl_json.get("platform");;
val language = etl_json.get("language");;
val business_name = etl_json.get("business_name");;
val apps_info = etl_json.get("apps_info");;
val business_pass = etl_json.get("business_pass");;
val os_version = etl_json.get("os_version");;
val app_version_code = etl_json.get("app_version_code");;
val model = etl_json.get("model");;
val time_zone = etl_json.get("time_zone");;
val time = etl_json.get("time");;
val idfa = etl_json.get("idfa");
;
val app_version = etl_json.get("app_version");
;
val brand = etl_json.get("brand");
;
val network_type = etl_json.get("network_type");
;
val package_name = etl_json.get("package_name");
;
val platform = etl_json.get("platform");
;
val language = etl_json.get("language");
;
val business_name = etl_json.get("business_name");
;
val apps_info = etl_json.get("apps_info");
;
val business_pass = etl_json.get("business_pass");
;
val os_version = etl_json.get("os_version");
;
val app_version_code = etl_json.get("app_version_code");
;
val model = etl_json.get("model");
;
val time_zone = etl_json.get("time_zone");
;
val time = etl_json.get("time");
;
Row(
business_name,
business_pass,
@@ -55,16 +72,15 @@ object JoypcSdkDaily extends Serializable {
apps_info,
time)
})
.filter { x =>
val business_name = x.getString(0)
val business_pass = x.getString(1)
business_name.equals("joypac_ios") && business_pass.equals("joypac_ios-sdk0121")
}
val cal_rdd = filter_rdd.map { p => parseCalData(p) }
val joypc_schema = StructType(Array(
StructField("id", StringType),
StructField("idfa", StringType),
@@ -90,9 +106,9 @@
}
spark.sparkContext.stop()
}
def parseCalData(row: Row) = {
Row(
row(2),
row(3),
...