Commit 4605b740 by WangJinfeng

support reyun

parent 5d5e95e5
...@@ -70,6 +70,16 @@ OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/mv2reyun/tracking_install" ...@@ -70,6 +70,16 @@ OUTPUT_REYUN_INSTALL_PATH="s3://mob-emr-test/mv2reyun/tracking_install"
OUTPUT_REYUN_EVENT_PATH="s3://mob-emr-test/mv2reyun/dmp_event_daily" OUTPUT_REYUN_EVENT_PATH="s3://mob-emr-test/mv2reyun/dmp_event_daily"
# S3 output prefixes under mv2reyun, one per exported dataset.
# The dim_*_daily.sh launchers append a yyyy/MM/dd date path to the *_TAG_* prefixes.
OUTPUT_REYUN_APP_TAG_PATH="s3://mob-emr-test/mv2reyun/dim_app_tag_daily"
OUTPUT_REYUN_PACKAGE_TAG_PATH="s3://mob-emr-test/mv2reyun/dim_package_tag_daily"
OUTPUT_REYUN_TAG_INFO_PATH="s3://mob-emr-test/mv2reyun/dim_tag_info_daily"
OUTPUT_REYUN_NEW_TAG_INFO_PATH="s3://mob-emr-test/mv2reyun/dim_new_tag_info_daily"
OUTPUT_INSTALL_LIST_INFO_PATH="s3://mob-emr-test/mv2reyun/dmp_install_list_daily"
OUTPUT_REYUN_USER_INFO_PATH="s3://mob-emr-test/mv2reyun/dmp_user_info" OUTPUT_REYUN_USER_INFO_PATH="s3://mob-emr-test/mv2reyun/dmp_user_info"
ETL_DSP_REQ_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_dsp_request_daily" ETL_DSP_REQ_DAILY="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_dsp_request_daily"
......
# Command-type job: runs dim_app_tag_daily.sh through sh with tracing (-x) enabled.
type=command
command=sh -x dim_app_tag_daily.sh
\ No newline at end of file
#! /bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# Submits the Spark job mobvista.dmp.output.reyun.AppTagDaily for the day
# before $ScheduleTime, passing a dated path under OUTPUT_REYUN_APP_TAG_PATH
# as its -output argument.
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh

# Target day is one day before $ScheduleTime: compact form for -date, slashed form for paths.
dt=$(date -d "-1 day $ScheduleTime" +"%Y%m%d")
date_path=$(date -d "-1 day $ScheduleTime" +"%Y/%m/%d")

# check_await is not defined in this script; presumably it comes from dmp_env.sh
# and waits for the upstream _SUCCESS marker - confirm there.
check_await ${APP_TAG_PATH}/${date_path}/_SUCCESS

OUTPUT_PATH="${OUTPUT_REYUN_APP_TAG_PATH}/${date_path}"

# Any non-zero spark-submit status is reported to the caller as 255.
spark-submit \
  --class mobvista.dmp.output.reyun.AppTagDaily \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4g \
  --driver-memory 4g \
  --executor-cores 2 \
  --num-executors 2 \
  ../../${JAR} -date ${dt} -output ${OUTPUT_PATH} -coalesce 4 || exit 255
# Command-type job: runs dim_new_tag_info_daily.sh through sh with tracing (-x) enabled.
type=command
command=sh -x dim_new_tag_info_daily.sh
\ No newline at end of file
#! /bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# Submits the Spark job mobvista.dmp.output.reyun.NewTagInfoDaily for the day
# before $ScheduleTime, passing a dated path under
# OUTPUT_REYUN_NEW_TAG_INFO_PATH as its -output argument.
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh

# Target day is one day before $ScheduleTime: compact form for -date, slashed form for paths.
dt=$(date -d "-1 day $ScheduleTime" +"%Y%m%d")
date_path=$(date -d "-1 day $ScheduleTime" +"%Y/%m/%d")

OUTPUT_PATH="${OUTPUT_REYUN_NEW_TAG_INFO_PATH}/${date_path}"

# Any non-zero spark-submit status is reported to the caller as 255.
spark-submit \
  --class mobvista.dmp.output.reyun.NewTagInfoDaily \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4g \
  --driver-memory 4g \
  --executor-cores 2 \
  --num-executors 2 \
  ../../${JAR} -date ${dt} -output ${OUTPUT_PATH} -coalesce 4 || exit 255
# Command-type job: runs dim_package_tag_daily.sh through sh with tracing (-x) enabled.
type=command
command=sh -x dim_package_tag_daily.sh
\ No newline at end of file
#! /bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# Submits the Spark job mobvista.dmp.output.reyun.PackageTagDaily for the day
# before $ScheduleTime, passing a dated path under
# OUTPUT_REYUN_PACKAGE_TAG_PATH as its -output argument.
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh

# Target day is one day before $ScheduleTime: compact form for -date, slashed form for paths.
dt=$(date -d "-1 day $ScheduleTime" +"%Y%m%d")
date_path=$(date -d "-1 day $ScheduleTime" +"%Y/%m/%d")

