Commit 7e2345ff by WangJinfeng

fix mapping, AdnSdkTools, rtdmp_tmp, DmpDeviceInterest

parent 29841326
@@ -12,15 +12,15 @@ check_await "${DM_INSTALL_LIST}_v2/${dt_slash_today}/dsp_req/_SUCCESS"
 hadoop fs -rm -r "${OUTPUT_PATH}"
 spark-submit --class mobvista.dmp.datasource.taobao.EtlComTencentNewsDaily \
   --conf spark.network.timeout=720s \
   --conf spark.default.parallelism=3000 \
   --conf spark.sql.shuffle.partitions=3000 \
   --conf spark.sql.broadcastTimeout=1200 \
   --conf spark.sql.autoBroadcastJoinThreshold=31457280 \
-  --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 120 \
+  --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 80 \
   ../../${JAR} \
   -output ${OUTPUT_PATH} -coalesce 500 \
   -dt_today ${dt_today} -dt_dash_rec7day ${dt_dash_rec7day} -dt_dash_rec15day ${dt_dash_rec15day}
 if [ $? -ne 0 ];then
...
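For a rough sense of the sizing above (back-of-the-envelope, not from the repo): 80 executors × 5 cores ≈ 400 concurrent task slots, so a 3000-partition shuffle runs in roughly 3000 / 400 ≈ 8 task waves, while -coalesce 500 keeps the final output at about 500 files.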
@@ -57,15 +57,18 @@ day=${date:6:2}
 # backup S3 path
 output="${BACKFLOW_OUTPUT}/${keyspace}/${table}/${date_path}/${region}/"
+export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
+export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
 spark-submit --class mobvista.dmp.datasource.backflow.BackFlow \
   --name "BackFlow.${keyspace}.${table}.${region}" \
   --conf spark.sql.shuffle.partitions=2000 \
   --conf spark.default.parallelism=2000 \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.kryoserializer.buffer=64m \
+  --conf spark.kubernetes.container.image=818539432014.dkr.ecr.us-east-1.amazonaws.com/engineplus/spark:3.0.1-mobvista-v1.2.2 \
   --master yarn --deploy-mode cluster \
-  --executor-memory 4g --driver-memory 4g --executor-cores 4 --num-executors 6 \
+  --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 8 \
   ../.././DMP.jar \
   -keyspace ${keyspace} -table ${table} -region ${region} -output ${output} -system ${system} \
   -writetime_start ${writetime_start} -writetime_end ${writetime_end} -value_column ${value_column}
...
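For reference, a minimal sketch of the same settings applied through SparkConf rather than --conf flags; the container image URI is copied from the script, everything else (app name, keyspace/table/region values) is assumed:

    // Sketch only: equivalent programmatic configuration. The spark.kubernetes.*
    // setting is ignored under --master yarn and only takes effect when the
    // master is a k8s:// URL.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("BackFlow.ods.device.us")  // assumed keyspace/table/region values
      .set("spark.sql.shuffle.partitions", "2000")
      .set("spark.default.parallelism", "2000")
      .set("spark.kryoserializer.buffer.max", "512m")
      .set("spark.kryoserializer.buffer", "64m")
      .set("spark.kubernetes.container.image",
        "818539432014.dkr.ecr.us-east-1.amazonaws.com/engineplus/spark:3.0.1-mobvista-v1.2.2")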
@@ -12,14 +12,14 @@ hadoop fs -rm -r "${OUTPUT_PATH1}"
 spark-submit --class mobvista.dmp.datasource.dm.RtdmpTmpId1142110895 \
   --conf spark.yarn.executor.memoryOverhead=2048 \
-  --conf spark.default.parallelism=3000 \
+  --conf spark.default.parallelism=10000 \
-  --conf spark.sql.shuffle.partitions=3000 \
+  --conf spark.sql.shuffle.partitions=10000 \
   --conf spark.driver.maxResultSize=4g \
   --conf spark.network.timeout=720s \
-  --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 70 \
+  --master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 5 --num-executors 80 \
   ../../${JAR} -dt_today ${dt_today} -output1 ${OUTPUT_PATH1} \
-  -coalesce 420
+  -coalesce 400
 if [[ $? -ne 0 ]]; then
...
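Back-of-the-envelope on the new numbers (an estimate, not from the repo): each executor now requests roughly 10 g + 2048 m overhead ≈ 12 g from YARN, so 80 executors claim about 960 g of cluster memory in total, and the 10000 shuffle partitions map to about 10000 / (80 × 5) ≈ 25 task waves.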
@@ -11,16 +11,12 @@ import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.util.EntityUtils;
-import sun.misc.BASE64Decoder;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URLDecoder;
 import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 /**
  * 刘凯 2018-01-18 16:20 adn_sdk utility class
@@ -52,9 +48,8 @@ public class AdnSdkTools extends UDF implements Serializable {
             sb.append(n);
         }
         cal = sb.toString();
-        BASE64Decoder decoder = new BASE64Decoder();
         byte[] b;
-        b = decoder.decodeBuffer(cal);
+        b = Base64.getDecoder().decode(cal);
         ret = new String(b, "UTF-8");
         // System.out.println("AdnSdkTools.evaluate.ret:" + ret);
     } catch (Exception e) {
...
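The decoding change drops the JDK-internal sun.misc.BASE64Decoder (hidden or removed on modern JDKs) in favor of the public java.util.Base64 API, which the widened java.util.* import now covers. A minimal, self-contained sketch of the same round trip; the input string is made up:

    // Sketch of the replacement codec; java.util.Base64 is public API since Java 8.
    import java.nio.charset.StandardCharsets
    import java.util.Base64

    val cal: String = Base64.getEncoder.encodeToString("hello".getBytes(StandardCharsets.UTF_8)) // made-up input
    val b: Array[Byte] = Base64.getDecoder.decode(cal)
    val ret: String = new String(b, StandardCharsets.UTF_8) // "hello"

One caveat worth checking: getDecoder is strict, whereas the old decodeBuffer tolerated line breaks; Base64.getMimeDecoder is the closer drop-in if the UDF can receive wrapped input.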
@@ -101,7 +101,7 @@ class BackFlow extends CommonSparkJob with Serializable {
     // back up to S3
     filterDF.drop("write_time")
-      .repartition(50)
+      .repartition(200)
       .write
       .mode(SaveMode.Overwrite)
       .option("orc.compress", "zlib")
...
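The jump from repartition(50) to repartition(200) mainly changes how many ORC files the S3 backup is split into, and how much parallelism the write gets. A hypothetical sizing helper, not part of the repo, that derives such a partition count from data volume:

    // Hypothetical helper (not in the repo): one partition per ~256 MB of data.
    def partitionsFor(totalBytes: Long, targetBytesPerFile: Long = 256L << 20): Int =
      math.max(1, math.ceil(totalBytes.toDouble / targetBytesPerFile).toInt)

    partitionsFor(50L << 30)  // ~50 GB of backup data -> 200 files, matching the new value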
 package mobvista.dmp.datasource.dm
-import java.text.SimpleDateFormat
-import java.util
-import java.util.regex.Pattern
 import com.alibaba.fastjson.JSONObject
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.gson.{JsonArray, JsonObject}
@@ -15,6 +11,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import java.text.SimpleDateFormat
+import java.util
+import java.util.regex.Pattern
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -710,7 +709,7 @@ object Constant {
       | WHERE dt = '@dt' AND platform = '@platform' AND cnt >= @cnt
       """.stripMargin
-  case class DmInterestTagV2(device_id: String, device_type: String, platform: String, tags: String, ext_data: String, update_date: String) extends java.io.Serializable
+  case class DmInterestTagV2(device_id: String, device_type: String, platform: String, install: String, tags: String, ext_data: String, update_date: String) extends java.io.Serializable
   def mergeExtData(ext_datas: mutable.WrappedArray[String]) = {
     val businessSet = Set("adn_request_sdk", "adn_request_unmatch", "dsp_req", "dsp_req_unmatch")
...
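With the new install field, every DmInterestTagV2 row (and the DataFrame built from it) carries the device's install list next to its tags. A minimal sketch of constructing the widened case class; all field values are made up:

    // Illustrative only: the case class as defined in object Constant after this commit.
    val tag = Constant.DmInterestTagV2(
      device_id   = "00000000-1111-2222-3333-444444444444",
      device_type = "gaid",
      platform    = "android",
      install     = "[com.tencent.news, com.example.app]",  // new column
      tags        = """[{"tag":"news"}]""",
      ext_data    = "{}",
      update_date = "2021-11-01")
    // spark.createDataFrame(Seq(tag)) now yields an extra `install` column.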
 package mobvista.dmp.datasource.dm
-import java.net.URI
 import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
 import mobvista.dmp.common.CommonSparkJob
 import mobvista.dmp.datasource.dm.Constant.DmInterestTagV2
@@ -10,6 +8,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.{SaveMode, SparkSession}
+import java.net.URI
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -133,12 +132,17 @@ class DmpDeviceInterest extends CommonSparkJob with Serializable {
           jsonArray.add(json)
         }
       })
+      /*
       if (jsonArray.size() > 0) {
         DmInterestTagV2(device_id, device_type, platform, jsonArray.toString, ext_data, update_date)
       } else {
         null
       }
-    }).filter(_ != null)
+      */
+      DmInterestTagV2(device_id = device_id, device_type = device_type, platform = platform, install = install_list.asJava.toString,
+        tags = jsonArray.toString, ext_data = ext_data, update_date = update_date)
+    })
     rdd.repartition(coalesce).toDF()
       .write
       .mode(SaveMode.Overwrite)
...
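Two things change in this hunk: devices are no longer dropped when jsonArray is empty (the old null/filter branch is commented out), and the install list is serialized through asJava.toString. A small sketch of what that serialization produces; install_list is assumed to be a Scala mutable collection built earlier in the method:

    // Sketch: install_list.asJava.toString formatting (collection contents made up).
    import scala.collection.JavaConverters._
    import scala.collection.mutable

    val install_list = mutable.ArrayBuffer("com.tencent.news", "com.example.game")
    val install: String = install_list.asJava.toString  // "[com.tencent.news, com.example.game]"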
@@ -47,9 +47,8 @@ class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
       .getOrCreate()
     val sc = spark.sparkContext
-    import spark.implicits._
-    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)
+    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output1), true)
     try {
       val sql1=
@@ -57,7 +56,7 @@ class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
          |select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('1142110895','id1142110895') and device_type not in ('androidid','android_id','ruid')
         """.stripMargin
-      spark.sql(sql1).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
+      spark.sql(sql1).rdd.flatMap(buildRes).repartition(coalesce.toInt)
         .saveAsNewAPIHadoopFile(s"${output1}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
     } finally {
@@ -66,8 +65,8 @@ class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {
     0
   }
-  def buildRes( row: Row): Array[Tuple2[Text, Text]] = {
+  def buildRes(row: Row): Array[(Text, Text)] = {
-    val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
+    val buffer = new ArrayBuffer[(Text, Text)]()
     val device_id = row.getAs[String]("device_id")
     val device_type = row.getAs[String]("device_type")
     if (StringUtils.isNotBlank(device_type)) {
...
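Replacing coalesce with repartition here is not cosmetic: coalesce(n) only merges existing partitions without a shuffle, so skewed upstream partitions stay skewed, while repartition(n) forces a shuffle and gives saveAsNewAPIHadoopFile evenly sized output splits. A minimal sketch of the distinction:

    // Sketch: coalesce narrows without a shuffle, repartition redistributes with one.
    val rdd = spark.sparkContext.parallelize(1 to 1000000, numSlices = 3000)
    val narrowed   = rdd.coalesce(400)     // no shuffle; partition sizes follow the parents
    val reshuffled = rdd.repartition(400)  // full shuffle; roughly even partitions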
@@ -3,22 +3,24 @@ package mobvista.dmp.datasource.taobao
 import mobvista.dmp.common.CommonSparkJob
 import org.apache.commons.cli.Options
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import java.net.URI
+import scala.collection.mutable.ArrayBuffer
 /**
   * @author jiangfan
   * @date 2021/4/23 10:42
   */
 class EtlComTencentNewsDaily extends CommonSparkJob with Serializable {
   override protected def buildOptions(): Options = {
     val options = new Options
     options.addOption("dt_today", true, "[must] dt_today")
     options.addOption("dt_dash_rec7day", true, "[must] dt_dash_rec7day")
     options.addOption("dt_dash_rec15day", true, "[must] dt_dash_rec15day")
-    options.addOption("output",true, "[must] output")
+    options.addOption("output", true, "[must] output")
-    options.addOption("coalesce",true, "[must] coalesce")
+    options.addOption("coalesce", true, "[must] coalesce")
     options
   }
@@ -50,6 +52,7 @@ class EtlComTencentNewsDaily extends CommonSparkJob with Serializable {
     val sc = spark.sparkContext
     try {
       val sql1 =
         s"""
            |select device_id,device_type,platform,'com.tencent.news_bes_7' package_name from dwh.dm_install_list_v2
@@ -69,11 +72,44 @@ class EtlComTencentNewsDaily extends CommonSparkJob with Serializable {
            |and package_name in ('com.tencent.news_oppo')
         """.stripMargin
-      spark.sql(sql1).coalesce(coalesce.toInt)
+      val sql =
+        s"""
+           |SELECT device_id, device_type, platform, package_name, update_date
+           |  FROM dwh.dm_install_list_v2
+           |  WHERE dt = '${dt_today}' AND business = 'dsp_req' AND update_date >= '${dt_dash_rec15day}'
+           |  AND package_name IN ('com.tencent.news_bes','com.tencent.news_oppo')
+           |""".stripMargin
+      def schema: StructType = {
+        StructType(StructField("device_id", StringType) ::
+          StructField("device_type", StringType) ::
+          StructField("platform", StringType) ::
+          StructField("package_name", StringType) ::
+          Nil)
+      }
+      val rdd = spark.sql(sql).rdd.map(row => {
+        val array = new ArrayBuffer[Row]()
+        val device_id = row.getAs[String]("device_id")
+        val device_type = row.getAs[String]("device_type")
+        val platform = row.getAs[String]("platform")
+        val package_name = row.getAs[String]("package_name")
+        val update_date = row.getAs[String]("update_date")
+        array += Row(device_id, device_type, platform, package_name + "_15")
+        if (update_date.compareTo(dt_dash_rec7day) >= 0) {
+          array += Row(device_id, device_type, platform, package_name + "_7")
+        }
+        array
+      }).flatMap(l => l)
+      spark.createDataFrame(rdd, schema)
+        .repartition(coalesce.toInt)
         .write
         .mode(SaveMode.Overwrite)
         .option("orc.compress", "zlib")
-        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
+        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", value = false)
         .orc(output)
     } finally {
...
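The rewrite appears to replace the old multi-part sql1 query with a single scan of 15 days of dsp_req installs, fanning each record out to a <package>_15 row and, when update_date also falls within the last 7 days, an additional <package>_7 row. A standalone sketch of that fan-out rule on made-up data:

    // Sketch of the fan-out logic on made-up rows; dates are illustrative.
    val dt_dash_rec7day = "2021-11-08"
    val rows = Seq(
      ("id-1", "gaid", "android", "com.tencent.news_oppo", "2021-11-10"), // within 7 days
      ("id-2", "gaid", "android", "com.tencent.news_bes",  "2021-11-01")) // 15-day window only

    val expanded = rows.flatMap { case (id, typ, plat, pkg, upd) =>
      val rec15 = (id, typ, plat, pkg + "_15")
      if (upd.compareTo(dt_dash_rec7day) >= 0) Seq(rec15, (id, typ, plat, pkg + "_7")) else Seq(rec15)
    }
    // expanded: news_oppo_15 and news_oppo_7 for id-1, news_bes_15 only for id-2.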