#!/bin/bash

# # # # # # # # # # # # # # # # # # # # # # 
# @file  : crawl_app_info.sh
# @author: houying
# @date  : 16-11-3
# @desc  : crawl package info and merge it into the related info tables
# # # # # # # # # # # # # # # # # # # # # #

source ../dmp_env.sh

# Partition being processed: one day before the schedule time (YYYYMMDD).
LOG_TIME=$(date -d "$ScheduleTime 1 days ago" "+%Y%m%d")
year="${LOG_TIME:0:4}"
month="${LOG_TIME:4:2}"
day="${LOG_TIME:6:2}"

# Previous partition: two days before the schedule time (YYYYMMDD).
YESTERDAY=$(date -d "$ScheduleTime 2 days ago" "+%Y%m%d")
old_year="${YESTERDAY:0:4}"
old_month="${YESTERDAY:4:2}"
old_day="${YESTERDAY:6:2}"

# HDFS location of today's freshly collected package list.
PACKAGE_PATH="${PACKAGE_TMP_PATH}/${year}/${month}/${day}"

select_pkg_name() {
# External table over the crawled package list (tab-separated:
# package_name, platform) mounted on today's temp HDFS path.
CREATE_TABLE_SQL="
drop table if exists tmp_package_name;
create external table if not exists tmp_package_name (
    package_name string,
    platform string
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '$PACKAGE_PATH'
"

# Upload to HDFS, then compute (a) packages not yet present in app_info and
# (b) app_info rows whose last crawl is 7-14 days old and due for refresh.
SEVEN_DAYS_AGO="$(date -d "$ScheduleTime 7 days ago" "+%Y%m%d")"
FOURTEEN_DAYS_AGO="$(date -d "$ScheduleTime 14 days ago" "+%Y%m%d")"
hive_cmd "
use dwh;
set hive.cli.print.header=false;
$CREATE_TABLE_SQL;
select t.package_name, t.platform
from (
select a.package_name, a.platform
from tmp_package_name a
  left outer join (
    select package_name, 'ios' as platform
    from dim_app_info_ios
    where year='$old_year'
          and month='$old_month'
          and day='$old_day'
    union all
    select package_name, 'adr' as platform
    from dim_app_info_adr
    where year='$old_year'
          and month='$old_month'
          and day='$old_day'
  ) b
    on a.platform=b.platform
       and a.package_name=b.package_name
where b.platform is null and b.package_name is null
union all
select package_name, 'ios' as platform
from dim_app_info_ios
where year='$old_year'
      and month='$old_month'
      and day='$old_day'
      and update_time<='$SEVEN_DAYS_AGO'
      and update_time>='$FOURTEEN_DAYS_AGO'
union all
select package_name, 'adr' as platform
from dim_app_info_adr
where year='$old_year'
      and month='$old_month'
      and day='$old_day'
      and update_time<='$SEVEN_DAYS_AGO'
      and update_time>='$FOURTEEN_DAYS_AGO'
) t
GROUP BY t.package_name, t.platform LIMIT 5000
" | grep -v '^[0-9]\{5,7\}\s\+android' > to_crawler_package_name.txt

# A plain \$? here would report only grep's status, masking hive failures;
# inspect each pipeline stage via PIPESTATUS instead. grep exits 1 when it
# selects no lines (an empty candidate list), which is not an error — only
# status >= 2 indicates a real grep failure.
local statuses=("${PIPESTATUS[@]}")
if [ "${statuses[0]}" -ne 0 ] || [ "${statuses[1]}" -ge 2 ]; then
  exit 255
fi
}


crawl_app_info(){
# Run the Java crawler over the selected packages; it writes ios.txt,
# adr.txt and bundle.txt into the current working directory.
java -Xms8192m -Xmx8192m -cp ../${JAR} mobvista.dmp.datasource.apptag.crawler.AppInfoCrawler -p \
    -f to_crawler_package_name.txt \
    -i ios.txt \
    -a adr.txt  \
    -b bundle.txt \
    -d "${LOG_TIME}" || return 1

# Publish results to HDFS. Each rm may fail when the target does not exist
# yet (e.g. first run) and that is tolerated on purpose, but a failed put
# must fail the whole function so the caller aborts the job.
make_dir ${TMP_IOS_APP_INFO_PATH}
hadoop fs -rm ${TMP_IOS_APP_INFO_PATH}/ios.txt
hadoop fs -put ios.txt ${TMP_IOS_APP_INFO_PATH}/ || return 1

make_dir ${TMP_ADR_APP_INFO_PATH}
hadoop fs -rm ${TMP_ADR_APP_INFO_PATH}/*
hadoop fs -put adr.txt ${TMP_ADR_APP_INFO_PATH}/ || return 1

make_dir ${TMP_CRAWLER_INFO_PATH}
hadoop fs -rm ${TMP_CRAWLER_INFO_PATH}/*
hadoop fs -put to_crawler_package_name.txt ${TMP_CRAWLER_INFO_PATH}/ || return 1

# Upload bundle data when the crawler produced it (iOS bundle mapping);
# replace any previous copy first.
if [ -f bundle.txt ];then
  if hadoop fs -test -e ${TMP_IOS_APP_INFO_PATH}/bundle.txt; then
    hadoop fs -rm ${TMP_IOS_APP_INFO_PATH}/bundle.txt
  fi
  hadoop fs -put bundle.txt ${TMP_IOS_APP_INFO_PATH}/ || return 1
fi
}


# NOTE(review): the two ": '...'" constructs below are no-op commands whose
# single-quoted string arguments serve as block comments; the quoted content
# is never executed. The first note says (translated): "Without changing the
# previous logic, upload the 3 generated files (ios.txt, bundle.txt,
# adr.txt) to the s3 filesystem."
: '
不改变之前的逻辑,生成的3个文件(ios.txt  bundle.txt  adr.txt) 上传到s3文件系统
'
# Disabled Spark-based alternative implementation of crawl_app_info,
# kept for reference only.
: '
crawl_app_info(){
hadoop fs -rmr ${TMP_IOS_APP_INFO_SPARK_PATH}
hadoop fs -rmr ${TMP_ADR_APP_INFO_SPARK_PATH}
hadoop fs -rmr ${TMP_BUNDLE_APP_INFO_SPARK_PATH}
hadoop fs -put -f to_crawler_package_name.txt ${TMP_CRAWLER_INFO_PATH}/

spark-submit --class mobvista.dmp.datasource.apptag.AppInfoCrawlerSpark \
  --conf spark.network.timeout=720s \
  --conf spark.sql.autoBroadcastJoinThreshold=31457280 \
  --deploy-mode cluster --name AppInfoCrawlerSpark --executor-memory 2g --driver-memory 2g  --executor-cores 2 --num-executors 50 \
  ../${JAR}  \
  -input  ${TMP_CRAWLER_INFO_PATH} \
  -iosoutput ${TMP_IOS_APP_INFO_SPARK_PATH} \
  -adroutput ${TMP_ADR_APP_INFO_SPARK_PATH} \
  -bundleoutput ${TMP_BUNDLE_APP_INFO_SPARK_PATH} \
  -today ${LOG_TIME} -coalesce 100

if [ $? -ne 0 ];then
  exit 255
fi

hadoop fs -text  ${TMP_IOS_APP_INFO_SPARK_PATH}"/*">  ios.txt
hadoop fs -text  ${TMP_ADR_APP_INFO_SPARK_PATH}"/*" > adr.txt
hadoop fs -text  ${TMP_BUNDLE_APP_INFO_SPARK_PATH}"/*" > bundle.txt

hadoop fs -put -f ios.txt ${TMP_IOS_APP_INFO_PATH}/
hadoop fs -put -f  adr.txt ${TMP_ADR_APP_INFO_PATH}/

#上传bundle数据
if [ -f bundle.txt ];then
  hadoop fs -test -e ${TMP_IOS_APP_INFO_PATH}/bundle.txt
  if [ $? -eq 0 ];then
    hadoop fs -rm ${TMP_IOS_APP_INFO_PATH}/bundle.txt
  fi
  hadoop fs -put bundle.txt ${TMP_IOS_APP_INFO_PATH}/
fi
}
'

merge_ios() {
  # Merge the freshly crawled iOS app info into yesterday's snapshot and
  # mount the result as today's dim_app_info_ios partition.
  local INPUT_NEW="$TMP_IOS_APP_INFO_PATH"
  local INPUT_OLD="$APP_INFO_IOS_PATH/$old_year/$old_month/$old_day"
  local OUTPUT="$APP_INFO_IOS_PATH/$year/$month/$day"
  # Abort before mounting if the merge job fails; otherwise a missing or
  # partial output directory would be registered as a Hive partition.
  hadoop jar ../${JAR} mobvista.dmp.datasource.apptag.mapreduce.MergeIosAppInfo \
    -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
    "$INPUT_NEW" "$INPUT_OLD" "$OUTPUT" || return 1
  mount_partition dim_app_info_ios "year='$year',month='$month',day='$day'" "$OUTPUT"
}

merge_adr() {
  # Merge the freshly crawled Android app info into yesterday's snapshot
  # and mount the result as today's dim_app_info_adr partition.
  local INPUT_NEW="$TMP_ADR_APP_INFO_PATH"
  local INPUT_OLD="$APP_INFO_ADR_PATH/$old_year/$old_month/$old_day"
  local OUTPUT="$APP_INFO_ADR_PATH/$year/$month/$day"
  # Abort before mounting if the merge job fails; otherwise a missing or
  # partial output directory would be registered as a Hive partition.
  hadoop jar ../${JAR} mobvista.dmp.datasource.apptag.mapreduce.MergeAdrAppInfo \
    -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
    "$INPUT_NEW" "$INPUT_OLD" "$OUTPUT" || return 1
  mount_partition dim_app_info_adr "year='$year',month='$month',day='$day'" "$OUTPUT"
}


# Pipeline: select candidate packages, crawl their metadata, then merge the
# results into the iOS and Android dimension tables. Any failing stage
# aborts the whole job with status 255.
select_pkg_name || exit 255

crawl_app_info || exit 255

merge_ios || exit 255

merge_adr || exit 255

echo "[Crawler App Info End!]"