OUTPUT_PATH="${OUTPUT_REYUN_PACKAGE_TAG_PATH}/${date_path}"

# Any non-zero spark-submit status is reported to the caller as 255.
spark-submit \
  --class mobvista.dmp.output.reyun.PackageTagDaily \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4g \
  --driver-memory 4g \
  --executor-cores 2 \
  --num-executors 2 \
  ../../${JAR} -date ${dt} -output ${OUTPUT_PATH} -coalesce 4 || exit 255
# Command-type job: runs dim_tag_info_daily.sh through sh with tracing (-x) enabled.
type=command
command=sh -x dim_tag_info_daily.sh
\ No newline at end of file
#! /bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : wangjf
# Submits the Spark job mobvista.dmp.output.reyun.TagInfoDaily for the day
# before $ScheduleTime, passing a dated path under
# OUTPUT_REYUN_TAG_INFO_PATH as its -output argument.
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh

# Target day is one day before $ScheduleTime: compact form for -date, slashed form for paths.
dt=$(date -d "-1 day $ScheduleTime" +"%Y%m%d")
date_path=$(date -d "-1 day $ScheduleTime" +"%Y/%m/%d")

OUTPUT_PATH="${OUTPUT_REYUN_TAG_INFO_PATH}/${date_path}"

# Any non-zero spark-submit status is reported to the caller as 255.
spark-submit \
  --class mobvista.dmp.output.reyun.TagInfoDaily \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4g \
  --driver-memory 4g \
  --executor-cores 2 \
  --num-executors 2 \
  ../../${JAR} -date ${dt} -output ${OUTPUT_PATH} -coalesce 4 || exit 255
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
 * Spark job that copies the table `dwh.dm_old2new_tag` into zlib-compressed ORC files.
 *
 * Command-line options (each takes a value and is described as "[must]"):
 *  - `date`: only used to label the Spark application name
 *  - `output`: destination path handed to the ORC writer
 *  - `coalesce`: number of partitions the result is repartitioned into before writing
 *
 * @package: mobvista.dmp.output.reyun
 * @author: wangjf
 * @date: 2021/9/14
 * @time: 2:06 PM
 * @email: jinfeng.wang@mobvista.com
 */
class NewTagInfoDaily extends CommonSparkJob {

  /** Declares the `date`, `output` and `coalesce` options. */
  override protected def buildOptions(): Options =
    new Options()
      .addOption("date", true, "[must] date")
      .addOption("output", true, "[must] output")
      .addOption("coalesce", true, "[must] coalesce")

  /**
   * Parses the arguments and, when they pass `checkMustOption`, exports the table.
   *
   * @param args raw command-line arguments
   * @return -1 when `checkMustOption` rejects the command line, 0 once the write has finished
   */
  override protected def run(args: Array[String]): Int = {
    val cli = commParser.parse(options, args)
    if (checkMustOption(cli)) {
      printOptions(cli)

      val date = cli.getOptionValue("date")
      val output = cli.getOptionValue("output")
      val partitions = cli.getOptionValue("coalesce").toInt

      val spark = MobvistaConstant.createSparkSession(s"NewTagInfoDaily.${date}")
      try {
        val query =
          """
            |SELECT * FROM dwh.dm_old2new_tag
            |""".stripMargin
        val writer = spark.sql(query)
          .repartition(partitions)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
        // Overwrite mode: existing data at `output` is replaced.
        writer.orc(output)
      } finally {
        // Release the session whether or not the export succeeded.
        spark.stop()
      }
      0
    } else {
      printUsage(options)
      -1
    }
  }
}
/** Entry point referenced by `spark-submit --class mobvista.dmp.output.reyun.NewTagInfoDaily`. */
object NewTagInfoDaily {

  /**
   * Runs the job and fails the application when it reports a non-zero status.
   *
   * @param args command-line arguments forwarded unchanged to `run`
   * @throws IllegalStateException when `run` returns a non-zero status
   */
  def main(args: Array[String]): Unit = {
    val status = new NewTagInfoDaily().run(args)
    // Previously the status was discarded, so a rejected command line (-1) still ended
    // as a successful application. Throwing makes the failure visible to spark-submit.
    if (status != 0) {
      throw new IllegalStateException(s"NewTagInfoDaily finished with status $status")
    }
  }
}
\ No newline at end of file
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
 * Spark job that copies the table `dwh.dim_package_tags` into zlib-compressed ORC files.
 *
 * Command-line options (each takes a value and is described as "[must]"):
 *  - `date`: only used to label the Spark application name
 *  - `output`: destination path handed to the ORC writer
 *  - `coalesce`: number of partitions the result is repartitioned into before writing
 *
 * @package: mobvista.dmp.output.reyun
 * @author: wangjf
 * @date: 2021/9/14
 * @time: 2:06 PM
 * @email: jinfeng.wang@mobvista.com
 */
