Commit 118555b8 by fan.jiang

lazada add ios vn

parent 0a98c7f2
type=command
dependencies=etl_lazada_ios_data_daily
command=sh -x etl_lazada_data_daily.sh
\ No newline at end of file
type=command
command=sh -x etl_lazada_ios_data_daily.sh
\ No newline at end of file
#!/usr/bin/env bash
# Daily ETL driver: extract Lazada iOS (idfa) devices located in VN and
# publish them to the S3 output partition for the scheduled day.

source ../dmp_env.sh

# Partition dates derived from the scheduler-supplied $ScheduleTime.
dt_today=$(date -d "$ScheduleTime 2 days ago" +"%Y%m%d")           # dwh table partition (yyyyMMdd)
last_req_day=$(date -d "$ScheduleTime 31 days ago" +"%Y-%m-%d")    # activity cutoff for the 30-day window
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")   # output partition (yyyy/MM/dd)
dt_slash_one_day=$(date -d "$ScheduleTime 2 days ago" +"%Y/%m/%d") # upstream partition (yyyy/MM/dd)

VN_IDFA_OUTPUT_PATH="${LAZADA_OUTPUT_PATH}/${dt_slash_today}/idfa/VN"

# Block until the upstream user-info partitions have landed.
# check_await "${ODS_DMP_USER_INFO_DAILY}/${dt_today}/_SUCCESS"
check_await "${ODS_DMP_USER_INFO}/${dt_slash_one_day}/adn_request/_SUCCESS"
check_await "${ODS_DMP_USER_INFO}/${dt_slash_one_day}/dsp_req/_SUCCESS"

# Clear any previous output so the Spark job can write a fresh result
# (harmless if the path does not exist yet).
hadoop fs -rm -r "${VN_IDFA_OUTPUT_PATH}"

# Run the Spark job; fail the whole task if spark-submit reports an error.
spark-submit --class mobvista.dmp.datasource.taobao.EtlLazadaIosActivitionDaily \
 --conf spark.network.timeout=720s \
 --conf spark.default.parallelism=2000 \
 --conf spark.sql.shuffle.partitions=2000 \
 --conf spark.sql.broadcastTimeout=1200 \
 --conf spark.yarn.executor.memoryOverhead=4096 \
 --conf spark.sql.autoBroadcastJoinThreshold=31457280 \
 --files ${HIVE_SITE_PATH} \
 --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 90 ../${JAR} \
 -today ${dt_today} -last_req_day ${last_req_day} \
 -vn_idfaoutput "${VN_IDFA_OUTPUT_PATH}" || exit 255
package mobvista.dmp.datasource.taobao
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
/**
* @author jiangfan
* @date 2021/8/5 14:38
*/
class EtlLazadaIosActivitionDaily extends CommonSparkJob {

  /** Declares the mandatory command-line options accepted by this job. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("vn_idfaoutput", true, "[must] vn_idfaoutput")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options
  }

  /**
   * Selects distinct lowercase iOS idfa device ids active since `last_req_day`
   * from `dwh.ods_dmp_user_info` (partition `today`), keeps only devices whose
   * country is VN, and writes the ids as plain text to `vn_idfaoutput`.
   *
   * @param args command-line arguments parsed against [[buildOptions]]
   * @return 0 on success, -1 when a mandatory option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val today = commandLine.getOptionValue("today")
    val vn_idfaoutput = commandLine.getOptionValue("vn_idfaoutput")
    val last_req_day = commandLine.getOptionValue("last_req_day")

    val spark = SparkSession.builder()
      .appName("EtlLazadaIosActivitionDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so saveAsTextFile does not fail on an
    // existing directory (the shell wrapper also does this; kept as a guard).
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(vn_idfaoutput), true)

    try {
      val sql2 =
        s"""
           |select lower(device_id) device_id,lower(country) country
           |from dwh.ods_dmp_user_info where dt = '${today}' and last_req_day >= '${last_req_day}' and business not in ('other', 'ali_acquisition', 'ali_activation', 'adn_install')
           | and device_type='idfa'
           | and platform='ios'
           | group by lower(device_id),lower(country)
        """.stripMargin

      // Cache once: the original design anticipates multiple country extracts
      // over the same base result.
      val dfCache: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)

      dfCache.rdd
        .filter { row =>
          // country can be NULL in the warehouse (lower(NULL) is NULL);
          // guard before the case-insensitive comparison to avoid an NPE.
          val country = row.getAs[String]("country")
          country != null && country.equalsIgnoreCase("VN")
        }
        .map(_.getAs[String]("device_id"))
        // TextOutputFormat calls toString on each record; drop null ids.
        .filter(_ != null)
        .coalesce(60)
        .saveAsTextFile(vn_idfaoutput)
    } finally {
      spark.stop()
    }
    0
  }
}
object EtlLazadaIosActivitionDaily {
  /** JVM entry point: instantiate the job and delegate to its run method. */
  def main(args: Array[String]): Unit = {
    val job = new EtlLazadaIosActivitionDaily()
    job.run(args)
  }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment