#!/bin/bash
# # # # # # # # # # # # # # # # # # # # # #
# @file : crawl_app_info.sh
# @author: houying
# @date : 16-11-3
# @desc  : Crawl package info and merge it into the related app-info tables
# # # # # # # # # # # # # # # # # # # # # #
source ../dmp_env.sh
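# LOG_TIME is the data date (one day before ScheduleTime, which is assumed to
# be exported by dmp_env.sh or the scheduler); YESTERDAY marks the previous
# day's partition, read as the "old" side of the merge.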
LOG_TIME=$(date -d "$ScheduleTime 1 days ago" "+%Y%m%d")
year=${LOG_TIME:0:4}
month=${LOG_TIME:4:2}
day=${LOG_TIME:6:2}
YESTERDAY=$(date -d "$ScheduleTime 2 days ago" "+%Y%m%d")
old_year=${YESTERDAY:0:4}
old_month=${YESTERDAY:4:2}
old_day=${YESTERDAY:6:2}
PACKAGE_PATH="${PACKAGE_TMP_PATH}/${year}/${month}/${day}"
select_pkg_name() {
    CREATE_TABLE_SQL="
    drop table if exists tmp_package_name;
    create external table if not exists tmp_package_name (
        package_name string,
        platform string
    ) ROW FORMAT
    DELIMITED FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    LOCATION '$PACKAGE_PATH'
    "
    # Upload to HDFS, then select both the packages missing from the existing
    # app_info tables and the app_info entries whose data has gone stale.
    SEVEN_DAYS_AGO="$(date -d "$ScheduleTime 7 days ago" "+%Y%m%d")"
    FOURTEEN_DAYS_AGO="$(date -d "$ScheduleTime 14 days ago" "+%Y%m%d")"
    hive_cmd "
    use dwh;
    set hive.cli.print.header=false;
    $CREATE_TABLE_SQL;
    select t.package_name, t.platform
    from (
        select a.package_name, a.platform
        from tmp_package_name a
        left outer join (
            select package_name, 'ios' as platform
            from dim_app_info_ios
            where year='$old_year'
              and month='$old_month'
              and day='$old_day'
            union all
            select package_name, 'adr' as platform
            from dim_app_info_adr
            where year='$old_year'
              and month='$old_month'
              and day='$old_day'
        ) b
        on a.platform=b.platform
        and a.package_name=b.package_name
        where b.platform is null and b.package_name is null
        union all
        select package_name, 'ios' as platform
        from dim_app_info_ios
        where year='$old_year'
          and month='$old_month'
          and day='$old_day'
          and update_time<='$SEVEN_DAYS_AGO'
          and update_time>='$FOURTEEN_DAYS_AGO'
        union all
        select package_name, 'adr' as platform
        from dim_app_info_adr
        where year='$old_year'
          and month='$old_month'
          and day='$old_day'
          and update_time<='$SEVEN_DAYS_AGO'
          and update_time>='$FOURTEEN_DAYS_AGO'
    ) t
    GROUP BY t.package_name, t.platform LIMIT 5000
    " | grep -v '^[0-9]\{5,7\}\s\+android' > to_crawler_package_name.txt
    # A plain $? would report grep's status, not hive's; check the hive
    # command itself via PIPESTATUS.
    if [ ${PIPESTATUS[0]} -ne 0 ]; then
        exit 255
    fi
}
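# Run the Java crawler over the selected packages and push its three output
# files (ios.txt, adr.txt, bundle.txt) to the temporary HDFS staging paths.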
crawl_app_info(){
    java -Xms8192m -Xmx8192m -cp ../${JAR} mobvista.dmp.datasource.apptag.crawler.AppInfoCrawler -p \
        -f to_crawler_package_name.txt \
        -i ios.txt \
        -a adr.txt \
        -b bundle.txt \
        -d "${LOG_TIME}" || return 1
    make_dir ${TMP_IOS_APP_INFO_PATH}
    hadoop fs -rm ${TMP_IOS_APP_INFO_PATH}/ios.txt
    hadoop fs -put ios.txt ${TMP_IOS_APP_INFO_PATH}/
    make_dir ${TMP_ADR_APP_INFO_PATH}
    hadoop fs -rm ${TMP_ADR_APP_INFO_PATH}/*
    hadoop fs -put adr.txt ${TMP_ADR_APP_INFO_PATH}/
    make_dir ${TMP_CRAWLER_INFO_PATH}
    hadoop fs -rm ${TMP_CRAWLER_INFO_PATH}/*
    hadoop fs -put to_crawler_package_name.txt ${TMP_CRAWLER_INFO_PATH}/
    # Upload the bundle data
    if [ -f bundle.txt ]; then
        if hadoop fs -test -e ${TMP_IOS_APP_INFO_PATH}/bundle.txt; then
            hadoop fs -rm ${TMP_IOS_APP_INFO_PATH}/bundle.txt
        fi
        hadoop fs -put bundle.txt ${TMP_IOS_APP_INFO_PATH}/
    fi
}
: '
Without changing the previous logic, upload the 3 generated files
(ios.txt bundle.txt adr.txt) to the s3 filesystem.
'
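# The Spark-based rewrite below is intentionally disabled (wrapped in a
# no-op quoted block) and kept for reference.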
: '
crawl_app_info(){
    hadoop fs -rmr ${TMP_IOS_APP_INFO_SPARK_PATH}
    hadoop fs -rmr ${TMP_ADR_APP_INFO_SPARK_PATH}
    hadoop fs -rmr ${TMP_BUNDLE_APP_INFO_SPARK_PATH}
    hadoop fs -put -f to_crawler_package_name.txt ${TMP_CRAWLER_INFO_PATH}/
    spark-submit --class mobvista.dmp.datasource.apptag.AppInfoCrawlerSpark \
        --conf spark.network.timeout=720s \
        --conf spark.sql.autoBroadcastJoinThreshold=31457280 \
        --master yarn --deploy-mode cluster --name AppInfoCrawlerSpark \
        --executor-memory 2g --driver-memory 2g --executor-cores 2 --num-executors 50 \
        ../${JAR} \
        -input ${TMP_CRAWLER_INFO_PATH} \
        -iosoutput ${TMP_IOS_APP_INFO_SPARK_PATH} \
        -adroutput ${TMP_ADR_APP_INFO_SPARK_PATH} \
        -bundleoutput ${TMP_BUNDLE_APP_INFO_SPARK_PATH} \
        -today ${LOG_TIME} -coalesce 100
    if [ $? -ne 0 ]; then
        exit 255
    fi
    hadoop fs -text ${TMP_IOS_APP_INFO_SPARK_PATH}"/*" > ios.txt
    hadoop fs -text ${TMP_ADR_APP_INFO_SPARK_PATH}"/*" > adr.txt
    hadoop fs -text ${TMP_BUNDLE_APP_INFO_SPARK_PATH}"/*" > bundle.txt
    hadoop fs -put -f ios.txt ${TMP_IOS_APP_INFO_PATH}/
    hadoop fs -put -f adr.txt ${TMP_ADR_APP_INFO_PATH}/
    # Upload the bundle data
    if [ -f bundle.txt ]; then
        if hadoop fs -test -e ${TMP_IOS_APP_INFO_PATH}/bundle.txt; then
            hadoop fs -rm ${TMP_IOS_APP_INFO_PATH}/bundle.txt
        fi
        hadoop fs -put bundle.txt ${TMP_IOS_APP_INFO_PATH}/
    fi
}
'
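# Merge the freshly crawled app info with yesterday's partition into today's
# partition, then mount it as a Hive partition (mount_partition is assumed
# to come from dmp_env.sh).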
merge_ios() {
    local INPUT_NEW="$TMP_IOS_APP_INFO_PATH"
    local INPUT_OLD="$APP_INFO_IOS_PATH/$old_year/$old_month/$old_day"
    local OUTPUT="$APP_INFO_IOS_PATH/$year/$month/$day"
    hadoop jar ../${JAR} mobvista.dmp.datasource.apptag.mapreduce.MergeIosAppInfo \
        -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
        "$INPUT_NEW" "$INPUT_OLD" "$OUTPUT" || return 1
    mount_partition dim_app_info_ios "year='$year',month='$month',day='$day'" "$OUTPUT"
}
merge_adr() {
    local INPUT_NEW="$TMP_ADR_APP_INFO_PATH"
    local INPUT_OLD="$APP_INFO_ADR_PATH/$old_year/$old_month/$old_day"
    local OUTPUT="$APP_INFO_ADR_PATH/$year/$month/$day"
    hadoop jar ../${JAR} mobvista.dmp.datasource.apptag.mapreduce.MergeAdrAppInfo \
        -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
        "$INPUT_NEW" "$INPUT_OLD" "$OUTPUT" || return 1
    mount_partition dim_app_info_adr "year='$year',month='$month',day='$day'" "$OUTPUT"
}
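# Main flow: select packages, crawl, then merge per platform.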
select_pkg_name
if [ $? -ne 0 ]; then
    exit 255
fi
crawl_app_info
if [ $? -ne 0 ]; then
    exit 255
fi
merge_ios
if [ $? -ne 0 ]; then
    exit 255
fi
merge_adr
if [ $? -ne 0 ]; then
    exit 255
fi
echo "[Crawler App Info End!]"