class PackageTagDaily extends CommonSparkJob {

  /** Declares the `date`, `output` and `coalesce` options. */
  override protected def buildOptions(): Options =
    new Options()
      .addOption("date", true, "[must] date")
      .addOption("output", true, "[must] output")
      .addOption("coalesce", true, "[must] coalesce")

  /**
   * Parses the arguments and, when they pass `checkMustOption`, exports the table.
   *
   * @param args raw command-line arguments
   * @return -1 when `checkMustOption` rejects the command line, 0 once the write has finished
   */
  override protected def run(args: Array[String]): Int = {
    val cli = commParser.parse(options, args)
    if (checkMustOption(cli)) {
      printOptions(cli)

      val date = cli.getOptionValue("date")
      val output = cli.getOptionValue("output")
      val partitions = cli.getOptionValue("coalesce").toInt

      val spark = MobvistaConstant.createSparkSession(s"PackageTagDaily.${date}")
      try {
        val query =
          """
            |SELECT * FROM dwh.dim_package_tags
            |""".stripMargin
        // The former debug `take(20).foreach(println)` was removed: it ran an extra Spark
        // job and printed raw rows to the driver log, unlike the sibling export jobs.
        spark.sql(query)
          .repartition(partitions)
          .write
          .mode(SaveMode.Overwrite) // existing data at `output` is replaced
          .option("orc.compress", "zlib")
          .orc(output)
      } finally {
        // Release the session whether or not the export succeeded.
        spark.stop()
      }
      0
    } else {
      printUsage(options)
      -1
    }
  }
}
/** Entry point referenced by `spark-submit --class mobvista.dmp.output.reyun.PackageTagDaily`. */
object PackageTagDaily {

  /**
   * Runs the job and fails the application when it reports a non-zero status.
   *
   * @param args command-line arguments forwarded unchanged to `run`
   * @throws IllegalStateException when `run` returns a non-zero status
   */
  def main(args: Array[String]): Unit = {
    val status = new PackageTagDaily().run(args)
    // Previously the status was discarded, so a rejected command line (-1) still ended
    // as a successful application. Throwing makes the failure visible to spark-submit.
    if (status != 0) {
      throw new IllegalStateException(s"PackageTagDaily finished with status $status")
    }
  }
}
\ No newline at end of file
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
 * Spark job that copies the table `dwh.dim_category_mv_new` into zlib-compressed ORC files.
 *
 * Command-line options (each takes a value and is described as "[must]"):
 *  - `date`: only used to label the Spark application name
 *  - `output`: destination path handed to the ORC writer
 *  - `coalesce`: number of partitions the result is repartitioned into before writing
 *
 * @package: mobvista.dmp.output.reyun
 * @author: wangjf
 * @date: 2021/9/14
 * @time: 2:06 PM
 * @email: jinfeng.wang@mobvista.com
 */
class TagInfoDaily extends CommonSparkJob {

  /** Declares the `date`, `output` and `coalesce` options. */
  override protected def buildOptions(): Options =
    new Options()
      .addOption("date", true, "[must] date")
      .addOption("output", true, "[must] output")
      .addOption("coalesce", true, "[must] coalesce")

  /**
   * Parses the arguments and, when they pass `checkMustOption`, exports the table.
   *
   * @param args raw command-line arguments
   * @return -1 when `checkMustOption` rejects the command line, 0 once the write has finished
   */
  override protected def run(args: Array[String]): Int = {
    val cli = commParser.parse(options, args)
    if (checkMustOption(cli)) {
      printOptions(cli)

      val date = cli.getOptionValue("date")
      val output = cli.getOptionValue("output")
      val partitions = cli.getOptionValue("coalesce").toInt

      val spark = MobvistaConstant.createSparkSession(s"TagInfoDaily.${date}")
      try {
        val query =
          """
            |SELECT * FROM dwh.dim_category_mv_new
            |""".stripMargin
        val writer = spark.sql(query)
          .repartition(partitions)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
        // Overwrite mode: existing data at `output` is replaced.
        writer.orc(output)
      } finally {
        // Release the session whether or not the export succeeded.
        spark.stop()
      }
      0
    } else {
      printUsage(options)
      -1
    }
  }
}
/** Entry point referenced by `spark-submit --class mobvista.dmp.output.reyun.TagInfoDaily`. */
object TagInfoDaily {

  /**
   * Runs the job and fails the application when it reports a non-zero status.
   *
   * @param args command-line arguments forwarded unchanged to `run`
   * @throws IllegalStateException when `run` returns a non-zero status
   */
  def main(args: Array[String]): Unit = {
    val status = new TagInfoDaily().run(args)
    // Previously the status was discarded, so a rejected command line (-1) still ended
    // as a successful application. Throwing makes the failure visible to spark-submit.
    if (status != 0) {
      throw new IllegalStateException(s"TagInfoDaily finished with status $status")
    }
  }
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment