Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mobvista-dmp
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
王金锋
mobvista-dmp
Commits
228e128a
Commit
228e128a
authored
Dec 21, 2021
by
WangJinfeng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
init id_mapping
parent
16b54f6b
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
24 additions
and
5 deletions
+24
-5
IDMappingGraphx.scala
.../mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
+24
-5
No files found.
src/main/scala/mobvista/dmp/datasource/id_mapping/IDMappingGraphx.scala
View file @
228e128a
...
...
@@ -12,6 +12,7 @@ import org.apache.spark.sql.types.StructType
import
org.apache.spark.sql.
{
Row
,
SparkSession
}
import
java.net.URI
import
scala.collection.JavaConverters._
import
scala.collection.mutable.ArrayBuffer
/**
...
...
@@ -123,12 +124,30 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
idType
=
it
.
_3
oneID
.
put
(
it
.
_1
,
MobvistaConstant
.
String2JSONObject
(
it
.
_2
))
})
(
srcID
,
oneID
.
toJSONString
,
idType
)
((
srcID
,
idType
),
oneID
.
toJSONString
)
})
val
mergeOneIDRDD
=
multiOneIDRDD
.
union
(
singleOneIDRDD
).
combineByKey
(
(
v
:
String
)
=>
Set
(
v
),
(
c
:
Set
[
String
],
v
:
String
)
=>
c
++
Seq
(
v
),
(
c1
:
Set
[
String
],
c2
:
Set
[
String
])
=>
c1
++
c2
).
map
(
kv
=>
{
val
srcId
=
kv
.
_1
.
_1
val
srcType
=
kv
.
_1
.
_2
val
oneID
=
new
JSONObject
()
kv
.
_2
.
foreach
(
js
=>
{
val
json
=
MobvistaConstant
.
String2JSONObject
(
js
)
val
keys
=
json
.
keySet
().
asScala
keys
.
foreach
(
key
=>
{
oneID
.
put
(
key
,
json
.
getJSONObject
(
key
))
})
})
(
srcId
,
srcType
,
oneID
)
})
FileSystem
.
get
(
new
URI
(
s
"s3://mob-emr-test"
),
spark
.
sparkContext
.
hadoopConfiguration
).
delete
(
new
Path
(
outPutPath
),
true
)
m
ultiOneIDRDD
.
union
(
singleOneIDRDD
)
m
ergeOneIDRDD
.
repartition
(
coalesce
)
.
saveAsTextFile
(
outPutPath
,
classOf
[
GzipCodec
])
}
...
...
@@ -263,8 +282,8 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
array
}
def
updateOneID
(
kv
:
(
String
,
Set
[(
String
,
String
,
String
)]),
mainIDSet
:
Set
[
String
])
:
ArrayBuffer
[(
String
,
String
,
String
)]
=
{
val
array
=
new
ArrayBuffer
[(
String
,
String
,
String
)]()
def
updateOneID
(
kv
:
(
String
,
Set
[(
String
,
String
,
String
)]),
mainIDSet
:
Set
[
String
])
:
ArrayBuffer
[(
(
String
,
String
)
,
String
)]
=
{
val
array
=
new
ArrayBuffer
[(
(
String
,
String
)
,
String
)]()
val
tmpOneId
=
kv
.
_1
val
iters
=
kv
.
_2
val
oneID
=
new
JSONObject
()
...
...
@@ -278,7 +297,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
}).
foreach
(
itr
=>
{
val
k
=
itr
.
_1
val
t
=
itr
.
_3
array
+=
((
k
,
oneID
.
toJSONString
,
t
))
array
+=
((
(
k
,
t
),
oneID
.
toJSONString
))
})
array
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment