Commit 0da5688b by fan.jiang

Write Reyun data to DMP

parent 22fa928a
type=command
dependencies=reyun_install_list_v2,reyun_user_info
command=echo "reyun job end!"
type=command
command=sh -x reyun_daily.sh
\ No newline at end of file
#!/usr/bin/env bash
# # # # # # # # # # # # # # # # # # # # # #
# @author : jiangfan
# @date   : 2021-05-25 12:06:00
# # # # # # # # # # # # # # # # # # # # # #
source ../../dmp_env.sh
source ../../ga_rawdata_analysis/common/tools.sh
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_dash_one_days=$(date -d "$ScheduleTime 1 days ago" +"%Y-%m-%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
INPUT_PATH="${REYUN_RAW_DATA}/${dt_slash_today}"
check_await "${INPUT_PATH}"
common_mount_partition "reyun" "pkginfo" "ds='${dt_today}'" "${INPUT_PATH}"
OUTPUT_PATH="${REYUN_DAILY_PATH}/${dt_slash_today}"
hadoop fs -rm -r "${OUTPUT_PATH}"
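# ReyunDaily (Scala class further down in this commit) extracts
# (device_id, device_type, platform, package_name, country) rows from reyun.pkginfo and writes ORC.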
spark-submit --class mobvista.dmp.datasource.reyun.ReyunDaily \
  --conf spark.network.timeout=720s \
  --conf spark.driver.maxResultSize=4g \
  --conf spark.default.parallelism=2000 \
  --conf spark.sql.shuffle.partitions=2000 \
  --conf spark.sql.broadcastTimeout=1200 \
  --conf spark.sql.autoBroadcastJoinThreshold=31457280 \
  --files ${HIVE_SITE_PATH} \
  --master yarn --deploy-mode cluster --executor-memory 6g --driver-memory 4g --executor-cores 3 --num-executors 60 \
  ../../${JAR} \
  -output ${OUTPUT_PATH} -coalesce 400 -dt_today ${dt_today}
if [[ $? -ne 0 ]]; then
  exit 255
fi
mount_partition "etl_reyun_daily" "dt='${dt_today}'" "$OUTPUT_PATH"
hadoop fs -touchz ${OUTPUT_PATH}/_SUCCESS
\ No newline at end of file
type=command
dependencies=reyun_daily
command=sh -x reyun_install_list.sh
\ No newline at end of file
#!/bin/sh
source ../../dmp_env.sh
today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
yesterday=$(date -d "$ScheduleTime 2 days ago" +"%Y/%m/%d")
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt=$(date -d "$ScheduleTime 1 days ago" +"%Y-%m-%d")
echo ${today}
echo ${yesterday}
INPUT_PATH="${REYUN_DAILY_PATH}/${today}"
OLD_INPUT_PATH="${DM_INSTALL_LIST}/${yesterday}/reyun"
OUTPUT_PATH="${DM_INSTALL_LIST}/${today}/reyun"
check_await "$INPUT_PATH/_SUCCESS"
check_await "$OLD_INPUT_PATH/_SUCCESS"
hadoop fs -rm -r "$OUTPUT_PATH/"
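# Merge today's reyun daily data (-input) into yesterday's cumulative install list (-oldInput).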
spark-submit --class mobvista.dmp.datasource.reyun.ReyunInstallList \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.network.timeout=720s \
  --conf spark.default.parallelism=2000 \
  --conf spark.sql.shuffle.partitions=2000 \
  --master yarn --deploy-mode cluster --name ReyunInstallList --executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 30 \
  ../../${JAR} -input ${INPUT_PATH} -oldInput ${OLD_INPUT_PATH} -output ${OUTPUT_PATH} -date ${dt} -parallelism 500 -coalesce 500
if [ $? -ne 0 ]; then
  exit 255
fi
mount_partition "dm_install_list" "year='${dt_today:0:4}', month='${dt_today:4:2}', day='${dt_today:6:2}', business='reyun'" "$OUTPUT_PATH"
\ No newline at end of file
type=command
dependencies=reyun_install_list
command=sh -x reyun_install_list_v2.sh
\ No newline at end of file
#!/bin/bash
source ../../dmp_env.sh
LOG_TIME=$(date +%Y%m%d -d "-1 day $ScheduleTime")
date_path=$(date +%Y/%m/%d -d "-1 day $ScheduleTime")
business="reyun"
INPUT_PATH="${DM_INSTALL_LIST}/$date_path/${business}"
OUTPUT_PATH="${DM_INSTALL_LIST}_v2/$date_path/${business}"
check_await ${INPUT_PATH}/_SUCCESS
hadoop fs -rm -r ${OUTPUT_PATH}
REDUCE_NUM=$(calculate_reduce_num ${INPUT_PATH})
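# Convert the daily install list into the dm_install_list_v2 layout with a plain MapReduce job.
# Note: REDUCE_NUM is computed above, but the job below is invoked with a hardcoded value of 100.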
hadoop jar ../../${JAR} mobvista.dmp.main.ParseInstallRCFile ${INPUT_PATH} ${OUTPUT_PATH} 100
if [ $? -ne 0 ]; then
  exit 255
fi
mount_partition "dm_install_list_v2" "dt='$LOG_TIME', business='${business}'" "$OUTPUT_PATH"
if [ $? -ne 0 ]; then
  exit 255
fi
type=command
dependencies=reyun_daily
command=sh -x reyun_user_info.sh
\ No newline at end of file
#!/bin/sh
source ../../dmp_env.sh
today=${ScheduleTime:-$1}
dt=$(date +"%Y%m%d" -d "-1 day $today")
date=$(date +"%Y-%m-%d" -d "-1 day $today")
date_path=$(date +%Y/%m/%d -d "-1 day $today")
old_path=$(date +%Y/%m/%d -d "-2 day $today")
expire_date=$(date +%Y%m%d -d "-6 day $today")
expire_path=$(date +%Y/%m/%d -d "-6 day $today")
business_name="reyun"
daily_path="${REYUN_DAILY_PATH}/$date_path"
age_path="${AGE_CALC_DEVICE}/$date_path"
gender_path="${GENDER_CALC_DEVICE}/$date_path"
old_total_path="${ODS_DMP_USER_INFO}/$old_path/${business_name}"
OUTPUT_PATH="${ODS_DMP_USER_INFO}/$date_path/${business_name}"
unmount_path="${ODS_DMP_USER_INFO}/$expire_path/${business_name}"
coalesce=$(calculate_reduce_num "${old_total_path};${daily_path}")
coalesce=$(( coalesce * 5 ))
check_await "${old_total_path}/_SUCCESS"
userInfoJob "$date" "$daily_path" "orc" "0" "1" "2" "4" "$age_path" "$gender_path" "$old_total_path" "$OUTPUT_PATH" ${coalesce} "../../${JAR}" 100 200
mount_partition "ods_dmp_user_info" "dt='${dt}', business='${business_name}'" "$OUTPUT_PATH"
unmount_partition "ods_dmp_user_info" "dt='${expire_date}', business='${business_name}'" "$unmount_path"
\ No newline at end of file
...
@@ -351,6 +351,10 @@ TO_DAILY_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_to_dail
# btop business tmp data
BTOP_DAILY_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_btop_daily"
# reyun business tmp data
REYUN_DAILY_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_reyun_daily"
REYUN_RAW_DATA="s3://mob-emr-test/reyun/pkginfo"
# alipay_activation business tmp data
ALIPAY_ACTIVATION_DAILY_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_alipay_activation_daily"
...
...
@@ -9,7 +9,7 @@ import scala.collection.JavaConversions._
 * Spark application template class
 */
abstract class CommonSparkJob {
  // The oaid regex has two accepted forms, didPtn and oaidAnotherPtn; see https://www.h5w3.com/33786.html
  val QUERY = "QUERY"
  val ENCODING = "UTF-8"
  val HTTPPREFIX = "http://test.com"
...
package mobvista.dmp.datasource.reyun
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.device.Constant.imeiPtn
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.net.URI
/**
 * @author jiangfan
 * @date 2021/5/25 11:50
 */
class ReyunDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options.addOption("dt_today", true, "[must] dt_today")
    options
  }
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")
    val dt_today = commandLine.getOptionValue("dt_today")

    val spark = SparkSession.builder()
      .appName("ReyunDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    // Remove any previous output before writing.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    val Delimiter01 = "\u0006"
    val Delimiter02 = "\u0005"
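    // xcontext['pkglist'] is a serialized map: entries are separated by \u0006 (Delimiter01)
    // and key/value pairs by \u0005 (Delimiter02). The same separators are hardcoded in the
    // SQL's str_to_map call, and LATERAL VIEW explode yields one row per installed package.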
    try {
      val sql1 =
        s"""
           |select t2.device_id,t2.device_type,case when t2.platform is not null then t2.platform else 'android' end as platform, t2.package_name,'cn' country from
           |(select t1.device_id,
           |case when t1.tmp_device_type='imei2' and t1.device_id rlike '${imeiPtn}' then 'imei'
           |when t1.tmp_device_type='_imei' and t1.device_id rlike '${imeiPtn}' then 'imei'
           |when t1.tmp_device_type='oaid' and (t1.device_id rlike '${didPtn}' or t1.device_id rlike '${oaidAnotherPtn}' ) then 'oaid'
           |when t1.tmp_device_type='androidid' and t1.device_id rlike '${andriodIdPtn}' then 'androidid'
           |end as device_type,
           |t1.platform,
           |t1.package_name
           | from
           |(select nvl(xcontext['_deviceid'],xcontext['deviceid']) as device_id,
           | case when nvl(xcontext['_deviceid'],xcontext['deviceid']) = xcontext['_imei2'] then 'imei2'
           | when nvl(xcontext['_deviceid'],xcontext['deviceid']) = xcontext['_meid'] then 'meid'
           | when nvl(xcontext['_deviceid'],xcontext['deviceid']) = xcontext['_oaid'] then 'oaid'
           | when nvl(xcontext['_deviceid'],xcontext['deviceid']) = xcontext['_androidid'] then 'androidid'
           | else '_imei' end as tmp_device_type,
           | xcontext['_ryos'] platform,
           | package_name
           | from reyun.pkginfo
           | LATERAL VIEW explode(str_to_map(xcontext['pkglist'],'\u0006','\u0005')) tmp01 AS package_name,value where ds='${dt_today}' ) t1) t2
           | where t2.device_type is not null
        """.stripMargin

      spark.sql(sql1).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}
object ReyunDaily {
  def main(args: Array[String]): Unit = {
    new ReyunDaily().run(args)
  }
}
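A minimal spark-shell sketch (not part of this commit) of the pkglist parsing used above; the sample value and view name are invented, and it only illustrates the str_to_map/explode semantics that the SQL relies on:

import spark.implicits._
// One pkglist blob holding two packages: entries split on \u0006, key/value on \u0005.
val sample = Seq("pkg.a\u0005ts1\u0006pkg.b\u0005ts2").toDF("pkglist")
sample.createOrReplaceTempView("t")
spark.sql(
  "SELECT package_name FROM t " +
  "LATERAL VIEW explode(str_to_map(pkglist, '\u0006', '\u0005')) m AS package_name, value"
).show()
// Yields one row per installed package: pkg.a and pkg.b (map entry order may vary).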
package mobvista.dmp.datasource.reyun
import mobvista.dmp.common.CommonInstallList
import mobvista.dmp.datasource.mpsdk.InstallInfo
import scala.collection.mutable.ArrayBuffer
/**
 * @author jiangfan
 * @date 2021/6/1 16:34
 */
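// CommonInstallList (shared base class, not part of this diff) drives reading -input/-oldInput
// and writing the merged -output list; this subclass only maps a daily row to (key, InstallInfo).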
class ReyunInstallList extends CommonInstallList {

  override def processDailyData(array: Array[String], date: String): Array[(String, InstallInfo)] = {
    val buffer = new ArrayBuffer[(String, InstallInfo)]()
    // Daily ORC rows are (device_id, device_type, platform, package_name, country); country is unused here.
    val device_id = array(0)
    val device_type = array(1)
    val platform = array(2)
    val package_name = array(3)
    buffer += Tuple2(s"$device_id$FIELD_SPLIT$device_type$FIELD_SPLIT$platform", new InstallInfo(package_name, date))
    buffer.toArray
  }
}
object ReyunInstallList {
  def main(args: Array[String]): Unit = {
    new ReyunInstallList().run(args)
  }
}