Commit e9ba0fb2 by fan.jiang

cn_good_channel

parent cdccac2d
type=command
command=sh -x cn_good_channel.sh
#!/usr/bin/env bash
source ../../dmp_env.sh
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
dt_slash_yesterday=$(date -d "$ScheduleTime 2 days ago" +"%Y/%m/%d")
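# Note: "today" here is ScheduleTime minus one day (the partition being produced);
# "yesterday" is two days back and points at the previous run's output.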
check_await "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}"
if ! hadoop fs -test -e "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"; then
    hadoop fs -mkdir -p "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}"
fi
PACKAGE_NAME01="sdk_goodreq7"
PACKAGE_NAME02="sdk_wangzhuan_goodreq7"
PACKAGE_NAME03="adx_goodreq7"
OLD_DATA_PATH="${RTDMP_TMP_PACKAGE_NAME_PATH}/cn_good_channel_to_other/${dt_slash_yesterday}/01"
OUTPUT_PATH1="${RTDMP_TMP_PACKAGE_NAME_PATH}/cn_good_channel/${dt_slash_today}/${PACKAGE_NAME01}"
OUTPUT_PATH2="${RTDMP_TMP_PACKAGE_NAME_PATH}/cn_good_channel/${dt_slash_today}/${PACKAGE_NAME02}"
OUTPUT_PATH3="${RTDMP_TMP_PACKAGE_NAME_PATH}/cn_good_channel/${dt_slash_today}/${PACKAGE_NAME03}"
OUTPUT_PATH4="${RTDMP_TMP_PACKAGE_NAME_PATH}/cn_good_channel_to_other/${dt_slash_today}/01"
OUTPUT_PATH5="${RTDMP_TMP_PACKAGE_NAME_PATH}/cn_good_channel_to_other/${dt_slash_today}/02"
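# OUTPUT_PATH1-3 hold the per-pseudo-package device lists; OUTPUT_PATH4 feeds the eggplants
# path and OUTPUT_PATH5 feeds the ODS other-device daily partition (see the distcp calls below).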
hadoop fs -rm -r "${OUTPUT_PATH1}"
hadoop fs -rm -r "${OUTPUT_PATH2}"
hadoop fs -rm -r "${OUTPUT_PATH3}"
hadoop fs -rm -r "${OUTPUT_PATH4}"
hadoop fs -rm -r "${OUTPUT_PATH5}"
spark-submit --class mobvista.dmp.datasource.dm.CnGoodChannel \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=3000 \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 100 \
../../${JAR} -output1 ${OUTPUT_PATH1} -output2 ${OUTPUT_PATH2} -output3 ${OUTPUT_PATH3} \
-output4 ${OUTPUT_PATH4} -output5 ${OUTPUT_PATH5} -old_data_path ${OLD_DATA_PATH} \
-package_name1 ${PACKAGE_NAME01} -package_name2 ${PACKAGE_NAME02} -package_name3 ${PACKAGE_NAME03} \
-coalesce 600
if [[ $? -ne 0 ]]; then
exit 255
fi
hadoop distcp -m20 "${OUTPUT_PATH4}/*" "${TMP_EGGPLANTS_OUTPUT_PATH}/${dt_slash_today}/"
hadoop distcp -m20 "${OUTPUT_PATH5}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_today}/"
: '
As required, the three SQL statements in CnGoodChannel.scala each extract a different slice of data
and write it to the S3 path of the corresponding pseudo package name. Because the computation is
heavy, each SQL query only reads a single day of data.
The results are deduplicated against the previous day's output, and today's results are stored under
output1, output2 and output3 for syncing to the major media platforms.
The data is also written to output4 and output5 and then loaded into the install list under the
business="other" partition.
'
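# Expected record layout, inferred from CnGoodChannel.scala (the device id below is made up):
#   output4 line: 867530021234567<TAB>imei<TAB>android<TAB>["sdk_goodreq7"]
#   output5 line: 867530021234567<TAB>imei<TAB>android<TAB>CN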
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.RDDMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.net.URI
import scala.collection.mutable.ArrayBuffer
/**
* @author jiangfan
* @date 2021/9/9 17:42
*/
class CnGoodChannel extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
options.addOption("coalesce", true, "[must] coalesce")
options.addOption("old_data_path", true, "[must] old_data_path")
options.addOption("output1", true, "[must] output1")
options.addOption("output2", true, "[must] output2")
options.addOption("output3", true, "[must] output3")
options.addOption("output4", true, "[must] output4")
options.addOption("output5", true, "[must] output5")
options.addOption("package_name1", true, "[must] package_name1")
options.addOption("package_name2", true, "[must] package_name2")
options.addOption("package_name3", true, "[must] package_name3")
options
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
printUsage(options)
return -1
} else printOptions(commandLine)
val coalesce = commandLine.getOptionValue("coalesce")
val old_data_path = commandLine.getOptionValue("old_data_path")
val output1 = commandLine.getOptionValue("output1")
val output2 = commandLine.getOptionValue("output2")
val output3 = commandLine.getOptionValue("output3")
val output4 = commandLine.getOptionValue("output4")
val output5 = commandLine.getOptionValue("output5")
val package_name1 = commandLine.getOptionValue("package_name1")
val package_name2 = commandLine.getOptionValue("package_name2")
val package_name3 = commandLine.getOptionValue("package_name3")
val spark = SparkSession.builder()
.appName("CnGoodChannel")
.config("spark.rdd.compress", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.sql.orc.filterPushdown", "true")
.config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val conf = spark.sparkContext.hadoopConfiguration
conf.set("mapreduce.output.compress", "true")
conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output2), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output3), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output4), true)
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output5), true)
try {
val old_data: RDD[Row] = sc.textFile(old_data_path).map(row => {
// Yesterday's lines look like: device_id \t device_type \t platform \t ["package_name"];
// strip the [" and "] wrapper to recover the bare package name.
val fields = row.split("\t", -1)
val package_name = fields(3)
Row(fields(0), fields(1), package_name.substring(2, package_name.lastIndexOf("\"")))
})
val schema: StructType = StructType(Array(
StructField("device_id", StringType),
StructField("device_type", StringType),
StructField("package_name", StringType)
))
val old_data_df = spark.createDataFrame(old_data, schema)
old_data_df.createOrReplaceTempView("yesterday_data")
// val sql1=
// s"""
// |select device_id,device_type
// |from
// | (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
// | from
// | (select onedata
// | from
// | (
// | select concat_ws(",",v1,v2,v3,v4) data
// | from
// | (
// | select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
// | from
// | (select
// | ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
// | ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
// | md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
// | md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
// | FROM
// | dwh.ods_adn_trackingnew_request
// | where country_code = 'CN' and platform = 'android' and publisher_id in('11386','20284','16227','23652','18721','20781','22193','21522','21915') and app_id not in ('122317')
// | and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
// | ) tmpdata where imei is not null and oaid is not null and imeimd5 is not null and oaidmd5 is not null
// | ) t
// | ) tt
// | lateral view explode(split(data, ",")) num as onedata) ttt
// | union
// | select device_id,device_type
// | from
// | yesterday_data
// | where package_name='${package_name1}' ) tttt
// | group by device_id,device_type
// |""".stripMargin
val sql1=
s"""
|select device_id,device_type
|from
| (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
| from
| (select onedata
| from
| (
| select data
| from
| (
| select concat_ws(",",v1,v2,v3,v4) data
| from
| (
| select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
| from
| (select
| ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
| ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
| md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
| md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
| FROM
| dwh.ods_adn_trackingnew_request
| where country_code = 'CN' and platform = 'android' and publisher_id in('11386','20284','16227','23652','18721','20781','22193','21522','21915') and app_id not in ('122317')
| and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
| ) tmpdata
| ) t01
| ) t02 where data!='' ) t03
| lateral view explode(split(data, ",")) num as onedata) t04
| union
| select device_id,device_type
| from
| yesterday_data
| where package_name='${package_name1}' ) t05
| group by device_id,device_type
|""".stripMargin
val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
// Reuse the persisted DataFrame instead of re-running the query.
df01.rdd.coalesce(coalesce.toInt).map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
(new Text(s"${output1}/${device_type}"), new Text(device_id))
}).saveAsNewAPIHadoopFile(output1, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
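// RDDMultipleOutputFormat (used above and for output2/output3 below) is assumed to write each value
// under the directory given by its key, so each output path should end up with one sub-directory
// per device_type (imei / oaid / imeimd5 / oaidmd5).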
// val sql2=
// s"""
// |select device_id,device_type
// |from
// | (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
// | from
// | (select onedata
// | from
// | (
// | select concat_ws(",",v1,v2,v3,v4) data
// | from
// | (
// | select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
// | from
// | (select
// | ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
// | ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
// | md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
// | md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
// | FROM
// | dwh.ods_adn_trackingnew_request
// | where country_code = 'CN' and platform = 'android' and publisher_id in('18590','21933','13566','19672','26101','21386')
// | and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
// | ) tmpdata where imei is not null and oaid is not null and imeimd5 is not null and oaidmd5 is not null
// | ) t
// | ) tt
// | lateral view explode(split(data, ",")) num as onedata) ttt
// | union
// | select device_id,device_type
// | from
// | yesterday_data
// | where package_name='${package_name2}' ) tttt
// | group by device_id,device_type
// |""".stripMargin
val sql2=
s"""
|select device_id,device_type
|from
| (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
| from
| (select onedata
| from
| (
| select data
| from
| (
| select concat_ws(",",v1,v2,v3,v4) data
| from
| (
| select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
| from
| (select
| ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
| ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
| md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
| md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
| FROM
| dwh.ods_adn_trackingnew_request
| where country_code = 'CN' and platform = 'android' and publisher_id in('18590','21933','13566','19672','26101','21386')
| and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
| ) tmpdata
| ) t01
| ) t02 where data!='' ) t03
| lateral view explode(split(data, ",")) num as onedata) t04
| union
| select device_id,device_type
| from
| yesterday_data
| where package_name='${package_name2}' ) t05
| group by device_id,device_type
|""".stripMargin
val df02: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
// Reuse the persisted DataFrame instead of re-running the query.
df02.rdd.coalesce(coalesce.toInt).map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
(new Text(s"${output2}/${device_type}"), new Text(device_id))
}).saveAsNewAPIHadoopFile(output2, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
// The third query was originally supposed to read from adn_dsp.log_adn_dsp_request_orc_hour, but that
// table is expensive to scan. dwh.etl_dsp_request_daily_hours keeps the relevant data from
// adn_dsp.log_adn_dsp_request_orc_hour, so we read that table instead to reduce the computation.
val sql3=
s"""
|select device_id,device_type
|from
| (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
| from
| (select onedata
| from
| (
| select concat_ws(",",v1,v2,v3,v4) data
| from
| (
| select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
| from
| (select
| split(exitid,',')[4] as imei,
| split(exitid,',')[12] as oaid,
| split(exitid,',')[5] as imeimd5,
| split(exitid,',')[13] as oaidmd5
| from
| (select
| packagename,exitid
| FROM
| dwh.etl_dsp_request_daily_hours
| where dt=replace(cast(date_sub(CURRENT_DATE, 1) as string),'-','')
| and country = 'CN' and platform = 'android' and exchanges in ('iqiyi','wax')
| ) tmpdata lateral view explode(split(packagename, "#")) num as appid
| where appid in ('com.qiyi.video','com.sina.weibo','tv.pps.mobile')
| ) t01
| ) t02
| ) t03
| lateral view explode(split(data, ",")) num as onedata where data!='' and split(onedata,"\\\\|")[0]!='') t04
| union
| select device_id,device_type
| from
| yesterday_data
| where package_name='${package_name3}' ) t05
| group by device_id,device_type
|""".stripMargin
// val sql3=
// s"""
// |select device_id,device_type
// |from
// | (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
// | from
// | (select onedata
// | from
// | (
// | select concat_ws(",",v1,v2,v3,v4) data
// | from
// | (
// | select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
// | from
// | (select
// | split(ext5,',')[4] as imei,
// | split(ext5,',')[12] as oaid,
// | split(ext5,',')[5] as imeimd5,
// | split(ext5,',')[13] as oaidmd5
// | FROM
// | adn_dsp.log_adn_dsp_request_orc_hour
// | where countrycode = 'CN' and os = 'android' and exchanges in ('iqiyi','wax') and appid in ('com.qiyi.video','com.sina.weibo','tv.pps.mobile')
// | and concat(yr,'-',mt,'-',dt)=cast(date_sub(CURRENT_DATE, 1) as string)
// | ) tmpdata where imei is not null and oaid is not null and imeimd5 is not null and oaidmd5 is not null
// | ) t
// | ) tt
// | lateral view explode(split(data, ",")) num as onedata) ttt
// | union
// | select device_id,device_type
// | from
// | yesterday_data
// | where package_name='${package_name3}' ) tttt
// | group by device_id,device_type
// |""".stripMargin
val df03: DataFrame = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK_SER)
// Reuse the persisted DataFrame instead of re-running the query.
df03.rdd.coalesce(coalesce.toInt).map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
(new Text(s"${output3}/${device_type}"), new Text(device_id))
}).saveAsNewAPIHadoopFile(output3, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
val data01 = df01.select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"),lit("[\""+package_name1+"\"]")))
val data01_with_country = df01.select(concat_ws("\t", df01.col("device_id"), df01.col("device_type"), lit("android"),lit("CN")))
val data02 = df02.select(concat_ws("\t", df02.col("device_id"), df02.col("device_type"), lit("android"),lit("[\""+package_name2+"\"]")))
val data02_with_country = df02.select(concat_ws("\t", df02.col("device_id"), df02.col("device_type"), lit("android"),lit("CN")))
val data03 = df03.select(concat_ws("\t", df03.col("device_id"), df03.col("device_type"), lit("android"),lit("[\""+package_name3+"\"]")))
val data03_with_country = df03.select(concat_ws("\t", df03.col("device_id"), df03.col("device_type"), lit("android"),lit("CN")))
data01.union(data02).union(data03).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output4)
data01_with_country.union(data02_with_country).union(data03_with_country).coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output5)
} finally {
spark.stop()
}
0
}
}
object CnGoodChannel {
def main(args: Array[String]): Unit = {
new CnGoodChannel().run(args)
}
}