Commit 85aa3370 by WangJinfeng

update ga bucket, rtdmp bug fix

parent 7a8e01b8
......@@ -15,7 +15,8 @@ HIVE_CMD=$(hive_func)
$HIVE_CMD -v -hivevar prefix=$date_path -hivevar date_str_undline=$date_str_undline -hivevar date_str_midline=$date_str_midline -f pre_create_tables.sql
INPUT_PATH="s3://live-ga-rawdata-annotated/${date_path}"
# INPUT_PATH="s3://live-ga-rawdata-annotated/${date_path}"
INPUT_PATH="s3://ga-annotated-data/${date_path}"
OUTPUT_PATH="${GA_ODS_RAW_DATA_PATH}/${date_path}"
spark-submit --class mobvista.dmp.datasource.ga.GaParser \
......
......@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import mobvista.dmp.common.Constants;
import mobvista.dmp.datasource.rtdmp.entity.KV;
import mobvista.dmp.datasource.rtdmp.entity.Tuple;
import mobvista.dmp.util.*;
import org.apache.commons.lang3.StringUtils;
......@@ -120,9 +121,11 @@ public class RTDmpFetch {
LOGGER.info("checkRules -->> audienceId:" + audienceId + ", jsonObject:" + jsonObject + ", startTime:" + startTime + ", endTime:" + endTime);
Tuple tuple = checkRules(jsonObject, startTime, endTime);
if (tuple.getFlag()) {
String partition = mySqlUtil.getLastPartition("dwh", "audience_merge");
String dt = DateUtil.format(DateUtil.parse(partition.substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
String hour = partition.substring(8, 10);
KV kv = mySqlUtil.getPartitionTime("dwh", "audience_merge");
String dt = DateUtil.format(DateUtil.parse(kv.getK().substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
String hour = kv.getK().substring(8, 10);
// time of insert partition
String utime = kv.getV();
String partTime = dt + " " + hour;
......@@ -141,14 +144,16 @@ public class RTDmpFetch {
} catch (InterruptedException e) {
LOGGER.info(e.getMessage());
}
partition = mySqlUtil.getLastPartition("dwh", "audience_merge");
dt = DateUtil.format(DateUtil.parse(partition.substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
hour = partition.substring(8, 10);
kv = mySqlUtil.getPartitionTime("dwh", "audience_merge");
dt = DateUtil.format(DateUtil.parse(kv.getK().substring(0, 8), "yyyyMMdd"), "yyyy-MM-dd");
hour = kv.getK().substring(8, 10);
utime = kv.getV();
partTime = dt + " " + hour;
}
long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
if (tuple.getUtime() > (nowTime - 7200) && nowTime < endTime) {
if (Long.parseLong(utime) > (nowTime - 600) && nowTime < endTime) {
// 避免后台分片同步导致数据缺失
try {
LOGGER.info("Sleep 10 min.");
......
package mobvista.dmp.datasource.rtdmp.entity;
/**
 * Mutable holder for a pair of strings, exposed as {@code k} and {@code v}.
 *
 * @package: mobvista.dmp.datasource.rtdmp.entity
 * @author: wangjf
 * @date: 2021/8/17
 * @time: 1:49 PM
 * @email: jinfeng.wang@mobvista.com
 */
public class KV {

    /** First element of the pair. */
    private String k;

    /** Second element of the pair. */
    private String v;

    /**
     * Creates a pair from the two given strings; neither value is validated.
     *
     * @param k first element
     * @param v second element
     */
    public KV(String k, String v) {
        this.k = k;
        this.v = v;
    }

    /** @return the first element as last set */
    public String getK() {
        return this.k;
    }

    /** @return the second element as last set */
    public String getV() {
        return this.v;
    }

    /** @param k replacement for the first element */
    public void setK(String k) {
        this.k = k;
    }

    /** @param v replacement for the second element */
    public void setV(String v) {
        this.v = v;
    }
}
package mobvista.dmp.util;
import mobvista.dmp.datasource.rtdmp.entity.KV;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @package: mobvista.dmp.util
......@@ -62,6 +65,48 @@ public class MySQLUtil {
return partition;
}
/**
 * Looks up the most recent flagged partition of a table together with its update time.
 *
 * <p>Reads {@code part} and {@code utime} from {@code table_info} for the row matching
 * {@code db_name}/{@code tb_name} with {@code flag = 1} that has the greatest {@code part}.
 *
 * <p>Failures are best-effort: any exception is printed and the defaults are returned,
 * i.e. an empty partition string and {@code "28800"} as the time value.
 *
 * @param dbName value matched against the {@code db_name} column
 * @param tbName value matched against the {@code tb_name} column
 * @return a {@link KV} whose key is the partition string and whose value is
 *         {@code utime} in epoch seconds plus 28800, rendered as a decimal string
 */
public static KV getPartitionTime(String dbName, String tbName) {
    String partition = "";
    Timestamp utime = new Timestamp(0L);
    // Bound parameters instead of string concatenation, so the arguments cannot alter the query.
    String sql = "SELECT part,utime FROM table_info WHERE db_name = ? AND tb_name = ? AND flag = 1 ORDER BY part DESC LIMIT 1";
    try {
        Class.forName(JDBC_DRIVER);
        // try-with-resources closes the result set, statement and connection on every path;
        // the previous finally block only closed the connection when stmt.close() itself threw.
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, dbName);
            stmt.setString(2, tbName);
            try (ResultSet rs = stmt.executeQuery()) {
                // LIMIT 1 guarantees at most one row.
                if (rs.next()) {
                    partition = rs.getString("part");
                    Timestamp found = rs.getTimestamp("utime");
                    // A NULL utime column keeps the epoch default instead of causing an NPE below.
                    if (found != null) {
                        utime = found;
                    }
                }
            }
        }
    } catch (Exception se) {
        se.printStackTrace();
    }
    // NOTE(review): 28800 s = 8 h; presumably a UTC+8 adjustment — confirm against how callers compare it.
    return new KV(partition, String.valueOf(utime.getTime() / 1000 + 28800));
}
public static boolean update(String dbName, String tbName, String partition) {
Connection conn = null;
boolean flag = false;
......@@ -95,7 +140,13 @@ public class MySQLUtil {
}
/**
 * Manual check entry point: prints the results of the lookup helpers for
 * {@code dwh.audience_merge} to standard output.
 *
 * @param args unused
 */
public static void main(String[] args) {
// NOTE(review): the next two calls are repeated below in commented-out form — confirm which variant is intended.
System.out.println(getLastPartition("dwh", "audience_merge"));
System.out.println(update("dwh", "audience_merge", "2021072913"));
KV kv = getPartitionTime("dwh", "audience_merge");
// Current time divided by 1000 after a format/parse round trip at second precision
// (presumably epoch seconds; depends on what DateUtil.parse returns — verify).
long nowTime = DateUtil.parse(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd HH:mm:ss").getTime() / 1000;
System.out.println(kv.getV());
// Prints whether the returned time value is greater than nowTime minus 1200.
System.out.println(Long.parseLong(kv.getV()) > nowTime - 1200);
// System.out.println(getLastPartition("dwh", "audience_merge"));
// System.out.println(update("dwh", "audience_merge", "2021072913"));
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment