Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mobvista-dmp
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
王金锋
mobvista-dmp
Commits
2e4a00d3
Commit
2e4a00d3
authored
Jul 27, 2021
by
fan.jiang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
reyun baijiu zhuangku
parent
feec0592
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
176 additions
and
0 deletions
+176
-0
reyun_label_baijiu.job
azkaban/ali/reyun/reyun_label_baijiu.job
+3
-0
reyun_label_baijiu.sh
azkaban/ali/reyun/reyun_label_baijiu.sh
+36
-0
dmp_env.sh
azkaban/dmp_env.sh
+1
-0
ReyunLabelBaijiu.scala
...cala/mobvista/dmp/datasource/reyun/ReyunLabelBaijiu.scala
+136
-0
No files found.
azkaban/ali/reyun/reyun_label_baijiu.job
0 → 100644
View file @
2e4a00d3
type=command
command=sh -x reyun_label_baijiu.sh
\ No newline at end of file
azkaban/ali/reyun/reyun_label_baijiu.sh
0 → 100644
View file @
2e4a00d3
#!/bin/sh
# # # # # # # # # # # # # # # # # # # # # #
# @author : jiangfan
# @date : 2021-07-01 12:06:00
# # # # # # # # # # # # # # # # # # # # # #
#!/usr/bin/env bash
source
../../dmp_env.sh
dt_today
=
$(
date
-d
"
$ScheduleTime
1 days ago"
+
"%Y%m%d"
)
dt_slash_today
=
$(
date
-d
"
$ScheduleTime
1 days ago"
+
"%Y/%m/%d"
)
OUTPUT_PATH
=
"
${
REYUN_LABEL_BAIJIU_DAILY_PATH
}
/
${
dt_slash_today
}
"
hadoop fs
-rm
-r
"
${
OUTPUT_PATH
}
"
spark-submit
--class
mobvista.dmp.datasource.reyun.ReyunLabelBaijiu
\
--conf
spark.yarn.executor.memoryOverhead
=
2048
\
--conf
spark.network.timeout
=
720s
\
--conf
spark.driver.maxResultSize
=
4g
\
--conf
spark.default.parallelism
=
2000
\
--conf
spark.sql.shuffle.partitions
=
2000
\
--conf
spark.sql.broadcastTimeout
=
1200
\
--conf
spark.sql.autoBroadcastJoinThreshold
=
31457280
\
--files
${
HIVE_SITE_PATH
}
\
--jars
${
JARS
}
\
--master
yarn
--deploy-mode
cluster
--executor-memory
8g
--driver-memory
4g
--executor-cores
4
--num-executors
170
\
../../
${
JAR
}
\
-output
${
OUTPUT_PATH
}
-coalesce
680
-dt_today
${
dt_today
}
if
[[
$?
-ne
0
]]
;
then
exit
255
fi
azkaban/dmp_env.sh
View file @
2e4a00d3
...
@@ -358,6 +358,7 @@ REYUN_RAW_DATA="s3://mob-emr-test/reyun/pkginfo"
...
@@ -358,6 +358,7 @@ REYUN_RAW_DATA="s3://mob-emr-test/reyun/pkginfo"
# reyun business tmp data
# reyun business tmp data
REYUN_LABEL_TEST_DAILY_PATH
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_label_test_daily"
REYUN_LABEL_TEST_DAILY_PATH
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_label_test_daily"
REYUN_LABEL_BAIJIU_DAILY_PATH
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/reyun_label_baijiu_daily"
# alipay_activation business tmp data
# alipay_activation business tmp data
ALIPAY_ACTIVATION_DAILY_PATH
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_alipay_activation_daily"
ALIPAY_ACTIVATION_DAILY_PATH
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/etl_alipay_activation_daily"
...
...
src/main/scala/mobvista/dmp/datasource/reyun/ReyunLabelBaijiu.scala
0 → 100644
View file @
2e4a00d3
package
mobvista.dmp.datasource.reyun
import
mobvista.dmp.common.CommonSparkJob
import
mobvista.dmp.util.DateUtil
import
org.apache.commons.cli.Options
import
org.apache.hadoop.fs.
{
FileSystem
,
Path
}
import
org.apache.spark.sql.SparkSession
import
java.net.URI
/**
* @author jiangfan
* @date 2021/7/23 18:00
*/
class
ReyunLabelBaijiu
extends
CommonSparkJob
with
Serializable
{
override
protected
def
buildOptions
()
:
Options
=
{
val
options
=
new
Options
options
.
addOption
(
"coalesce"
,
true
,
"[must] coalesce"
)
options
.
addOption
(
"output"
,
true
,
"[must] output"
)
options
.
addOption
(
"dt_today"
,
true
,
"[must] dt_today"
)
options
}
override
protected
def
run
(
args
:
Array
[
String
])
:
Int
=
{
val
commandLine
=
commParser
.
parse
(
options
,
args
)
if
(!
checkMustOption
(
commandLine
))
{
printUsage
(
options
)
return
-
1
}
else
printOptions
(
commandLine
)
val
coalesce
=
commandLine
.
getOptionValue
(
"coalesce"
)
val
output
=
commandLine
.
getOptionValue
(
"output"
)
val
dt_today
=
commandLine
.
getOptionValue
(
"dt_today"
)
val
spark
=
SparkSession
.
builder
()
.
appName
(
"ReyunLabelBaijiu"
)
.
config
(
"spark.rdd.compress"
,
"true"
)
.
config
(
"spark.io.compression.codec"
,
"snappy"
)
.
config
(
"spark.sql.orc.filterPushdown"
,
"true"
)
.
config
(
"spark.sql.warehouse.dir"
,
"s3://mob-emr-test/spark-warehouse"
)
.
config
(
"spark.serializer"
,
"org.apache.spark.serializer.KryoSerializer"
)
.
enableHiveSupport
()
.
getOrCreate
()
val
sc
=
spark
.
sparkContext
import
spark.implicits._
FileSystem
.
get
(
new
URI
(
s
"s3://mob-emr-test"
),
spark
.
sparkContext
.
hadoopConfiguration
).
delete
(
new
Path
(
output
),
true
)
val
one_day_ago
=
DateUtil
.
getDayByString
(
dt_today
,
"yyyyMMdd"
,
-
1
)
val
two_days_ago
=
DateUtil
.
getDayByString
(
dt_today
,
"yyyyMMdd"
,
-
2
)
try
{
val
sql1
=
s
"""
|select distinct device_id
|from
|(select md5(imei) as device_id
|from
|(select app_id,
|(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
|(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
|from
|dwh.ods_adn_trackingnew_request
|where concat(yyyy,mm,dd) in ('${dt_today}')
|and country_code = 'CN'
|and platform = 'android'
|union all
|select app_id,(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
|(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
|from dwh.ods_adn_trackingnew_hb_request
|where concat(yyyy,mm,dd) in ('${dt_today}')
|and country_code = 'CN'
|and platform = 'android'
|) as tmp
|where tmp.imei is not null and tmp.app_id in (select appid from dwh.reyun_label_baijiu_filter_appid)
|union all
|select md5(oaid) as device_id
|from
|(select app_id,
|(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
|(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
|from
|dwh.ods_adn_trackingnew_request
|where concat(yyyy,mm,dd) in ('${dt_today}')
|and country_code = 'CN'
|and platform = 'android'
|union all
|select app_id,(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
|(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
|from dwh.ods_adn_trackingnew_hb_request
|where concat(yyyy,mm,dd) in ('${dt_today}')
|and country_code = 'CN'
|and platform = 'android'
|) as tmp
|where tmp.oaid is not null and tmp.app_id in (select appid from dwh.reyun_label_baijiu_filter_appid)
|union all
|select if(imeimd5 is null ,md5(imei),imeimd5) as device_id
|from
|(select split(ext5,',')[4] as imei,split(ext5,',')[5] as imeimd5
|from adn_dsp.log_adn_dsp_request_orc_hour
|where concat(yr,mt,dt) in ('${dt_today}') and hh >='00' and hh<'06'
|and countrycode = 'CN'
|and os = 'android'
|and exchanges='xunfei'
|) adx
|union all
|select if(oaidmd5 is null ,md5(oaid),oaidmd5) as device_id
|from
|(select split(ext5,',')[12] as oaid ,split(ext5,',')[13] as oaidmd5
|from adn_dsp.log_adn_dsp_request_orc_hour
|where concat(yr,mt,dt) in ('${dt_today}') and hh >='00' and hh<'06'
|and countrycode = 'CN'
|and os = 'android'
|and exchanges='xunfei'
|) adx
|) tmp
|"""
.
stripMargin
println
(
"sql=============="
+
sql1
)
spark
.
sql
(
sql1
).
rdd
.
map
(
_
.
mkString
).
coalesce
(
coalesce
.
toInt
).
saveAsTextFile
(
output
)
}
finally
{
spark
.
stop
()
}
0
}
}
object
ReyunLabelBaijiu
{
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
new
ReyunLabelBaijiu
().
run
(
args
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment