// CnGoodChannel.scala 19.6 KB
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.RDDMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that builds three device-id audiences for CN / android traffic and writes them out.
 *
 * For each of the three configured package names the job:
 *  - extracts imei / oaid and their md5 values from yesterday's rows of a Hive table,
 *  - unions them with the ids already listed for that package in `old_data_path`,
 *  - de-duplicates on (device_id, device_type),
 *  - writes the ids under `output1` / `output2` / `output3`, keyed by `<output>/<device_type>`.
 *
 * It then writes two combined text outputs covering all three audiences:
 *  - `output4`: `device_id \t device_type \t android \t ["<package_name>"]`
 *  - `output5`: `device_id \t device_type \t android \t CN`
 *
 * Earlier revisions of the queries only kept request rows where imei, oaid and both md5 values
 * were all non-null; the current queries keep every id that is present (see `trackingRequestSql`).
 *
 * @author jiangfan
 * @date 2021/9/9 17:42
 */
class CnGoodChannel extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options of the job. Every option takes a value and is described
   * as mandatory ("[must]").
   *
   * @return the option set used to parse the job arguments
   */
  override protected def buildOptions(): Options = {
    val opts = new Options
    Seq(
      "coalesce", "old_data_path",
      "output1", "output2", "output3", "output4", "output5",
      "package_name1", "package_name2", "package_name3"
    ).foreach(name => opts.addOption(name, true, s"[must] $name"))
    opts
  }

  /**
   * Parses the arguments, runs the three audience queries and writes the five outputs.
   *
   * @param args raw command-line arguments
   * @return -1 when the mandatory-option check fails, 0 when the job completes
   * @throws NumberFormatException if the `coalesce` option is not an integer; raised before the
   *                               Spark session is created and before any output is deleted
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      def opt(name: String): String = commandLine.getOptionValue(name)

      // Parsed once, up front: a malformed value now fails before anything is deleted.
      val partitions = opt("coalesce").toInt
      val oldDataPath = opt("old_data_path")
      val idOutputs = Seq("output1", "output2", "output3").map(opt)
      val packageOutput = opt("output4")
      val countryOutput = opt("output5")
      val packageNames = Seq("package_name1", "package_name2", "package_name3").map(opt)

      val spark = SparkSession.builder()
        .appName("CnGoodChannel")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      // Everything after session creation runs inside try/finally so the session is always
      // stopped, including when clearing the output directories fails.
      try {
        // Gzip-compress the files written through saveAsNewAPIHadoopFile.
        val hadoopConf = spark.sparkContext.hadoopConfiguration
        hadoopConf.set("mapreduce.output.compress", "true")
        hadoopConf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
        hadoopConf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
        hadoopConf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
        hadoopConf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

        // Clear all five output locations before writing.
        val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), hadoopConf)
        (idOutputs :+ packageOutput :+ countryOutput).foreach(out => fs.delete(new Path(out), true))

        registerYesterdayData(spark, oldDataPath)

        val queries = Seq(
          trackingRequestSql(
            "publisher_id in('11386','20284','16227','23652','18721','20781','22193','21522','21915') and app_id not in ('122317')",
            packageNames(0)),
          trackingRequestSql(
            "publisher_id in('18590','21933','13566','19672','26101','21386')",
            packageNames(1)),
          dspRequestSql(packageNames(2))
        )

        // Each audience is persisted because it is consumed twice more below (output4 and
        // output5). The per-device-type write uses the persisted DataFrame itself rather than
        // issuing the same SQL text a second time.
        val audiences: Seq[DataFrame] = queries.zip(idOutputs).map { case (sql, output) =>
          val ids = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
          saveIdsByDeviceType(ids, output, partitions, hadoopConf)
          ids
        }

        val packageLines = audiences.zip(packageNames).map { case (ids, packageName) =>
          tabSeparatedLines(ids, "[\"" + packageName + "\"]")
        }
        val countryLines = audiences.map(ids => tabSeparatedLines(ids, "CN"))

        packageLines.reduce(_.union(_))
          .coalesce(partitions).write.format("text").mode("overwrite").save(packageOutput)
        countryLines.reduce(_.union(_))
          .coalesce(partitions).write.format("text").mode("overwrite").save(countryOutput)

        0
      } finally {
        spark.stop()
      }
    }
  }

  /**
   * Loads the tab-separated file(s) at `path` and registers them as the temp view
   * `yesterday_data(device_id, device_type, package_name)`.
   *
   * Columns 0 and 1 are taken as-is. Column 3 has its first two characters and everything from
   * its last double quote onwards removed, which turns `["pkg"]` into `pkg` (the layout this job
   * writes to `output4`; presumably the input is a previous run's output4 - confirm with the
   * scheduler configuration). A line with fewer than four columns, or whose fourth column does
   * not have that shape, fails the task.
   *
   * @param spark active session
   * @param path  location of the existing data
   */
  private def registerYesterdayData(spark: SparkSession, path: String): Unit = {
    // The lambda only touches its own locals, so the job instance is not pulled into the closure.
    val rows: RDD[Row] = spark.sparkContext.textFile(path).map { line =>
      val fields = line.split("\t", -1)
      val rawPackage = fields(3)
      Row(fields(0), fields(1), rawPackage.substring(2, rawPackage.lastIndexOf("\"")))
    }

    val schema: StructType = StructType(Array(
      StructField("device_id", StringType),
      StructField("device_type", StringType),
      StructField("package_name", StringType)
    ))

    spark.createDataFrame(rows, schema).createOrReplaceTempView("yesterday_data")
  }

  /**
   * Builds the audience query over `dwh.ods_adn_trackingnew_request`.
   *
   * Placeholder imei / oaid values are nulled out; each remaining id is tagged as
   * `<id>|<device_type>`, the tags are joined with `concat_ws` (which skips nulls), rows with no
   * id at all are dropped (`data != ''`), and the joined string is exploded back into one row per
   * id. The result is unioned with the `yesterday_data` rows of `packageName` and de-duplicated.
   *
   * NOTE(review): the exclusion list inside `md5(... imei ...)` lacks the `' '` entry that the
   * plain imei list has, so a single-space imei yields no `imei` row but does yield an `imeimd5`
   * row. Kept as in the original query - confirm whether this is intended.
   *
   * @param sourceFilter SQL predicate and-ed to the country / platform / date filter; selects the
   *                     publishers (and optionally apps) that make up the audience
   * @param packageName  package whose existing ids are carried over from `yesterday_data`
   * @return the SQL text
   */
  private def trackingRequestSql(sourceFilter: String, packageName: String): String =
    s"""
       |select device_id,device_type
       |from
       |  (select  split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
       |  from
       |  (select onedata
       |  from
       |  (
       |      select data
       |      from
       |          (
       |           select concat_ws(",",v1,v2,v3,v4) data
       |           from
       |               (
       |                select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
       |                from
       |                   (select
       |                      ( case when imei not in ('0','00000000-0000-0000-0000-000000000000','',' ','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imei,
       |                      ( case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid,
       |                      md5(case when imei not in ('0','00000000-0000-0000-0000-000000000000','','00000000','00000000000000','000000000000000','0000000000000000') then imei else null end) as imeimd5,
       |                      md5(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaidmd5
       |                    FROM
       |                      dwh.ods_adn_trackingnew_request
       |                    where country_code = 'CN' and platform = 'android' and ${sourceFilter}
       |                      and concat(yyyy,'-',mm,'-',dd)=cast(date_sub(CURRENT_DATE, 1) as string)
       |                    )  tmpdata
       |                )  t01
       |           )  t02  where data!='' ) t03
       |  lateral view explode(split(data, ",")) num  as onedata)   t04
       |  union
       |  select device_id,device_type
       |  from
       |      yesterday_data
       |  where  package_name='${packageName}'  )  t05
       |  group by device_id,device_type
       |""".stripMargin

  /**
   * Builds the audience query over `dwh.etl_dsp_request_daily_hours` for the iqiyi / wax
   * exchanges and the apps com.qiyi.video, com.sina.weibo and tv.pps.mobile.
   *
   * The requirement was originally to read `adn_dsp.log_adn_dsp_request_orc_hour`, but that table
   * is expensive to scan; `dwh.etl_dsp_request_daily_hours` holds the related data, so it is read
   * instead to reduce the amount of computation.
   *
   * The ids are taken from fixed positions of the comma-separated `exitid` column
   * (4 = imei, 5 = imeimd5, 12 = oaid, 13 = oaidmd5), tagged and exploded the same way as in the
   * tracking query, with empty ids dropped, then unioned with the `yesterday_data` rows of
   * `packageName` and de-duplicated.
   *
   * @param packageName package whose existing ids are carried over from `yesterday_data`
   * @return the SQL text
   */
  private def dspRequestSql(packageName: String): String =
    s"""
       |select device_id,device_type
       |from
       |  (select  split(onedata,"\\\\|")[0] as device_id,split(onedata,"\\\\|")[1] as device_type
       |  from
       |  (select onedata
       |  from
       |  (
       |  select concat_ws(",",v1,v2,v3,v4) data
       |  from
       |      (
       |        select concat(imei,"|imei") v1 ,concat(oaid,"|oaid") v2,concat(imeimd5,"|imeimd5") v3 ,concat(oaidmd5,"|oaidmd5") v4
       |        from
       |        (select
       |                 split(exitid,',')[4]  as  imei,
       |                 split(exitid,',')[12] as  oaid,
       |                 split(exitid,',')[5]  as  imeimd5,
       |                 split(exitid,',')[13] as  oaidmd5
       |              from
       |              (select
       |                   packagename,exitid
       |               FROM
       |                   dwh.etl_dsp_request_daily_hours
       |               where   dt=replace(cast(date_sub(CURRENT_DATE, 1) as string),'-','')
       |                     and country = 'CN' and platform = 'android' and exchanges in ('iqiyi','wax')
       |               ) tmpdata lateral view explode(split(packagename, "#")) num  as appid
       |               where appid in ('com.qiyi.video','com.sina.weibo','tv.pps.mobile')
       |         )  t01
       |       )  t02
       |     )  t03
       |    lateral view explode(split(data, ",")) num  as onedata where data!='' and split(onedata,"\\\\|")[0]!='')   t04
       |  union
       |  select device_id,device_type
       |  from
       |      yesterday_data
       |  where  package_name='${packageName}'  )  t05
       |  group by device_id,device_type
       |""".stripMargin

  /**
   * Writes the `device_id` column of `ids` under `output` through `RDDMultipleOutputFormat`,
   * using `<output>/<device_type>` as the record key (presumably the format routes each record to
   * a file under that key - confirm against RDDMultipleOutputFormat).
   *
   * @param ids        rows with string columns `device_id` and `device_type`
   * @param output     base output path
   * @param partitions number of partitions to coalesce to before writing
   * @param conf       Hadoop configuration carrying the output compression settings
   */
  private def saveIdsByDeviceType(ids: DataFrame,
                                  output: String,
                                  partitions: Int,
                                  conf: org.apache.hadoop.conf.Configuration): Unit =
    ids.rdd.coalesce(partitions).map { row =>
      val deviceId = row.getAs[String]("device_id")
      val deviceType = row.getAs[String]("device_type")
      (new Text(s"${output}/${deviceType}"), new Text(deviceId))
    }.saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

  /**
   * Renders every row of `ids` as the single string column
   * `device_id \t device_type \t android \t <lastField>`.
   *
   * @param ids       rows with columns `device_id` and `device_type`
   * @param lastField literal placed in the fourth position of every line
   * @return a one-column DataFrame suitable for the "text" writer
   */
  private def tabSeparatedLines(ids: DataFrame, lastField: String): DataFrame =
    ids.select(concat_ws("\t", ids.col("device_id"), ids.col("device_type"), lit("android"), lit(lastField)))

}

object CnGoodChannel {
  /**
   * Command-line entry point: creates a [[CnGoodChannel]] job and runs it with the given
   * arguments. The integer status returned by `run` is discarded.
   *
   * @param args raw command-line arguments forwarded to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new CnGoodChannel()
    job.run(args)
  }
}