Commit bcad12be by WangJinfeng

update lazada_rtdmp,fix RTDmpFetch

parent 9d586698
...@@ -121,11 +121,11 @@ public class RTDmpFetch { ...@@ -121,11 +121,11 @@ public class RTDmpFetch {
LOGGER.info("checkRules -->> audienceId:" + audienceId + ", jsonObject:" + jsonObject + ", startTime:" + startTime + ", endTime:" + endTime); LOGGER.info("checkRules -->> audienceId:" + audienceId + ", jsonObject:" + jsonObject + ", startTime:" + startTime + ", endTime:" + endTime);
Tuple tuple = checkRules(jsonObject, startTime, endTime); Tuple tuple = checkRules(jsonObject, startTime, endTime);
if (tuple.getFlag()) { if (tuple.getFlag()) {
KV kv = mySqlUtil.getPartitionTime("dwh", " audience_merge_v1"); KV kv = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
// retry getPartitionTime // retry getPartitionTime
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
if (StringUtils.isBlank(kv.getK())) { if (StringUtils.isBlank(kv.getK())) {
kv = mySqlUtil.getPartitionTime("dwh", " audience_merge_v1"); kv = mySqlUtil.getPartitionTime("dwh", "audience_merge_v1");
} else { } else {
break; break;
} }
......
...@@ -78,8 +78,11 @@ public class MySQLUtil { ...@@ -78,8 +78,11 @@ public class MySQLUtil {
stmt = conn.createStatement(); stmt = conn.createStatement();
String sql; String sql;
// last partition // last partition
sql = "SELECT part,utime FROM table_info WHERE db_name = '" + dbName + "' AND tb_name = '" + tbName + "' AND flag = 1 ORDER BY part DESC LIMIT 1"; sql = "SELECT part,utime FROM dmp.table_info WHERE db_name = '" + dbName + "' AND tb_name = '" + tbName + "' AND flag = 1 ORDER BY part DESC LIMIT 1";
ResultSet rs = stmt.executeQuery(sql); ResultSet rs = stmt.executeQuery(sql);
if (!rs.next()) {
System.out.println("getPartitionTime Null,SQL --->>> " + sql);
}
while (rs.next()) { while (rs.next()) {
partition = rs.getString("part"); partition = rs.getString("part");
utime = rs.getTimestamp("utime"); utime = rs.getTimestamp("utime");
......
...@@ -4,8 +4,10 @@ import com.alibaba.fastjson.{JSONArray, JSONObject} ...@@ -4,8 +4,10 @@ import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant} import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil} import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import org.apache.commons.cli.{BasicParser, Options} import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.io.compress.GzipCodec
import java.net.URI
import scala.collection.mutable import scala.collection.mutable
/** /**
...@@ -42,6 +44,8 @@ class ProcessRTJob extends CommonSparkJob with Serializable { ...@@ -42,6 +44,8 @@ class ProcessRTJob extends CommonSparkJob with Serializable {
val sql: String = Constant.process_rtdmp_audience_sql val sql: String = Constant.process_rtdmp_audience_sql
.replace("@dt", dt) .replace("@dt", dt)
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
spark.sql(sql) spark.sql(sql)
.rdd.map(row => { .rdd.map(row => {
row.getAs[String]("gaid") row.getAs[String]("gaid")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment