Commit b850ef59 by fan.jiang

lazada: add iOS data for ID, TH, PH, MY, and SG countries

parent 26bfd9e2
@@ -10,6 +10,11 @@ dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
dt_slash_one_day=$(date -d "$ScheduleTime 2 days ago" +"%Y/%m/%d")
VN_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/VN"
ID_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/ID"
TH_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/TH"
PH_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/PH"
MY_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/MY"
SG_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/SG"
@@ -30,7 +35,8 @@ spark-submit --class mobvista.dmp.datasource.taobao.EtlLazadaIosActivitionDaily
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 90 ../${JAR} \
-today ${dt_today} -last_req_day ${last_req_day} \
-vn_idfaoutput "${VN_IDFA_OUTPUT_PATH}" -id_idfaoutput "${ID_IDFA_OUTPUT_PATH}" -th_idfaoutput "${TH_IDFA_OUTPUT_PATH}" \
-ph_idfaoutput "${PH_IDFA_OUTPUT_PATH}" -my_idfaoutput "${MY_IDFA_OUTPUT_PATH}" -sg_idfaoutput "${SG_IDFA_OUTPUT_PATH}"
if [ $? -ne 0 ];then
exit 255
...
@@ -19,6 +19,11 @@ class EtlLazadaIosActivitionDaily extends CommonSparkJob {
options.addOption("vn_idfaoutput", true, "[must] vn_idfaoutput")
options.addOption("today", true, "[must] today")
options.addOption("last_req_day", true, "[must] last_req_day")
options.addOption("id_idfaoutput", true, "[must] id_idfaoutput")
options.addOption("th_idfaoutput", true, "[must] th_idfaoutput")
options.addOption("ph_idfaoutput", true, "[must] ph_idfaoutput")
options.addOption("my_idfaoutput", true, "[must] my_idfaoutput")
options.addOption("sg_idfaoutput", true, "[must] sg_idfaoutput")
options
}
@@ -34,6 +39,11 @@ class EtlLazadaIosActivitionDaily extends CommonSparkJob {
val today = commandLine.getOptionValue("today")
val vn_idfaoutput = commandLine.getOptionValue("vn_idfaoutput")
val last_req_day = commandLine.getOptionValue("last_req_day")
val id_idfaoutput = commandLine.getOptionValue("id_idfaoutput")
val th_idfaoutput = commandLine.getOptionValue("th_idfaoutput")
val ph_idfaoutput = commandLine.getOptionValue("ph_idfaoutput")
val my_idfaoutput = commandLine.getOptionValue("my_idfaoutput")
val sg_idfaoutput = commandLine.getOptionValue("sg_idfaoutput")
val spark = SparkSession.builder()
@@ -49,6 +59,11 @@ class EtlLazadaIosActivitionDaily extends CommonSparkJob {
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(vn_idfaoutput), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(id_idfaoutput), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(th_idfaoutput), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(ph_idfaoutput), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(my_idfaoutput), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(sg_idfaoutput), true)
try {
@@ -64,6 +79,11 @@ class EtlLazadaIosActivitionDaily extends CommonSparkJob {
val dfCache: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "VN").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(vn_idfaoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "ID").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(id_idfaoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "TH").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(th_idfaoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "PH").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(ph_idfaoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "MY").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(my_idfaoutput)
dfCache.rdd.filter(_.getAs[String]("country").toUpperCase() == "SG").map(_.getAs[String]("device_id")).coalesce(60).saveAsTextFile(sg_idfaoutput)
} finally {
...
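
Review note: the five new delete/filter/save blocks repeat the existing VN logic with only the country code and output path changed. A minimal table-driven sketch of the same logic, assuming the `spark`, `dfCache`, and per-country option values shown in the diff above (`countryOutputs` and `fs` are hypothetical names introduced here for illustration, not part of the job):

```scala
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical map from country code to its IDFA output path; the
// vn_idfaoutput .. sg_idfaoutput values come from the CLI options above.
val countryOutputs: Map[String, String] = Map(
  "VN" -> vn_idfaoutput,
  "ID" -> id_idfaoutput,
  "TH" -> th_idfaoutput,
  "PH" -> ph_idfaoutput,
  "MY" -> my_idfaoutput,
  "SG" -> sg_idfaoutput
)

// Reuse one FileSystem handle: every output lives under the same bucket.
val fs = FileSystem.get(new URI("s3://mob-emr-test"),
  spark.sparkContext.hadoopConfiguration)
countryOutputs.values.foreach(path => fs.delete(new Path(path), true))

// One cached scan, one filtered write per country.
countryOutputs.foreach { case (cc, path) =>
  dfCache.rdd
    .filter(_.getAs[String]("country").toUpperCase == cc)
    .map(_.getAs[String]("device_id"))
    .coalesce(60)
    .saveAsTextFile(path)
}
```

With this shape, adding a seventh country is one map entry plus its CLI option, rather than three more copy-pasted statements.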