Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mobvista-dmp
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
王金锋
mobvista-dmp
Commits
d0d9e1ad
Commit
d0d9e1ad
authored
Dec 20, 2021
by
WangJinfeng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
init id_mapping
parent
b4a41d2c
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
20 additions
and
19 deletions
+20
-19
IDMappingGraphx.scala
.../mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
+20
-19
No files found.
src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
View file @
d0d9e1ad
...
@@ -95,9 +95,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -95,9 +95,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}).
flatMap
(
l
=>
l
)
}).
flatMap
(
l
=>
l
)
val
maxGraph
=
vertex
.
combineByKey
(
val
maxGraph
=
vertex
.
combineByKey
(
(
v
:
(
String
,
JSONObject
,
String
))
=>
Set
(
v
),
(
v
:
(
String
,
String
,
String
))
=>
Set
(
v
),
(
c
:
Set
[(
String
,
JSONObject
,
String
)],
v
:
(
String
,
JSONObject
,
String
))
=>
c
++
Seq
(
v
),
(
c
:
Set
[(
String
,
String
,
String
)],
v
:
(
String
,
String
,
String
))
=>
c
++
Seq
(
v
),
(
c1
:
Set
[(
String
,
JSONObject
,
String
)],
c2
:
Set
[(
String
,
JSONObject
,
String
)])
=>
c1
++
c2
(
c1
:
Set
[(
String
,
String
,
String
)],
c2
:
Set
[(
String
,
String
,
String
)])
=>
c1
++
c2
)
)
val
multiOneIDRDD
=
maxGraph
.
filter
(
kv
=>
{
val
multiOneIDRDD
=
maxGraph
.
filter
(
kv
=>
{
...
@@ -119,9 +119,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -119,9 +119,9 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
var
idType
=
""
var
idType
=
""
kv
.
_2
.
foreach
(
it
=>
{
kv
.
_2
.
foreach
(
it
=>
{
idType
=
it
.
_3
idType
=
it
.
_3
oneID
.
put
(
it
.
_1
,
it
.
_2
)
oneID
.
put
(
it
.
_1
,
MobvistaConstant
.
String2JSONObject
(
it
.
_2
)
)
})
})
(
srcID
,
oneID
,
idType
)
(
srcID
,
oneID
.
toJSONString
,
idType
)
})
})
FileSystem
.
get
(
new
URI
(
s
"s3://mob-emr-test"
),
spark
.
sparkContext
.
hadoopConfiguration
).
delete
(
new
Path
(
outPutPath
),
true
)
FileSystem
.
get
(
new
URI
(
s
"s3://mob-emr-test"
),
spark
.
sparkContext
.
hadoopConfiguration
).
delete
(
new
Path
(
outPutPath
),
true
)
...
@@ -161,7 +161,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -161,7 +161,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
val
osv_upt
=
row
.
getAs
[
String
](
"osv_upt"
)
val
osv_upt
=
row
.
getAs
[
String
](
"osv_upt"
)
val
upt
=
row
.
getAs
[
String
](
"upt"
)
val
upt
=
row
.
getAs
[
String
](
"upt"
)
val
cnt
=
row
.
getAs
[
Long
](
"cnt"
)
val
cnt
=
row
.
getAs
[
Long
](
"cnt"
)
val
idfv_bundle
=
if
(
StringUtils
.
isNotBlank
(
idfv
)
&&
StringUtils
.
isNotBlank
(
pkg_name
)
)
{
val
idfv_bundle
=
if
(
StringUtils
.
isNotBlank
(
idfv
))
{
MD5Util
.
getMD5Str
(
idfv
+
pkg_name
)
MD5Util
.
getMD5Str
(
idfv
+
pkg_name
)
}
else
{
}
else
{
""
""
...
@@ -176,7 +176,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -176,7 +176,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}
else
{
}
else
{
""
""
}
}
val
bmosv_ipua_bundle
=
if
(
StringUtils
.
isNotBlank
(
ip
)
&&
StringUtils
.
isNotBlank
(
pkg_name
)
)
{
val
bmosv_ipua_bundle
=
if
(
StringUtils
.
isNotBlank
(
ip
))
{
MD5Util
.
getMD5Str
(
brand
+
model
+
os_version
+
ip
+
ua
+
pkg_name
)
MD5Util
.
getMD5Str
(
brand
+
model
+
os_version
+
ip
+
ua
+
pkg_name
)
}
else
{
}
else
{
""
""
...
@@ -201,7 +201,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -201,7 +201,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
val
os_version
=
row
.
getAs
[
String
](
"os_version"
)
val
os_version
=
row
.
getAs
[
String
](
"os_version"
)
val
upt
=
row
.
getAs
[
String
](
"upt"
)
val
upt
=
row
.
getAs
[
String
](
"upt"
)
val
cnt
=
row
.
getAs
[
Long
](
"cnt"
)
val
cnt
=
row
.
getAs
[
Long
](
"cnt"
)
val
android_pkg
=
if
(
StringUtils
.
isNotBlank
(
android_id
)
&&
StringUtils
.
isNotBlank
(
pkg_name
)
)
{
val
android_pkg
=
if
(
StringUtils
.
isNotBlank
(
android_id
))
{
MD5Util
.
getMD5Str
(
android_id
+
pkg_name
)
MD5Util
.
getMD5Str
(
android_id
+
pkg_name
)
}
else
{
}
else
{
""
""
...
@@ -211,7 +211,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -211,7 +211,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}
else
{
}
else
{
""
""
}
}
val
bmosv_ipua_pkg
=
if
(
StringUtils
.
isNotBlank
(
ip
)
&&
StringUtils
.
isNotBlank
(
pkg_name
)
)
{
val
bmosv_ipua_pkg
=
if
(
StringUtils
.
isNotBlank
(
ip
))
{
MD5Util
.
getMD5Str
(
brand
+
model
+
os_version
+
ip
+
ua
+
pkg_name
)
MD5Util
.
getMD5Str
(
brand
+
model
+
os_version
+
ip
+
ua
+
pkg_name
)
}
else
{
}
else
{
""
""
...
@@ -227,8 +227,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -227,8 +227,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}
}
}
}
def
processVertex
(
date
:
String
,
row
:
Row
,
ids
:
Array
[
String
],
mainIDSet
:
Set
[
String
])
:
ArrayBuffer
[(
String
,
(
String
,
JSONObject
,
String
))]
=
{
def
processVertex
(
date
:
String
,
row
:
Row
,
ids
:
Array
[
String
],
mainIDSet
:
Set
[
String
])
:
ArrayBuffer
[(
String
,
(
String
,
String
,
String
))]
=
{
val
array
=
new
ArrayBuffer
[(
String
,
(
String
,
JSONObject
,
String
))]()
val
array
=
new
ArrayBuffer
[(
String
,
(
String
,
String
,
String
))]()
implicit
val
formats
=
org
.
json4s
.
DefaultFormats
implicit
val
formats
=
org
.
json4s
.
DefaultFormats
// val json = JSON.parseObject(Serialization.write(row))
// val json = JSON.parseObject(Serialization.write(row))
// 事件频次
// 事件频次
...
@@ -243,15 +243,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -243,15 +243,15 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
jsonObject
.
put
(
"active_type"
,
date
)
jsonObject
.
put
(
"active_type"
,
date
)
jsonObject
.
put
(
"cnt"
,
cnt
)
jsonObject
.
put
(
"cnt"
,
cnt
)
val
oneID
=
row
.
getAs
[
String
](
String
.
valueOf
(
ids
(
i
)))
val
oneID
=
row
.
getAs
[
String
](
String
.
valueOf
(
ids
(
i
)))
array
+=
((
oneID
,
(
oneID
,
jsonObject
,
oneIDType
)))
array
+=
((
oneID
,
(
oneID
,
jsonObject
.
toJSONString
,
oneIDType
)))
for
(
j
<-
i
+
1
until
ids
.
length
)
{
for
(
j
<-
i
+
1
until
ids
.
length
)
{
if
(
StringUtils
.
isNotBlank
(
row
.
getAs
[
String
](
String
.
valueOf
(
ids
(
j
)))))
{
if
(
StringUtils
.
isNotBlank
(
row
.
getAs
[
String
](
String
.
valueOf
(
ids
(
j
)))))
{
val
srcType
=
ids
(
j
)
val
srcType
=
ids
(
j
)
val
srcOrg
=
row
.
getAs
[
String
](
String
.
valueOf
(
srcType
))
val
srcOrg
=
row
.
getAs
[
String
](
String
.
valueOf
(
srcType
))
if
(
mainIDSet
.
contains
(
oneIDType
))
{
if
(
mainIDSet
.
contains
(
oneIDType
))
{
array
+=
((
srcOrg
,
(
oneID
,
jsonObject
,
srcType
)))
array
+=
((
srcOrg
,
(
oneID
,
jsonObject
.
toJSONString
,
srcType
)))
}
else
{
}
else
{
array
+=
((
oneID
,
(
srcOrg
,
jsonObject
,
srcType
)))
array
+=
((
oneID
,
(
srcOrg
,
jsonObject
.
toJSONString
,
srcType
)))
}
}
}
}
}
}
...
@@ -261,18 +261,19 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
...
@@ -261,18 +261,19 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
array
array
}
}
def
updateOneID
(
kv
:
(
String
,
Iterable
[(
String
,
JSONObject
,
String
)]),
mainIDSet
:
Set
[
String
])
:
ArrayBuffer
[(
String
,
JSONObject
,
String
)]
=
{
def
updateOneID
(
kv
:
(
String
,
Set
[(
String
,
String
,
String
)]),
mainIDSet
:
Set
[
String
])
:
ArrayBuffer
[(
String
,
String
,
String
)]
=
{
val
array
=
new
ArrayBuffer
[(
String
,
JSONObject
,
String
)]()
val
array
=
new
ArrayBuffer
[(
String
,
String
,
String
)]()
val
iters
=
kv
.
_2
val
iters
=
kv
.
_2
val
oneID
=
new
JSONObject
()
val
oneID
=
new
JSONObject
()
iters
.
foreach
(
ir
=>
{
iters
.
foreach
(
ir
=>
{
oneID
.
put
(
ir
.
_1
,
ir
.
_2
)
oneID
.
put
(
ir
.
_1
,
MobvistaConstant
.
String2JSONObject
(
ir
.
_2
)
)
})
})
iters
.
filter
(
tp
=>
{
iters
.
filter
(
tp
=>
{
!
mainIDSet
.
contains
(
tp
.
_2
.
getString
(
"id_type"
))
!
mainIDSet
.
contains
(
MobvistaConstant
.
String2JSONObject
(
tp
.
_2
)
.
getString
(
"id_type"
))
}).
foreach
(
itr
=>
{
}).
foreach
(
itr
=>
{
val
k
=
itr
.
_1
val
k
=
itr
.
_1
array
+=
((
k
,
oneID
,
itr
.
_3
))
val
t
=
itr
.
_3
array
+=
((
k
,
oneID
.
toJSONString
,
t
))
})
})
array
array
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment