Commit 4b2c5073 by fan.jiang

fix cn_good_channel

parent 23ca0443
@@ -55,4 +55,7 @@ hadoop distcp -m20 "${OUTPUT_PATH5}/*" "${ODS_OTHER_DEVICE_DAILY}/${dt_slash_to
Per the requirements, the three SQL statements in CnGoodChannel.scala each extract a different dataset and write it to the S3 path of the corresponding pseudo package name. Because the computation is heavy, each SQL pulls only one day of data.
The results are then deduplicated against the previous day's output, and today's results are stored under the three paths output1, output2, and output3 for syncing to the major media partners.
The data is also written to the output4 and output5 paths, and then loaded into the install list under the business="other" partition.
2021-09-17 requirement update:
1. In the output1, output2, and output3 path names, rename imeimd5 to imei_md5 and oaidmd5 to oaid_md5.
2. Hash imei with MD5, write it out under imei_md5, then deduplicate; handle oaid the same way.
'
\ No newline at end of file
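The 2021-09-17 update boils down to one renaming/hashing rule per record. A minimal sketch of that rule, with illustrative names (normalize, deviceId, and deviceType are not from the diff):

import java.security.MessageDigest

// Raw imei/oaid values are MD5-hashed and relabelled with a _md5 suffix;
// already-hashed values are only relabelled.
def normalize(deviceId: String, deviceType: String): (String, String) = {
  def md5(s: String): String =
    MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))
      .map("%02x".format(_)).mkString
  deviceType match {
    case "imei"    => (md5(deviceId), "imei_md5")
    case "oaid"    => (md5(deviceId), "oaid_md5")
    case "imeimd5" => (deviceId, "imei_md5")
    case "oaidmd5" => (deviceId, "oaid_md5")
    case other     => (deviceId, other) // pass anything else through unchanged
  }
}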
@@ -95,41 +95,6 @@ class CnGoodChannel extends CommonSparkJob with Serializable {
val old_data_df = spark.createDataFrame(old_data, schema)
old_data_df.createOrReplaceTempView("yesterday_data")
// val sql1=
// s"""
// |select device_id,device_type
// |from
// | (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
// | from
// | (select onedata
// | from
// | (
// | select concat_ws(",",v1,v2,v3,v4) data
// | from
// | (
// | select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
// | from
// | (select
// | ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
// | ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
// | md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
// | md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
// | FROM
// | dwh.ods_adn_trackingnew_request
// | where country_code = 'CN' and platform = 'android' and publisher_id in('11386','20284','16227','23652','18721','20781','22193','21522','21915') and app_id not in ('122317')
// | and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
// | ) tmpdata where imei is not null and oaid is not null and imeimd5 is not null and oaidmd5 is not null
// | ) t
// | ) tt
// | lateral view explode(split(data, ",")) num as onedata) ttt
// | union
// | select device_id,device_type
// | from
// | yesterday_data
// | where package_name='${package_name1}' ) tttt
// | group by device_id,device_type
// |""".stripMargin
val sql1=
s"""
@@ -171,47 +136,26 @@ class CnGoodChannel extends CommonSparkJob with Serializable {
val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
df01.createOrReplaceTempView("result_data01")
val sql11=
s"""
|select device_id,device_type
|from
|(select
| ( case when device_type in ('imei','oaid') then md5(device_id) else device_id end) as device_id,
| ( case when device_type in ('imei','oaid') then concat(device_type,"_md5") when device_type='imeimd5' then "imei_md5" when device_type='oaidmd5' then "oaid_md5" end) as device_type
|from
|result_data01) t
|group by device_id,device_type
|""".stripMargin
spark.sql(sql11).rdd.coalesce(coalesce.toInt).map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
(new Text(s"${output1}/${device_type}"), new Text(device_id))
}).saveAsNewAPIHadoopFile(output1, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
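RDDMultipleOutputFormat is project code this diff does not show; from the call site it evidently routes each record to the subdirectory named by its key (here "${output1}/${device_type}"). Under that assumption, an analogous sketch with the old mapred API would look like this (the class name is hypothetical):

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Route each (path, value) pair into the directory named by the key and
// write only the value, dropping the key from the output lines.
class KeyAsPathOutputFormat extends MultipleTextOutputFormat[Text, Text] {
  override def generateFileNameForKeyValue(key: Text, value: Text, name: String): String =
    s"${key.toString}/$name"
  override def generateActualKey(key: Text, value: Text): Text = null
}

// usage sketch: rdd.saveAsHadoopFile(basePath, classOf[Text], classOf[Text], classOf[KeyAsPathOutputFormat])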
// val sql2=
// s"""
// |select device_id,device_type
// |from
// | (select split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
// | from
// | (select onedata
// | from
// | (
// | select concat_ws(",",v1,v2,v3,v4) data
// | from
// | (
// | select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
// | from
// | (select
// | ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
// | ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
// | md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
// | md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
// | FROM
// | dwh.ods_adn_trackingnew_request
// | where country_code = 'CN' and platform = 'android' and publisher_id in('18590','21933','13566','19672','26101','21386')
// | and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
// | ) tmpdata where imei is not null and oaid is not null and imeimd5 is not null and oaidmd5 is not null
// | ) t
// | ) tt
// | lateral view explode(split(data, ",")) num as onedata) ttt
// | union
// | select device_id,device_type
// | from
// | yesterday_data
// | where package_name='${package_name2}' ) tttt
// | group by device_id,device_type
// |""".stripMargin
val sql2=
s"""
@@ -252,8 +196,21 @@ class CnGoodChannel extends CommonSparkJob with Serializable {
|""".stripMargin
val df02: DataFrame = spark.sql(sql2).persist(StorageLevel.MEMORY_AND_DISK_SER)
df02.createOrReplaceTempView("result_data02")
val sql22=
s"""
|select device_id,device_type
|from
|(select
| ( case when device_type in ('imei','oaid') then md5(device_id) else device_id end) as device_id,
| ( case when device_type in ('imei','oaid') then concat(device_type,"_md5") when device_type='imeimd5' then "imei_md5" when device_type='oaidmd5' then "oaid_md5" end) as device_type
|from
|result_data02) t
|group by device_id,device_type
|""".stripMargin
spark.sql(sql22).rdd.coalesce(coalesce.toInt).map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
(new Text(s"${output2}/${device_type}"), new Text(device_id))
@@ -338,8 +295,21 @@ class CnGoodChannel extends CommonSparkJob with Serializable {
// |""".stripMargin
val df03: DataFrame = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK_SER)
df03.createOrReplaceTempView("result_data03")
val sql33=
s"""
|select device_id,device_type
|from
|(select
| ( case when device_type in ('imei','oaid') then md5(device_id) else device_id end) as device_id,
| ( case when device_type in ('imei','oaid') then concat(device_type,"_md5") when device_type='imeimd5' then "imei_md5" when device_type='oaidmd5' then "oaid_md5" end) as device_type
|from
|result_data03) t
|group by device_id,device_type
|""".stripMargin
spark.sql(sql33).rdd.coalesce(coalesce.toInt).map(r => {
val device_id = r.getAs[String]("device_id")
val device_type = r.getAs[String]("device_type")
(new Text(s"${output3}/${device_type}"), new Text(device_id))
......
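sql11, sql22, and sql33 are identical except for the source view, so the normalization query could be produced by one helper; a hypothetical sketch (md5NormalizeSql is not in the commit):

// Build the md5-normalization/dedup query for a given temp view.
def md5NormalizeSql(view: String): String =
  s"""
     |select device_id,device_type
     |from
     |(select
     |  ( case when device_type in ('imei','oaid') then md5(device_id) else device_id end) as device_id,
     |  ( case when device_type in ('imei','oaid') then concat(device_type,"_md5") when device_type='imeimd5' then "imei_md5" when device_type='oaidmd5' then "oaid_md5" end) as device_type
     |from
     |$view) t
     |group by device_id,device_type
     |""".stripMargin

// e.g. val sql11 = md5NormalizeSql("result_data01")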