Commit c7b71508 by WangJinfeng

update rtdmp into dmp.table_info

parent 0087a692
...@@ -71,9 +71,10 @@ public class MySQLUtil { ...@@ -71,9 +71,10 @@ public class MySQLUtil {
conn = DriverManager.getConnection(DB_URL, USER, PASS); conn = DriverManager.getConnection(DB_URL, USER, PASS);
// update last partition // update last partition
String sql = "REPLACE INTO table_info(db_name, tb_name, part, flag) VALUES('" + dbName + "','" + tbName + "','" + partition + "', true)"; String sql = "REPLACE INTO table_info(db_name, tb_name, part, flag) VALUES('" + dbName + "','" + tbName + "','" + partition + "', true)";
String lastPart = DateUtil.getDayByString(partition, "yyyyMMdd", -1); // String lastPart = DateUtil.getDayByString(partition, "yyyyMMdd", -1);
String lastSql = "REPLACE INTO table_info(db_name, tb_name, part, flag) VALUES('" + dbName + "','" + tbName + "','" + lastPart + "', false)"; // String lastSql = "REPLACE INTO table_info(db_name, tb_name, part, flag) VALUES('" + dbName + "','" + tbName + "','" + lastPart + "', false)";
if (conn.prepareStatement(sql).executeUpdate() + conn.prepareStatement(lastSql).executeUpdate() == 2) { // + conn.prepareStatement(lastSql).executeUpdate() == 2
if (conn.prepareStatement(sql).executeUpdate() == 1) {
flag = true; flag = true;
} else { } else {
flag = false; flag = false;
...@@ -94,7 +95,7 @@ public class MySQLUtil { ...@@ -94,7 +95,7 @@ public class MySQLUtil {
} }
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(getLastPartition("dwh", "ods_user_info")); System.out.println(getLastPartition("dwh", "audience_merge"));
System.out.println(update("dwh", "ods_user_info", "20191126")); System.out.println(update("dwh", "audience_merge", "2021072913"));
} }
} }
\ No newline at end of file
...@@ -2,7 +2,7 @@ package mobvista.dmp.datasource.rtdmp ...@@ -2,7 +2,7 @@ package mobvista.dmp.datasource.rtdmp
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant} import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil import mobvista.dmp.util.{DateUtil, MySQLUtil}
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._ import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options} import org.apache.commons.cli.{BasicParser, Options}
...@@ -82,6 +82,8 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable { ...@@ -82,6 +82,8 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable {
df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000) df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000)
MySQLUtil.update(database, table, date_time)
} finally { } finally {
if (spark != null) { if (spark != null) {
spark.stop() spark.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment