Commit fcb21923 by WangJinfeng

update rtdmp_merge_ck.sh and user_info_ck.sh

parent 118555b8
...@@ -26,14 +26,14 @@ table="ods_user_info" ...@@ -26,14 +26,14 @@ table="ods_user_info"
# --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \ # --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
spark-submit --class mobvista.dmp.clickhouse.feature.UserInfo \ spark-submit --class mobvista.dmp.clickhouse.feature.UserInfo \
--name "UserInfo_wangjf_${date}" \ --name "UserInfo_wangjf_${date}" \
--conf spark.sql.shuffle.partitions=8000 \ --conf spark.sql.shuffle.partitions=10000 \
--conf spark.default.parallelism=8000 \ --conf spark.default.parallelism=10000 \
--conf spark.sql.files.maxPartitionBytes=134217728 \ --conf spark.sql.files.maxPartitionBytes=134217728 \
--conf spark.kryoserializer.buffer.max=512m \ --conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryoserializer.buffer=64m \ --conf spark.kryoserializer.buffer=64m \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \ --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--files ${HIVE_SITE_PATH} \ --files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 5 --num-executors 12 \ --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 10 \
../../${JAR} -date ${date} -host ${host} -cluster ${cluster} -database ${database} -table ${table} ../../${JAR} -date ${date} -host ${host} -cluster ${cluster} -database ${database} -table ${table}
if [[ $? -ne 0 ]]; then if [[ $? -ne 0 ]]; then
......
...@@ -20,7 +20,7 @@ spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMergeCK \ ...@@ -20,7 +20,7 @@ spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMergeCK \
--conf spark.speculation.quantile=0.9 \ --conf spark.speculation.quantile=0.9 \
--conf spark.speculation.multiplier=1.3 \ --conf spark.speculation.multiplier=1.3 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \ --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 6 --num-executors 10 \ --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 5 --num-executors 10 \
../${JAR} -date_time "${date_time}" -host ${host} -cluster ${cluster} -database ${database} -table ${table} ../${JAR} -date_time "${date_time}" -host ${host} -cluster ${cluster} -database ${database} -table ${table}
if [[ $? -ne 0 ]]; then if [[ $? -ne 0 ]]; then
......
...@@ -139,13 +139,12 @@ class UserInfo extends Serializable { ...@@ -139,13 +139,12 @@ class UserInfo extends Serializable {
r.getAs("update_date"), r.getAs("publish_date")) r.getAs("update_date"), r.getAs("publish_date"))
}).toDF }).toDF
Thread.sleep(120000)
/** /**
* user_info save * user_info save
*/ */
// userDF.createClickHouseDb(database, clusterName) // userDF.createClickHouseDb(database, clusterName)
// userDF.createClickHouseTable(database, table, Seq("dt"), Constant.indexColumn, Constant.orderColumn, clusterName) // userDF.createClickHouseTable(database, table, Seq("dt"), Constant.indexColumn, Constant.orderColumn, clusterName)
userDF.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 200000) userDF.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 100000)
MySQLUtil.update(database, table, date) MySQLUtil.update(database, table, date)
// val lastDate = DateUtil.getDayByString(date, "yyyyMMdd", -1) // val lastDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
......
...@@ -73,7 +73,7 @@ class RTDmpMain extends CommonSparkJob with Serializable { ...@@ -73,7 +73,7 @@ class RTDmpMain extends CommonSparkJob with Serializable {
val calendar = Calendar.getInstance() val calendar = Calendar.getInstance()
val date = sdf.parse(datetime) val date = sdf.parse(datetime)
calendar.setTime(date) calendar.setTime(date)
calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) - 24) calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) - 48)
val expire_time = sdf.format(calendar.getTime) val expire_time = sdf.format(calendar.getTime)
val hour_rdd = spark.read.orc(input).rdd.map(row => { val hour_rdd = spark.read.orc(input).rdd.map(row => {
......
...@@ -80,7 +80,7 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable { ...@@ -80,7 +80,7 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable {
Thread.sleep(120000) Thread.sleep(120000)
df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000) df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 100000)
MySQLUtil.update(database, table, date_time) MySQLUtil.update(database, table, date_time)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment