Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mobvista-dmp
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
王金锋
mobvista-dmp
Commits
1ddd357c
Commit
1ddd357c
authored
Oct 01, 2021
by
WangJinfeng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix
parent
9d6e95e0
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
34 additions
and
15 deletions
+34
-15
dmp_env.sh
azkaban/dmp_env.sh
+9
-2
tools.sh
azkaban/ga_rawdata_analysis/common/tools.sh
+1
-1
dm_realtime_service.sh
azkaban/realtime/dm_realtime_service.sh
+5
-0
rtdmp_merge_ck.sh
azkaban/rtdmp/rtdmp_merge_ck.sh
+1
-1
clickhouse.sql
doc/ck/clickhouse.sql
+6
-0
RTDmpFetch.java
src/main/java/mobvista/dmp/datasource/rtdmp/RTDmpFetch.java
+5
-5
MySQLUtil.java
src/main/java/mobvista/dmp/util/MySQLUtil.java
+4
-3
Constant.scala
src/main/scala/mobvista/dmp/output/reyun/Constant.scala
+3
-3
No files found.
azkaban/dmp_env.sh
View file @
1ddd357c
...
...
@@ -1036,4 +1036,12 @@ check_await_hive_partition() {
# s3://mob-emr-test/wangjf/jar/spark-clickhouse-connector_2.11-2.4.0_0.22.jar,\
# s3://mob-emr-test/wangjf/jar/cassandra-driver-core-3.10.2.jar,\
# s3://mob-emr-test/wangjf/jar/spark-cassandra-connector_2.11-2.5.1.jar,\
# s3://mob-emr-test/wangjf/jar/spark-cassandra-connector-driver_2.11-2.5.1.jar"
\ No newline at end of file
# s3://mob-emr-test/wangjf/jar/spark-cassandra-connector-driver_2.11-2.5.1.jar"
export
SPARK_HOME
=
"/data/hadoop-home/spark-3.1.1-bin-free-c59d19df39"
export
SPARK_CONF_DIR
=
"/data/hadoop-config/command-home/engineplus-k8s-spark-3.1.1-offline/conf"
export
JAVA_HOME
=
"/usr/lib/jvm/jdk1.8.0_131"
export
HIVE_CONF_DIR
=
"/data/hadoop-config/command-home/apache-hive-2.3.3-offline/conf"
azkaban/ga_rawdata_analysis/common/tools.sh
View file @
1ddd357c
...
...
@@ -5,7 +5,7 @@ user="andy.liu"
map_memory
=
"mapreduce.map.memory.mb=2048"
reduce_memory
=
"mapreduce.reduce.memory.mb=3072"
export
HIVE_CONF_DIR
=
/data/azkaban-hadoop/command-home/hive-offline/conf
#
export HIVE_CONF_DIR=/data/azkaban-hadoop/command-home/hive-offline/conf
function
hive_func
(){
...
...
azkaban/realtime/dm_realtime_service.sh
View file @
1ddd357c
...
...
@@ -26,8 +26,13 @@ sleep 30
output_path
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/
${
date_path
}
"
unmount_output_path
=
"s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/
${
unmount_date_path
}
"
export
SPARK_HOME
=
"/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
export
SPARK_CONF_DIR
=
"/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
spark-submit
--class
mobvista.dmp.datasource.retargeting.DeviceInfoJob
\
--name
"DeviceInfoJob.wangjf.
${
date
}
"
\
--conf
spark.sql.broadcastTimeout
=
1200
\
--conf
spark.sql.shuffle.partitions
=
5000
\
--conf
spark.default.parallelism
=
5000
\
--conf
spark.kryoserializer.buffer.max
=
512m
\
...
...
azkaban/rtdmp/rtdmp_merge_ck.sh
View file @
1ddd357c
...
...
@@ -9,7 +9,7 @@ date_time=$(date +"%Y%m%d%H" -d "-1 hour $today")
host
=
"ip-172-31-20-35.ec2.internal"
cluster
=
"cluster_1st"
database
=
"dwh"
table
=
"audience_merge"
table
=
"audience_merge
_v1
"
spark-submit
--class
mobvista.dmp.datasource.rtdmp.RTDmpMergeCK
\
--name
"RTDmpMergeCK.wangjf.
${
date_time
}
"
\
...
...
doc/ck/clickhouse.sql
View file @
1ddd357c
...
...
@@ -124,6 +124,12 @@ CREATE TABLE dwh.etl_iqiyi_install_daily_all ON CLUSTER cluster_1st (`dt` Date,
CREATE
TABLE
dwh
.
audience_merge
ON
CLUSTER
cluster_1st
(
dt
Date
,
hour
FixedString
(
2
),
devid
String
,
audience_id
Array
(
Int32
))
ENGINE
=
ReplicatedMergeTree
(
'/clickhouse/tables/{layer}-{shard}/audience_merge'
,
'{replica}'
)
PARTITION
BY
(
toYYYYMMDD
(
dt
),
hour
)
ORDER
BY
(
dt
,
hour
,
devid
)
SETTINGS
index_granularity
=
8192
;
CREATE
TABLE
dwh
.
audience_merge_all
ON
CLUSTER
cluster_1st
(
dt
Date
,
hour
FixedString
(
2
),
devid
String
,
audience_id
Array
(
Int32
))
ENGINE
=
Distributed
(
cluster_1st
,
dwh
,
audience_merge
,
rand
());
DROP
TABLE
dwh
.
audience_merge
ON
CLUSTER
cluster_1st
;
CREATE
TABLE
dwh
.
audience_merge_v1
(
`dt`
Date
,
`hour`
FixedString
(
2
),
`devid`
String
,
`audience_id`
Array
(
Int32
),
`device_type`
String
DEFAULT
''
)
ENGINE
=
ReplicatedMergeTree
(
'/clickhouse/tables/{layer}-{shard}/audience_merge_v1'
,
'{replica}'
)
PARTITION
BY
(
toYYYYMMDD
(
dt
),
hour
)
ORDER
BY
(
dt
,
hour
,
devid
)
TTL
dt
+
toIntervalDay
(
2
)
SETTINGS
index_granularity
=
8192
,
use_minimalistic_part_header_in_zookeeper
=
1
;
CREATE
TABLE
dwh
.
audience_merge_v1_all
(
`dt`
Date
,
`hour`
FixedString
(
2
),
`devid`
String
,
`audience_id`
Array
(
Int32
),
`device_type`
String
)
ENGINE
=
Distributed
(
'cluster_1st'
,
'dwh'
,
'audience_merge_v1'
,
rand
());
CREATE
TABLE
dwh
.
audience_merge
(
dt
Date
,
hour
FixedString
(
2
),
devid
String
,
audience_id
Array
(
Int32
))
ENGINE
=
MergeTree
()
PARTITION
BY
(
toYYYYMMDD
(
dt
),
hour
)
ORDER
BY
(
dt
,
hour
,
devid
)
SETTINGS
index_granularity
=
8192
;
CREATE
TABLE
dmp
.
uc_lahuo_daily
ON
CLUSTER
cluster_1st
(
dt
Date
,
device_type
String
,
device_ids
String
)
ENGINE
=
ReplicatedMergeTree
(
'/clickhouse/tables/{layer}-{shard}/uc_lahuo_daily'
,
'{replica}'
)
PARTITION
BY
(
toYYYYMMDD
(
dt
),
device_type
)
ORDER
BY
(
dt
,
device_type
,
device_ids
)
TTL
dt
+
toIntervalWeek
(
1
)
SETTINGS
index_granularity
=
8192
;
...
...
src/main/java/mobvista/dmp/datasource/rtdmp/RTDmpFetch.java
View file @
1ddd357c
...
...
@@ -121,7 +121,7 @@ public class RTDmpFetch {
LOGGER
.
info
(
"checkRules -->> audienceId:"
+
audienceId
+
", jsonObject:"
+
jsonObject
+
", startTime:"
+
startTime
+
", endTime:"
+
endTime
);
Tuple
tuple
=
checkRules
(
jsonObject
,
startTime
,
endTime
);
if
(
tuple
.
getFlag
())
{
KV
kv
=
mySqlUtil
.
getPartitionTime
(
"dwh"
,
"
audience_merge
"
);
KV
kv
=
mySqlUtil
.
getPartitionTime
(
"dwh"
,
"
audience_merge_v1
"
);
String
dt
=
DateUtil
.
format
(
DateUtil
.
parse
(
kv
.
getK
().
substring
(
0
,
8
),
"yyyyMMdd"
),
"yyyy-MM-dd"
);
String
hour
=
kv
.
getK
().
substring
(
8
,
10
);
// time of insert partition
...
...
@@ -144,7 +144,7 @@ public class RTDmpFetch {
}
catch
(
InterruptedException
e
)
{
LOGGER
.
info
(
e
.
getMessage
());
}
kv
=
mySqlUtil
.
getPartitionTime
(
"dwh"
,
"
audience_merge
"
);
kv
=
mySqlUtil
.
getPartitionTime
(
"dwh"
,
"
audience_merge_v1
"
);
dt
=
DateUtil
.
format
(
DateUtil
.
parse
(
kv
.
getK
().
substring
(
0
,
8
),
"yyyyMMdd"
),
"yyyy-MM-dd"
);
hour
=
kv
.
getK
().
substring
(
8
,
10
);
utime
=
kv
.
getV
();
...
...
@@ -175,7 +175,7 @@ public class RTDmpFetch {
String
sql
=
buildSql
(
dt
,
hour
,
jsonObject
)
.
replace
(
"@key"
,
"device_type"
)
.
replace
(
"@table"
,
"
audience_merge
_all"
)
+
" GROUP BY device_type"
;
.
replace
(
"@table"
,
"
audience_merge_v1
_all"
)
+
" GROUP BY device_type"
;
LOGGER
.
info
(
"checkDeviceType -->> audienceId:"
+
audienceId
+
", sql -->>"
+
sql
);
ClickHouseDataSource
dataSource0
=
new
ClickHouseDataSource
(
URL
.
replace
(
"host"
,
ips0
[
random0
]),
properties
);
...
...
@@ -202,7 +202,7 @@ public class RTDmpFetch {
sql
=
buildSql
(
dt
,
hour
,
jsonObject
)
.
replace
(
"@key"
,
"COUNT(1) counts"
)
.
replace
(
"@table"
,
"
audience_merge
_all"
);
.
replace
(
"@table"
,
"
audience_merge_v1
_all"
);
int
counts
=
0
;
try
{
connection0
=
dataSource0
.
getConnection
();
...
...
@@ -225,7 +225,7 @@ public class RTDmpFetch {
for
(
String
devType
:
devTypeSet
)
{
sql
=
buildSql
(
dt
,
hour
,
jsonObject
)
.
replace
(
"@key"
,
"devid"
)
.
replace
(
"@table"
,
"
audience_merge
"
)
+
" AND device_type = '"
+
devType
+
"'"
;
.
replace
(
"@table"
,
"
audience_merge_v1
"
)
+
" AND device_type = '"
+
devType
+
"'"
;
LOGGER
.
info
(
"checkDeviceId -->> audienceId:"
+
audienceId
+
",sql -->> "
+
sql
);
for
(
int
i
=
0
;
i
<
SET_VALUES
.
length
;
i
++)
{
...
...
src/main/java/mobvista/dmp/util/MySQLUtil.java
View file @
1ddd357c
...
...
@@ -140,13 +140,13 @@ public class MySQLUtil {
}
public
static
void
main
(
String
[]
args
)
{
KV
kv
=
getPartitionTime
(
"dwh"
,
"
audience_merge
"
);
KV
kv
=
getPartitionTime
(
"dwh"
,
"
audience_merge_v1
"
);
long
nowTime
=
DateUtil
.
parse
(
DateUtil
.
format
(
new
Date
(),
"yyyy-MM-dd HH:mm:ss"
),
"yyyy-MM-dd HH:mm:ss"
).
getTime
()
/
1000
;
System
.
out
.
println
(
kv
.
getV
());
System
.
out
.
println
(
Long
.
parseLong
(
kv
.
getV
())
>
nowTime
-
1200
);
// System.out.println(getLastPartition("dwh", "
audience_merge
"));
// System.out.println(update("dwh", "
audience_merge
", "2021072913"));
// System.out.println(getLastPartition("dwh", "
audience_merge_v1
"));
// System.out.println(update("dwh", "
audience_merge_v1
", "2021072913"));
}
}
\ No newline at end of file
src/main/scala/mobvista/dmp/output/reyun/Constant.scala
View file @
1ddd357c
...
...
@@ -12,7 +12,7 @@ object Constant {
"""
|SELECT created, app_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type, ip, imei,
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename
| ext_campaignpackagename
, ext_deviceid
| FROM dwh.ods_adn_trackingnew_impression
| WHERE CONCAT(yyyy,mm,dd) = '@date' AND UPPER(country_code) = 'CN'
|"""
.
stripMargin
...
...
@@ -20,7 +20,7 @@ object Constant {
val
tracking_install
=
"""
|SELECT created, app_id, advertiser_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type,
| ip, imei, mac, dev_id, idfa, ext_campaignpackagename, ext_finalpackagename, ext_advinstalltime, ext_oaid, ext_eventtime
| ip, imei, mac, dev_id, idfa, ext_campaignpackagename, ext_finalpackagename, ext_advinstalltime, ext_oaid, ext_eventtime
, ext_deviceid
| FROM dwh.ods_adn_trackingnew_install
| WHERE CONCAT(yyyy,mm,dd) = '@date'
|"""
.
stripMargin
...
...
@@ -29,7 +29,7 @@ object Constant {
"""
|SELECT created, app_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type, ip, imei,
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename
| ext_campaignpackagename
, ext_deviceid
| FROM dwh.ods_adn_trackingnew_click
| WHERE CONCAT(yyyy,mm,dd) = '@date'
|"""
.
stripMargin
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment