Commit 5a40c914 by WangJinfeng

fix dmp bug

parent a44b05b6
 type=command
-dependencies=dmp_install_list_common_reyun,dmp_install_list_common_appsflyer,dmp_install_list_common_other,dmp_install_list_common_uc_activation
+dependencies=dmp_install_list_common_reyun,dmp_install_list_common_appsflyer,dmp_install_list_common_other
 command=sh -x dmp_install_list_daily_v2.sh
\ No newline at end of file
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
@@ -46,7 +46,7 @@ select game_id,
 max(install_ad), max(install_keyword)) is not null then true
 else false end as is_acquired,
 count(*) as count_event
-from event_${date_str_undline}
+from emr_doppler_v1.event_${date_str_undline}
 group by game_id, user_id, player_id, to_date(arrival_ts);
 INSERT overwrite table dim_player_error_counts
@@ -57,7 +57,7 @@ SELECT game_id,
 SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) AS count_warning_error,
 SUM(CASE WHEN severity = 'error' THEN 1 ELSE 0 END) AS count_error_error,
 SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) AS count_critical_error
-FROM event_${date_str_undline}
+FROM emr_doppler_v1.event_${date_str_undline}
 WHERE category = 'error'
 AND severity IS NOT NULL
 GROUP BY game_id, player_id;
\ No newline at end of file
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
 set mapreduce.map.memory.mb=2048;
@@ -31,7 +31,7 @@ COALESCE(os_version, v1_os_major) as os,
 COALESCE(device, v1_device) as device,
 ip,
 connection_type
-FROM event_${date_str_undline};
+FROM emr_doppler_v1.event_${date_str_undline};
 insert overwrite directory 's3://mob-emr-test/dataplatform/emr/event_export/event_export_${date_str_undline}/0/' SELECT * FROM mv_export_${date_str_undline} WHERE `hour` = 0 ;
 insert overwrite directory 's3://mob-emr-test/dataplatform/emr/event_export/event_export_${date_str_undline}/1/' SELECT * FROM mv_export_${date_str_undline} WHERE `hour` = 1 ;
......
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
 set mapreduce.map.memory.mb=2048;
@@ -223,14 +223,14 @@ SELECT game_id,
 split(event_id, ':')[3] AS f4,
 split(event_id, ':')[4] AS f5,
 CASE receipt_status WHEN 'valid' THEN TRUE ELSE FALSE END AS is_valid
-FROM event_${date_str_undline} t1 left semi join currency_tmp t2
+FROM emr_doppler_v1.event_${date_str_undline} t1 left semi join emr_doppler.currency_tmp t2
 on t1.currency = t2.code
 WHERE category = 'business' AND amount > 0 AND currency IS NOT NULL;
 INSERT overwrite table dim_payment_first_payed
 SELECT X.game_id, x.player_id, x.first_payed
 FROM (SELECT game_id, player_id as player_id, from_unixtime(cast(pay_ft as bigint), 'yyyy-MM-dd') as first_payed
-FROM event_${date_str_undline}
+FROM emr_doppler_v1.event_${date_str_undline}
 WHERE pay_ft IS NOT NULL
 AND pay_ft != ''
 AND cast(pay_ft as string) != 'undefined'
@@ -239,9 +239,9 @@ group by X.game_id, x.player_id, x.first_payed;
 drop table IF EXISTS payment_checkpoint_${date_str_undline}_tmp;
-use emr_doppler;
-create table emr_doppler.payment_checkpoint_${date_str_undline}_tmp
+use emr_doppler_v1;
+create table emr_doppler_v1.payment_checkpoint_${date_str_undline}_tmp
 location 's3://mob-emr-test/dataplatform/emr/payment_checkpoint_${date_str_undline}_tmp' as
 SELECT pcd.game_id,
 pcd.checkpoint,
......
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
@@ -12,14 +12,14 @@ set hive.optimize.skewjoin=true;
 ALTER TABLE player DROP PARTITION (yyyymmdd='${part_pre1_path}');
--- ALTER TABLE emr_doppler.player SET LOCATION 's3://mob-emr-test/dataplatform/emr/plalyer';
+-- ALTER TABLE emr_doppler_v1.player SET LOCATION 's3://mob-emr-test/dataplatform/emr/plalyer';
 ALTER TABLE player ADD IF NOT EXISTS PARTITION (yyyymmdd='${part_pre1_path}') LOCATION 's3://mob-emr-test/dataplatform/emr/player/${part_pre1_path}';
 drop table player_tmp_${date_str_undline};
 create table player_tmp_${date_str_undline} as select md5(concat(game_id,user_id)) as player_id,game_id,user_id
-from event_${date_str_undline} group by player_id,game_id,user_id;
+from emr_doppler_v1.event_${date_str_undline} group by player_id,game_id,user_id;
 insert overwrite table player partition(yyyymmdd='${part_pre1_path}')
 select x.player_id,x.game_id,user_id from (
......
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
@@ -164,4 +164,4 @@ DROP TABLE IF EXISTS session_checkpoint_${date_str_undline_old};
 DROP table IF EXISTS mv_export_${date_str_undline_old};
 drop table IF EXISTS exchange_rate_${date_str_undline_old};
 drop table IF EXISTS payment_checkpoint_${date_str_undline}_tmp;
-DROP table IF EXISTS event_${date_str_undline_old};
+DROP table IF EXISTS emr_doppler_v1.event_${date_str_undline_old};
\ No newline at end of file
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
@@ -11,9 +11,9 @@ set hive.map.aggr = true;
 set hive.map.aggr = true;
 set hive.optimize.skewjoin=true;
--- ALTER TABLE emr_doppler.player_checkpoint_daily SET LOCATION 's3://mob-emr-test/dataplatform/emr/player_checkpoint_daily';
--- ALTER TABLE emr_doppler.player_checkpoint SET LOCATION 's3://mob-emr-test/dataplatform/emr/player_checkpoint';
+-- ALTER TABLE emr_doppler_v1.player_checkpoint_daily SET LOCATION 's3://mob-emr-test/dataplatform/emr/player_checkpoint_daily';
+-- ALTER TABLE emr_doppler_v1.player_checkpoint SET LOCATION 's3://mob-emr-test/dataplatform/emr/player_checkpoint';
 ALTER TABLE player_checkpoint_daily
 ADD IF NOT EXISTS PARTITION (dt = '${part_pre1_path}') LOCATION 's3://mob-emr-test/dataplatform/emr/${pre1_dateslash}/player_checkpoint';
@@ -374,9 +374,9 @@ FROM tmp_daily_export_player_checkpoint_device_stats a
 -- ALTER TABLE player_store_game SET LOCATION 's3://mob-emr-test/dataplatform/emr/store_game';
-INSERT OVERWRITE TABLE all_tmp_game_store
+INSERT OVERWRITE TABLE emr_doppler.all_tmp_game_store
 SELECT game_id, store_id
-FROM player_store_game WHERE store_id IS NOT NULL;
+FROM emr_doppler.player_store_game WHERE store_id IS NOT NULL;
 DROP TABLE player_installation_list_${date_str_undline};
 CREATE EXTERNAL TABLE `player_installation_list_${date_str_undline}`
@@ -410,7 +410,7 @@ SELECT case
 when LOWER(platform) = 'ios' then 'https://itunes.apple.com/app/id' || store_id
 end as store_link
 FROM player_checkpoint pc,
-all_tmp_game_store gs
+emr_doppler.all_tmp_game_store gs
 WHERE not (ios_idfa is null and google_aid is null)
 AND pc.dt = '${part_pre1_path}'
 AND cast(pc.checkpoint as string) = '${part_pre1_dash}'
......
-use emr_doppler;
-DROP TABLE event_${date_str_undline};
-create external table IF NOT EXISTS event_${date_str_undline}(
+use emr_doppler_v1;
+DROP TABLE emr_doppler_v1.event_${date_str_undline};
+create external table IF NOT EXISTS emr_doppler_v1.event_${date_str_undline}(
 `version` smallint,
 game_id int,
 user_id string,
......
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
 set mapreduce.map.memory.mb=2048;
@@ -47,13 +47,13 @@ SELECT game_id,
 MIN(arrival_ts) AS start_ts,
 MAX(arrival_ts) AS end_ts,
 MAX(session_num % 2147483647) AS num
-FROM event_${date_str_undline}
+FROM emr_doppler_v1.event_${date_str_undline}
 WHERE session_id IS NOT NULL
 GROUP BY game_id, player_id, session_id;
 set mapred.reduce.tasks=14;
--- ALTER TABLE emr_doppler.dim_player_session SET LOCATION 's3://mob-emr-test/dataplatform/emr/dim_player_session';
+-- ALTER TABLE emr_doppler_v1.dim_player_session SET LOCATION 's3://mob-emr-test/dataplatform/emr/dim_player_session';
 insert OVERWRITE table dim_player_session
 SELECT game_id,
@@ -89,7 +89,7 @@ SELECT game_id,
 split(event_id, ':')[2] AS f2,
 split(event_id, ':')[3] AS f3,
 split(event_id, ':')[4] AS f4
-FROM event_${date_str_undline}
+FROM emr_doppler_v1.event_${date_str_undline}
 WHERE category = 'progression'
 AND score >= -2147483648
 GROUP BY game_id, player_id, event_id;
-use emr_doppler;
+use emr_doppler_v1;
 set hive.exec.compress.output=true;
 set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
 set mapreduce.map.memory.mb=2048;
@@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS revenue_${date_str_undline}
 INSERT OVERWRITE table revenue_${date_str_undline}
 SELECT game_id, user_id, player_id, MAX(revenue)
-FROM event_${date_str_undline}
+FROM emr_doppler_v1.event_${date_str_undline}
 WHERE revenue like '%AED%'
 or revenue like '%AFN%'
 or revenue like '%ALL%'
......
@@ -26,24 +26,23 @@ sleep 30
 output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${date_path}"
 unmount_output_path="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_user_info/${unmount_date_path}"
-export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
-export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
+# export SPARK_HOME="/data/hadoop-home/engineplus-k8s-spark-3.0.0-hadoop3.2"
+# export SPARK_CONF_DIR="/data/hadoop-config/command-home/engineplus-k8s-spark-3.0.0-online/conf"
 spark-submit --class mobvista.dmp.datasource.retargeting.DeviceInfoJob \
 --name "DeviceInfoJob.wangjf.${date}" \
---conf spark.sql.broadcastTimeout=1200 \
---conf spark.sql.shuffle.partitions=5000 \
---conf spark.default.parallelism=5000 \
+--conf spark.sql.shuffle.partitions=6000 \
+--conf spark.default.parallelism=6000 \
 --conf spark.kryoserializer.buffer.max=512m \
 --conf spark.kryoserializer.buffer=64m \
---conf spark.driver.executor.memoryOverhead=4096 \
 --conf spark.sql.files.maxPartitionBytes=536870912 \
+--conf spark.sql.autoBroadcastJoinThreshold=-1 \
 --conf spark.sql.adaptive.enabled=true \
 --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
---master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 6g --executor-cores 4 --num-executors 100 \
+--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 10g --executor-cores 4 --num-executors 180 \
 ../${JAR} \
--date ${date} -output ${output_path} -coalesce 2000
+-date ${date} -output ${output_path} -coalesce 3000
 if [[ $? -ne 0 ]]; then
 exit 255
......
 type=command
-dependencies=rtdmp_request_v2,rtdmp_request_tencent,rtdmp_request_other,rtdmp_request_dsp,rtdmp_request_btop,rtdmp_request_uc,rtdmp_request_youku_acquisition,rtdmp_request_alive
+dependencies=rtdmp_request_v2,rtdmp_request_tencent,rtdmp_request_other,rtdmp_request_dsp,rtdmp_request_btop,rtdmp_request_youku_acquisition,rtdmp_request_alive
 command=echo "RTDmp Request Success !!!"
\ No newline at end of file
@@ -123,10 +123,10 @@ spark-submit --class mobvista.dmp.datasource.device.OdsDmpUserInfoDailyV2 \
 --conf spark.sql.autoBroadcastJoinThreshold=-1 \
 --master yarn \
 --deploy-mode cluster \
---executor-memory 12G \
+--executor-memory 10G \
 --driver-memory 6G \
---executor-cores 5 \
---num-executors 100 \
+--executor-cores 3 \
+--num-executors 256 \
 ../${JAR} \
 -cur_day ${date} -gender_date ${GET_GENDER_DATE} -output ${OUTPUT_PATH} -coalesce 2000
......
@@ -130,6 +130,16 @@ CREATE TABLE dwh.audience_merge_v1 (`dt` Date, `hour` FixedString(2), `devid` St
 CREATE TABLE dwh.audience_merge_v1_all (`dt` Date,`hour` FixedString(2),`devid` String,`audience_id` Array(Int32),`device_type` String) ENGINE = Distributed('cluster_1st', 'dwh', 'audience_merge_v1', rand());
+CREATE TABLE dwh.etl_baichuan_daily ON CLUSTER cluster_1st (`dt` Date, `device_id` String, `app_id` Int32, `app_os` Int32, `tag` Int32, `update_date` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/etl_baichuan_daily', '{replica}') PARTITION BY toYYYYMMDD(dt) ORDER BY (dt, device_id, app_id, app_os, update_date) TTL dt + toIntervalWeek(1) SETTINGS index_granularity = 8192,use_minimalistic_part_header_in_zookeeper = 1;
+DROP TABLE dwh.etl_iqiyi_daily ON CLUSTER cluster_1st;
+CREATE TABLE dwh.etl_iqiyi_daily ON CLUSTER cluster_1st (`dt` Date, `device_id` String, `device_type` String, `platform` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/etl_iqiyi_daily', '{replica}') PARTITION BY toYYYYMMDD(dt) ORDER BY (dt, device_id, device_type, platform) TTL dt + toIntervalWeek(1) SETTINGS index_granularity = 8192,use_minimalistic_part_header_in_zookeeper = 1;
+DROP TABLE dmp.youku_laxin_daily ON CLUSTER cluster_1st;
+CREATE TABLE dmp.youku_laxin_daily ON CLUSTER cluster_1st (`dt` Date, `device_type` String, `device_ids` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/youku_laxin_daily', '{replica}') PARTITION BY (toYYYYMMDD(dt), device_type) ORDER BY (dt, device_type, device_ids) TTL dt + toIntervalWeek(1) SETTINGS index_granularity = 8192,use_minimalistic_part_header_in_zookeeper = 1;
 CREATE TABLE dwh.audience_merge (dt Date,hour FixedString(2),devid String,audience_id Array(Int32)) ENGINE = MergeTree() PARTITION BY (toYYYYMMDD(dt),hour) ORDER BY (dt, hour, devid) SETTINGS index_granularity = 8192;
 CREATE TABLE dmp.uc_lahuo_daily ON CLUSTER cluster_1st(dt Date,device_type String,device_ids String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/uc_lahuo_daily', '{replica}') PARTITION BY (toYYYYMMDD(dt),device_type) ORDER BY (dt,device_type,device_ids) TTL dt + toIntervalWeek(1) SETTINGS index_granularity = 8192;
......
 package mobvista.dmp.datasource.event_tag
-import java.net.URI
 import mobvista.dmp.common.CommonSparkJob
 import org.apache.commons.cli.Options
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.commons.lang.StringUtils
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import java.net.URI
 class DmpGaEventTag extends CommonSparkJob with Serializable {
...@@ -60,13 +56,13 @@ class DmpGaEventTag extends CommonSparkJob with Serializable { ...@@ -60,13 +56,13 @@ class DmpGaEventTag extends CommonSparkJob with Serializable {
|case when t2.package_name is not null then t2.second_tag else 'other' end as second_tag, |case when t2.package_name is not null then t2.second_tag else 'other' end as second_tag,
|install_ts install_time, |install_ts install_time,
|'$today' update_date |'$today' update_date
| from emr_doppler.event_$today_underline t1 left join (select a.game_id,a.store_id package_name,b.first_tag,b.second_tag from emr_doppler.player_new_store_game a join dwh.dim_package_tags_combine b on(a.store_id = b.package_name) group by a.game_id,a.store_id ,b.first_tag,b.second_tag ) t2 | from emr_doppler_v1.event_$today_underline t1 left join (select a.game_id,a.store_id package_name,b.first_tag,b.second_tag from emr_doppler_v1.player_new_store_game a join dwh.dim_package_tags_combine b on(a.store_id = b.package_name) group by a.game_id,a.store_id ,b.first_tag,b.second_tag ) t2
| on(t1.game_id = t2.game_id) | on(t1.game_id = t2.game_id)
| where lower(t1.platform) in('android','ios') and event_id is not null and event_id != '' | where lower(t1.platform) in('android','ios') and event_id is not null and event_id != ''
|and ((ios_idfa rlike '$didPtn' and ios_idfa != '$allZero') or (google_aid rlike '$didPtn' and google_aid != '$allZero') or android_id rlike '$andriodIdPtn' ) |and ((ios_idfa rlike '$didPtn' and ios_idfa != '$allZero') or (google_aid rlike '$didPtn' and google_aid != '$allZero') or android_id rlike '$andriodIdPtn' )
""".stripMargin*/ """.stripMargin*/
-// emr_doppler.player_new_store_game -> player_store_game (because the earlier table was not uploaded)
+// emr_doppler_v1.player_new_store_game -> player_store_game (because the earlier table was not uploaded)
 val sql =
 s"""
 |select /*+ mapjoin(t2)*/ ios_idfa,
@@ -78,7 +74,7 @@ class DmpGaEventTag extends CommonSparkJob with Serializable {
 |regexp_replace(revenue,'\\\\{|\\\\}','') event_value,
 |case when t2.package_name is not null then t2.second_tag else 'other' end as second_tag,
 |'$today' update_date
-| from emr_doppler.event_$today_underline t1 left join (select a.game_id,a.store_id package_name,b.first_tag,b.second_tag from emr_doppler.player_store_game a join dwh.dim_package_tags_combine b on(a.store_id = b.package_name) group by a.game_id,a.store_id ,b.first_tag,b.second_tag ) t2
+| from emr_doppler_v1.event_$today_underline t1 left join (select a.game_id,a.store_id package_name,b.first_tag,b.second_tag from emr_doppler.player_store_game a join dwh.dim_package_tags_combine b on(a.store_id = b.package_name) group by a.game_id,a.store_id ,b.first_tag,b.second_tag ) t2
 | on(t1.game_id = t2.game_id)
 | where lower(t1.platform) in('android','ios')
 |and ((ios_idfa rlike '$didPtn' and ios_idfa != '$allZero') or (google_aid rlike '$didPtn' and google_aid != '$allZero') or android_id rlike '$andriodIdPtn' )
......
@@ -2,21 +2,17 @@ package mobvista.dmp.datasource.retargeting
 // import com.datastax.spark.connector._
-import java.net.URI
-import java.util
 import com.alibaba.fastjson.{JSON, JSONObject}
-import mobvista.dmp.clickhouse.feature.FrequencyEntity
 import mobvista.dmp.common.CommonSparkJob
 import mobvista.dmp.util.DateUtil
 import mobvista.prd.datasource.util.GsonUtil
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.{SaveMode, SparkSession}
+import java.net.URI
+import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -42,8 +38,8 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
 val sdf2 = new SimpleDateFormat("yyyyMMdd")
-var bMap: Broadcast[scala.collection.Map[String, String]] = null
-var packageMap: Broadcast[scala.collection.Map[String, Int]] = null
+var bMap: scala.collection.Map[String, String] = new mutable.HashMap[String, String]()
+var packageMap: scala.collection.Map[String, Int] = new mutable.HashMap[String, Int]()
 override protected def run(args: Array[String]): Int = {
 val parser = new BasicParser()
@@ -60,7 +56,6 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 .config("spark.sql.orc.filterPushdown", "true")
 .config("spark.io.compression.codec", "lz4")
 .config("spark.io.compression.lz4.blockSize", "64k")
-.config("spark.sql.autoBroadcastJoinThreshold", "209715200")
 .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .enableHiveSupport()
@@ -74,17 +69,15 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 val sc = spark.sparkContext
 val code_sql = Constant.old2new_sql
-bMap = sc.broadcast(spark.sql(code_sql).rdd.map(r => {
+bMap = spark.sql(code_sql).rdd.cache()
+.map(r => {
 (r.getAs("tag_code").toString, r.getAs("new_second_id").toString)
-}).collectAsMap())
-println("bMap.size ===>>> " + bMap.value.size)
+}).collectAsMap()
-val map = sc.broadcast(spark.sql(Constant.second2first_sql).rdd.map(r => {
+val map = spark.sql(Constant.second2first_sql).rdd.cache()
+.map(r => {
 (r.getAs("new_second_id").toString, r.getAs("new_first_id").toString)
-}).collectAsMap())
-println("map.size ===>>> " + map.value.size)
+}).collectAsMap()
 var package_sql =
 """
@@ -96,9 +89,10 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 s"""
 |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${package_dt}'
 """.stripMargin
-packageMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
+packageMap = spark.sql(package_sql).rdd.cache()
+.map(r => {
 (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
-}).collectAsMap())
+}).collectAsMap()
 /*
 packageMap = sc.broadcast(Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd.map(r => {
@@ -149,7 +143,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 new JSONObject()
 }
 freObject.keySet().foreach(key => {
-interest.add(map.value(key))
+interest.add(map(key))
 interest.add(key)
 })
 /*
@@ -167,7 +161,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 */
 val interestArr = r.getAs("interest").asInstanceOf[mutable.WrappedArray[String]]
 interestArr.foreach(i => {
-interest.add(map.value(i))
+interest.add(map(i))
 interest.add(i)
 })
@@ -183,7 +177,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 val count = j.get("count").getAsInt
 cntJson.put("count", count)
 tag_week_jsonObject.put(tag_id, cntJson)
-interest.add(map.value(tag_id))
+interest.add(map(tag_id))
 interest.add(tag_id)
 }
 }
@@ -199,7 +193,7 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 val count = j.get("count").getAsInt
 cntJson.put("count", count)
 tag_month_jsonObject.put(tag_id, cntJson)
-interest.add(map.value(tag_id))
+interest.add(map(tag_id))
 interest.add(tag_id)
 }
 }
@@ -230,10 +224,10 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 val ins = inters.toUpperCase.split(",")
 if (ins.length >= 3) {
 val key = ins(0) + "-" + ins(1) + "-" + ins(2)
-val vals = if (bMap.value.keySet.contains(key)) {
-bMap.value(key)
+val vals = if (bMap.keySet.contains(key)) {
+bMap(key)
 } else {
-bMap.value.getOrElse(key + "OTHER", "")
+bMap.getOrElse(key + "OTHER", "")
 }
 if (StringUtils.isNotBlank(vals)) {
 set.add(vals)
@@ -244,10 +238,10 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 }
 def getId(tag_code: String): String = {
-val id = if (bMap.value.keySet.contains(tag_code.toUpperCase)) {
-bMap.value(tag_code.toUpperCase)
+val id = if (bMap.keySet.contains(tag_code.toUpperCase)) {
+bMap(tag_code.toUpperCase)
 } else {
-bMap.value.getOrElse(tag_code.toUpperCase + "OTHER", "")
+bMap.getOrElse(tag_code.toUpperCase + "OTHER", "")
 }
 id
 }
@@ -258,9 +252,9 @@ class DeviceInfoJob extends CommonSparkJob with Serializable {
 if (StringUtils.isNotBlank(install)) {
 install.split(",").foreach(pkgs => {
 val pkd = pkgs.split("\\|")
-if (pkd.nonEmpty && StringUtils.isNotBlank(pkd(0)) && packageMap.value.contains(pkd(0).toLowerCase)
+if (pkd.nonEmpty && StringUtils.isNotBlank(pkd(0)) && packageMap.contains(pkd(0).toLowerCase)
 ) {
-set.add(packageMap.value(pkd(0).toLowerCase))
+set.add(packageMap(pkd(0).toLowerCase))
 }
 })
 }
......
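Note on the DeviceInfoJob change above: the commit drops the Spark broadcast variables (bMap, map, packageMap) in favor of plain maps collected onto the driver with collectAsMap(), so lookups change from bMap.value(key) to bMap(key) and the map travels to executors inside each task closure rather than via a broadcast. A minimal, self-contained sketch of that pattern follows; the table name dwh.tag_mapping and the example tag code are illustrative assumptions, not part of this commit.

import org.apache.spark.sql.SparkSession

object BroadcastToLocalMapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-to-local-map-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Collect the lookup table to the driver as an ordinary Map (the new style);
    // previously this would have been wrapped in sc.broadcast(...) and read via .value.
    val bMap: scala.collection.Map[String, String] =
      spark.sql("SELECT tag_code, new_second_id FROM dwh.tag_mapping") // assumed table
        .rdd
        .map(r => (r.getAs[String]("tag_code"), r.getAs[String]("new_second_id")))
        .collectAsMap()

    // Lookup mirrors the refactored getId: direct apply with an "OTHER" fallback.
    def getId(tagCode: String): String =
      bMap.getOrElse(tagCode.toUpperCase, bMap.getOrElse(tagCode.toUpperCase + "OTHER", ""))

    println(getId("GAME-RPG-OTHER")) // illustrative tag code
    spark.stop()
  }
}

The trade-off: without a broadcast, the map is serialized into every task closure that references it, which is simpler but can add network overhead when the lookup table is large.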