Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mobvista-dmp
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
王金锋
mobvista-dmp
Commits
edf2bf9d
Commit
edf2bf9d
authored
Oct 27, 2021
by
WangJinfeng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix bug
parent
bc7b36ac
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
111 additions
and
107 deletions
+111
-107
adn_request_bundle_match.sh
azkaban/adn/package/adn_request_bundle_match.sh
+2
-2
adn_request_daily_v2.sh
azkaban/adn/package/adn_request_daily_v2.sh
+2
-2
adn_tencent_adx_device_tag.sh
azkaban/adn_adx/adn_tencent_adx_device_tag.sh
+10
-9
adn_tencent_adx_package.sh
azkaban/adn_adx/adn_tencent_adx_package.sh
+1
-0
dmp_device_interest.sh
azkaban/dm/dmp_device_interest.sh
+19
-19
AdnRequestSdkEtlDaily.scala
...mp/datasource/adn_request_sdk/AdnRequestSdkEtlDaily.scala
+74
-70
BundleMatchMain.scala
src/main/scala/mobvista/dmp/main/BundleMatchMain.scala
+3
-5
No files found.
azkaban/adn/package/adn_request_bundle_match.sh
View file @
edf2bf9d
...
...
@@ -29,9 +29,9 @@ UNMATCH_DATA_PATH="${DEV_UNMATCH_DATA_PATH}/${date_path}/adn_request_sdk"
mount_partition
"etl_adn_request_sdk_unmatch"
"dt='
$LOG_TIME
'"
"
$UNMATCH_DATA_PATH
"
||
exit
1
expire_date
=
$(
date
-d
"
$ScheduleTime
4
days ago"
"+%Y%m%d"
)
expire_date
=
$(
date
-d
"
$ScheduleTime
10
days ago"
"+%Y%m%d"
)
expire_date_path
=
$(
date
-d
"
$ScheduleTime
4
days ago"
"+%Y/%m/%d"
)
expire_date_path
=
$(
date
-d
"
$ScheduleTime
10
days ago"
"+%Y/%m/%d"
)
EXPIRE_OUTPUT_PATH
=
"
${
DEV_UNMATCH_DATA_PATH
}
/
${
expire_date_path
}
/adn_request_sdk"
...
...
azkaban/adn/package/adn_request_daily_v2.sh
View file @
edf2bf9d
...
...
@@ -25,14 +25,14 @@ spark-submit --class mobvista.dmp.datasource.adn_request_sdk.AdnRequestSdkEtlDai
--conf
spark.network.timeout
=
720s
\
--conf
spark.yarn.executor.memoryOverhead
=
2048
\
--conf
spark.sql.shuffle.partitions
=
2000
\
--conf
spark.default.parallelism
=
4
00
\
--conf
spark.default.parallelism
=
20
00
\
--conf
spark.driver.maxResultSize
=
8g
\
--conf
spark.executor.extraJavaOptions
=
"-XX:+UseG1GC"
\
--conf
spark.shuffle.memoryFraction
=
0.4
\
--conf
spark.storage.memoryFraction
=
0.4
\
--conf
spark.sql.files.maxPartitionBytes
=
268435456
\
--conf
spark.serializer
=
org.apache.spark.serializer.KryoSerializer
\
--master
yarn
--deploy-mode
cluster
--name
"AdnRequestSdkEtlDaily.
${
LOG_TIME
}
"
--executor-memory
6g
--driver-memory
4g
--executor-cores
4
--num-executors
100
\
--master
yarn
--deploy-mode
cluster
--name
"AdnRequestSdkEtlDaily.
${
LOG_TIME
}
"
--executor-memory
10g
--driver-memory
6g
--executor-cores
5
--num-executors
100
\
../../
${
JAR
}
-appIdMapping
${
INPUT_MAPPING_PATH
}
-manualAppIdMapping
${
DIM_MANUAL_MAPPING
}
-output
${
TMP_OUTPUT_PATH
}
-date
${
LOG_TIME
}
-coalesce
2000
if
[[
$?
-ne
0
]]
;
then
...
...
azkaban/adn_adx/adn_tencent_adx_device_tag.sh
View file @
edf2bf9d
...
...
@@ -23,15 +23,16 @@ check_await "${INPUT_GENDER_MERGE_DEVICE_PATH}/_SUCCESS"
hadoop fs
-rm
-r
"
${
OUTPUT_ADN_ADX_DEVICE_TAG_PATH
}
/"
spark-submit
--class
mobvista.dmp.datasource.adn_adx.AdnAdxDeviceTag
\
--conf
spark.yarn.executor.memoryOverhead
=
3072
\
--conf
spark.network.timeout
=
720s
\
--conf
spark.default.parallelism
=
10
\
--conf
spark.sql.autoBroadcastJoinThreshold
=
31457280
\
--jars
s3://mob-emr-test/dataplatform/DataWareHouse/offline/myjar/hive-hcatalog-core-2.3.3.jar
\
--master
yarn
--deploy-mode
cluster
--name
AdnAdxDeviceTag
--executor-memory
4g
--driver-memory
4g
--executor-cores
2
--num-executors
32
\
../
${
JAR
}
-outputadxdevtag
${
OUTPUT_ADN_ADX_DEVICE_TAG_PATH
}
\
-coalesce
80
\
-today
${
dt_today
}
-yesterday
${
dt_yesterday
}
--conf
spark.yarn.executor.memoryOverhead
=
3072
\
--conf
spark.network.timeout
=
720s
\
--conf
spark.default.parallelism
=
10
\
--conf
spark.sql.autoBroadcastJoinThreshold
=
31457280
\
--conf
spark.sql.hive.verifyPartitionPath
=
true
\
--jars
s3://mob-emr-test/dataplatform/DataWareHouse/offline/myjar/hive-hcatalog-core-2.3.3.jar
\
--master
yarn
--deploy-mode
cluster
--name
AdnAdxDeviceTag
--executor-memory
4g
--driver-memory
4g
--executor-cores
2
--num-executors
32
\
../
${
JAR
}
-outputadxdevtag
${
OUTPUT_ADN_ADX_DEVICE_TAG_PATH
}
\
-coalesce
80
\
-today
${
dt_today
}
-yesterday
${
dt_yesterday
}
if
[
$?
-ne
0
]
;
then
...
...
azkaban/adn_adx/adn_tencent_adx_package.sh
View file @
edf2bf9d
...
...
@@ -34,6 +34,7 @@ spark-submit --class mobvista.dmp.datasource.adn_adx.AdnTecentAdxDataMidWay \
--conf
spark.default.parallelism
=
1000
\
--conf
spark.sql.autoBroadcastJoinThreshold
=
31457280
\
--conf
spark.executor.extraJavaOptions
=
"-XX:+UseG1GC"
\
--conf
spark.sql.hive.verifyPartitionPath
=
true
\
--jars
s3://mob-emr-test/dataplatform/DataWareHouse/offline/myjar/hive-hcatalog-core-2.3.3.jar,s3://mob-emr-test/dataplatform/DataWareHouse/offline/myjar/json-serde-1.3.7-jar-with-dependencies.jar
\
--master
yarn
--deploy-mode
cluster
--name
AdnTecentAdxDataMidWay
--executor-memory
10g
--driver-memory
4g
--executor-cores
5
--num-executors
100
\
../
${
JAR
}
-outputadxtmp
${
OUTPUT_ODS_ADX_TMP_PATH
}
-dimadxpkg
${
OUTPUT_DIM_ADN_ADX_PKG_PATH
}
\
...
...
azkaban/dm/dmp_device_interest.sh
View file @
edf2bf9d
...
...
@@ -16,7 +16,9 @@ DMP_INSTALL_LIST_PATH="${DMP_INSTALL_LIST}/${date_path}/14days"
check_await
${
DMP_INSTALL_LIST_PATH
}
/_SUCCESS
sleep
60
app_tag_path
=
"
${
APP_TAG_PATH
}
/
${
date_path
}
"
check_await
"
${
app_tag_path
}
/_SUCCESS"
OUTPUT_PATH
=
"
${
DMP_INTEREST_PATH
}
/
${
date_path
}
"
...
...
@@ -27,30 +29,29 @@ expire_date_path=$(date +"%Y/%m/%d" -d "-10 day ${LOG_TIME}")
EXPIRE_OUTPUT_PATH
=
"
${
DMP_INTEREST_PATH
}
/
${
expire_date_path
}
"
spark-submit
--class
mobvista.dmp.datasource.dm.DmpDeviceInterest
\
--name
"DmpDeviceInterest.
${
date
}
"
\
--conf
spark.sql.shuffle.partitions
=
10000
\
--conf
spark.default.parallelism
=
1000
\
--conf
spark.kryoserializer.buffer.max
=
512m
\
--conf
spark.kryoserializer.buffer
=
64m
\
--conf
spark.sql.adaptive.enabled
=
true
\
--conf
spark.sql.adaptive.advisoryPartitionSizeInBytes
=
536870912
\
--master
yarn
--deploy-mode
cluster
--executor-memory
10g
--driver-memory
4g
--executor-cores
5
--num-executors
80
\
../
${
JAR
}
\
-date
${
date
}
-output
${
OUTPUT_PATH
}
-coalesce
4000
--name
"DmpDeviceInterest.
${
date
}
"
\
--conf
spark.sql.shuffle.partitions
=
10000
\
--conf
spark.default.parallelism
=
1000
\
--conf
spark.kryoserializer.buffer.max
=
512m
\
--conf
spark.kryoserializer.buffer
=
64m
\
--conf
spark.sql.adaptive.enabled
=
true
\
--conf
spark.sql.adaptive.advisoryPartitionSizeInBytes
=
536870912
\
--master
yarn
--deploy-mode
cluster
--executor-memory
10g
--driver-memory
4g
--executor-cores
5
--num-executors
80
\
../
${
JAR
}
\
-date
${
date
}
-output
${
OUTPUT_PATH
}
-coalesce
4000
if
[[
$?
-ne
0
]]
;
then
exit
255
exit
255
fi
part_num
=
`
hadoop fs
-ls
${
OUTPUT_PATH
}
|wc
-l
`
part_num
=
$(
hadoop fs
-ls
${
OUTPUT_PATH
}
|
wc
-l
)
if
[[
${
part_num
}
-le
1000
]]
then
echo
"This Dir No Data, Pleasce Check Job !!!"
exit
255
if
[[
${
part_num
}
-le
1000
]]
;
then
echo
"This Dir No Data, Pleasce Check Job !!!"
exit
255
fi
mount_partition
"dmp_interest_tag"
"dt='
${
date
}
'"
"
$OUTPUT_PATH
"
# 删除过期的分区及删除对应路径
unmount_partition
"dmp_interest_tag"
"dt='
${
expire_date
}
'"
"
${
EXPIRE_OUTPUT_PATH
}
"
\ No newline at end of file
unmount_partition
"dmp_interest_tag"
"dt='
${
expire_date
}
'"
"
${
EXPIRE_OUTPUT_PATH
}
"
src/main/scala/mobvista/dmp/datasource/adn_request_sdk/AdnRequestSdkEtlDaily.scala
View file @
edf2bf9d
...
...
@@ -73,7 +73,7 @@ class AdnRequestSdkEtlDaily extends CommonSparkJob with java.io.Serializable {
| WHERE CONCAT(yt,mt,dt) = '$date'
|"""
.
stripMargin
val
rdd
=
spark
.
sql
(
sql
).
coalesce
(
coalesce
.
toInt
).
rdd
.
map
(
row
=>
{
val
rdd
=
spark
.
sql
(
sql
).
coalesce
(
coalesce
*
2
).
rdd
.
map
(
row
=>
{
val
linesArr
=
new
ArrayBuffer
[
Row
]()
val
gaid
=
row
.
getAs
[
String
](
"gaid"
)
val
idfa
=
row
.
getAs
[
String
](
"idfa"
)
...
...
@@ -92,81 +92,85 @@ class AdnRequestSdkEtlDaily extends CommonSparkJob with java.io.Serializable {
val
osVersion
=
row
.
getAs
[
String
](
"osversion"
)
val
region
=
row
.
getAs
[
String
](
"rg"
)
var
flag
=
true
if
(
StringUtils
.
isNotBlank
(
idfa
)
&&
idfa
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
idfa
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
)
&&
"ios"
.
equals
(
platform
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
idfa
,
"idfa"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
gaid
)
&&
gaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
gaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
)
&&
"android"
.
equals
(
platform
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
gaid
,
"gaid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
imei
)
&&
imei
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
imeiPtn
)
&&
"android"
.
equals
(
platform
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
imei
,
"imei"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
androidId
)
&&
androidId
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
andriodIdPtn
)
&&
"android"
.
equals
(
platform
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
androidId
,
"androidId"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
// 新增自有Id
val
sysIdType
=
GetDevIdUtil
.
getExtSysId
(
extSysid
)
if
(
StringUtils
.
isNotBlank
(
sysIdType
))
{
val
idType
=
MRUtils
.
SPLITTER
.
split
(
sysIdType
,
-
1
)
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
idType
(
0
),
idType
(
1
),
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
oaid
)
&&
oaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
oaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
oaid
,
"oaid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
val
sysId
=
if
(
StringUtils
.
isNotBlank
(
sysIdType
))
{
val
idType
=
MRUtils
.
SPLITTER
.
split
(
sysIdType
,
-
1
)
idType
(
0
)
}
else
{
""
}
if
(
StringUtils
.
isNotBlank
(
idfv
)
&&
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
)
&&
"ios"
.
equals
(
platform
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
idfv
,
"idfv"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
platform
match
{
case
"ios"
=>
var
dev_tag
=
1
if
(
StringUtils
.
isNotBlank
(
ruid
)
&&
ruid
.
length
>
16
)
{
linesArr
+=
Row
(
ruid
,
"ruid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
1
)
}
if
(
StringUtils
.
isNotBlank
(
idfa
)
&&
idfa
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
idfa
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
idfa
,
"idfa"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
if
(
StringUtils
.
isNotBlank
(
sysId
))
{
linesArr
+=
Row
(
sysId
,
"sysid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
dev_tag
=
0
if
(
StringUtils
.
isNotBlank
(
idfv
)
&&
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
idfv
,
"idfv"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
}
else
{
if
(
StringUtils
.
isNotBlank
(
sysId
))
{
linesArr
+=
Row
(
sysId
,
"sysid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
if
(
StringUtils
.
isNotBlank
(
idfv
)
&&
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
idfv
,
"idfv"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
}
else
{
if
(
StringUtils
.
isNotBlank
(
idfv
)
&&
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
idfv
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
idfv
,
"idfv"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
}
}
case
"android"
=>
var
dev_tag
=
1
if
(
StringUtils
.
isNotBlank
(
gaid
)
&&
gaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
gaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
gaid
,
"gaid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
if
(
StringUtils
.
isNotBlank
(
oaid
)
&&
oaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
oaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
oaid
,
"oaid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
dev_tag
=
0
if
(
StringUtils
.
isNotBlank
(
sysId
))
{
linesArr
+=
Row
(
sysId
,
"sysid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
imei
)
&&
imei
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
imeiPtn
))
{
linesArr
+=
Row
(
imei
,
"imei"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
androidId
)
&&
androidId
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
andriodIdPtn
))
{
linesArr
+=
Row
(
androidId
,
"androidId"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
}
else
{
if
(
StringUtils
.
isNotBlank
(
oaid
)
&&
oaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
didPtn
)
&&
!
oaid
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
allZero
))
{
linesArr
+=
Row
(
oaid
,
"oaid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
imei
)
&&
imei
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
imeiPtn
)
&&
"android"
.
equals
(
platform
))
{
if
(
dev_tag
==
1
)
{
dev_tag
=
0
}
linesArr
+=
Row
(
imei
,
"imei"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
androidId
)
&&
androidId
.
matches
(
mobvista
.
dmp
.
common
.
MobvistaConstant
.
andriodIdPtn
)
&&
"android"
.
equals
(
platform
))
{
if
(
dev_tag
==
1
)
{
dev_tag
=
0
}
linesArr
+=
Row
(
androidId
,
"androidId"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
if
(
StringUtils
.
isNotBlank
(
sysId
))
{
if
(
dev_tag
==
1
)
{
dev_tag
=
0
}
linesArr
+=
Row
(
sysId
,
"sysid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
}
case
_
=>
}
if
(
StringUtils
.
isNotBlank
(
ruid
)
&&
ruid
.
length
>
16
&&
"ios"
.
equals
(
platform
))
{
var
dev_tag
=
0
if
(
flag
)
{
dev_tag
=
1
flag
=
false
}
linesArr
+=
Row
(
ruid
,
"ruid"
,
platform
,
appId
,
model
,
brand
,
osVersion
,
country
,
strategy
,
region
,
dev_tag
)
}
linesArr
}).
flatMap
(
l
=>
l
)
...
...
src/main/scala/mobvista/dmp/main/BundleMatchMain.scala
View file @
edf2bf9d
package
mobvista.dmp.main
import
java.util
import
com.fasterxml.jackson.databind.ObjectMapper
import
mobvista.dmp.main.Constant._
import
mobvista.dmp.util.
{
DateUtil
,
MRUtils
}
...
...
@@ -10,6 +8,7 @@ import org.apache.spark.broadcast.Broadcast
import
org.apache.spark.sql.functions._
import
org.apache.spark.sql.
{
DataFrame
,
SparkSession
}
import
java.util
import
scala.collection.mutable
class
BundleMatchMain
extends
BundleMatchNewJob
{
...
...
@@ -50,9 +49,8 @@ class BundleMatchMain extends BundleMatchNewJob {
case
"adn_request_sdk"
=>
val
df
=
spark
.
read
.
orc
(
oldUnmatch
)
.
filter
(
r
=>
{
val
packageName
=
r
.
getAs
[
String
](
"package_name"
)
r
.
getAs
[
String
](
"update_date"
).
compareTo
(
expire_date
)
>
0
&&
StringUtils
.
isNotBlank
(
packageName
)
&&
!
packageName
.
equals
(
"0000000000"
)
&&
!
packageName
.
equals
(
"com.nonepkg.nonepkg"
)
// val packageName = r.getAs[String]("package_name")
r
.
getAs
[
String
](
"update_date"
).
compareTo
(
expire_date
)
>
0
}).
toDF
()
.
union
(
spark
.
read
.
orc
(
input
)
.
withColumn
(
"package_name"
,
getPkgName
(
col
(
"platform"
),
col
(
"package_name"
)))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment