// DspImpressionHourFullCn.scala (committed by wang-jinfeng)
package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

/**
 * Spark job that exports the CN-region DSP impressions of a single hour,
 * enriched with the fields of the bid request that produced each impression.
 *
 * Impressions of hour `endtime` are left-joined on `requestid` with the bid
 * requests logged between `starttime` and `endtime` (both inclusive). Each
 * result row is joined with `DATA_SPLIT` and written as gzip-compressed text
 * to `output`.
 */
class DspImpressionHourFullCn extends CommonSparkJob with Serializable {

  /**
   * Parses the command line, runs the join and writes the result.
   *
   * @param args command-line arguments; `-starttime`, `-endtime` and `-output`
   *             are all required
   * @return 0 on success
   * @throws IllegalArgumentException if a required option is missing or blank
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // getOptionValue returns null for an absent option. Without this check a
    // missing time bound would be interpolated into the SQL as the literal
    // 'null' and the job would replace existing output with an empty result.
    def requiredOption(name: String): String =
      Option(commandLine.getOptionValue(name))
        .filter(_.trim.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"Missing required option: -$name"))

    val starttime = requiredOption("starttime")
    val endtime = requiredOption("endtime")
    val output = requiredOption("output")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Remove any previous output so saveAsTextFile does not fail on an
      // existing directory. Done inside the try so the session is still
      // stopped if the delete itself fails.
      // NOTE(review): the file system is always resolved from the hard-coded
      // bucket below, whatever `output` points at — confirm this is intended.
      FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

      spark.sql("set  hive.exec.dynamic.partition.mode=nonstrict")
      spark.sql("set  hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.compress.output=true")
      spark.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec")

      // Copy the separator into a local so the map closure does not need to
      // capture (and serialize) the whole job instance.
      val separator = DATA_SPLIT
      spark.sql(buildQuery(starttime, endtime))
        .rdd
        .map(_.mkString(separator))
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Builds the impression/bid-request join query.
   *
   * For every output column except `time` and `price` (always taken from the
   * impression), the request-side value is used when a request row matched and
   * the impression's exchange is not 'nexage'; otherwise the impression-side
   * value is used. The final, unnamed column is 1 for a matched non-nexage
   * impression, 0 for an unmatched non-nexage impression and NULL for nexage
   * impressions (the CASE has no ELSE branch).
   *
   * @param starttime inclusive lower bound compared against concat(yr,mt,dt,hh)
   *                  of the bid-request table
   * @param endtime   hour of the impressions to export, and inclusive upper
   *                  bound for the bid requests
   * @return the HQL text to execute
   */
  private def buildQuery(starttime: String, endtime: String): String = {
    // Output columns, in output order.
    val outputColumns = Seq(
      "time", "xforwardip", "ip", "exchanges", "elapsed", "url", "body", "requestid", "bid", "price",
      "`describe`", "ext1", "ext2", "ext3", "ext4", "ext5", "auctiontype", "bidreqid", "impid",
      "publisherid", "appid", "appname", "posid", "category", "intl", "imagesize", "deviceip",
      "make", "model", "os", "osv", "devicetype", "cncttype", "countrycode", "googleadid",
      "imeisha1", "androididmd5", "idfa", "keywords", "yob", "gender",
      "ext6", "ext7", "ext8", "ext9", "ext10",
      "campaignid", "cinstallprice", "cappname", "cpackagename", "cadvertiserid", "ccreativeid")

    // Both sides are de-duplicated by grouping on every selected column,
    // including the partition columns.
    val dedupColumns = (outputColumns ++ Seq("yr", "mt", "dt", "rg")).mkString(",")

    val requestPreferred = "request.requestid is not null and impression.exchanges != 'nexage'"
    val impressionOnly = Set("time", "price")

    val projections = outputColumns.map { column =>
      if (impressionOnly.contains(column)) s"impression.$column"
      else s"case when $requestPreferred then request.$column else impression.$column end as $column"
    }
    val matchFlag =
      s"case when $requestPreferred then 1 when request.requestid is null and impression.exchanges != 'nexage' then 0 end"
    val selectList = (projections :+ matchFlag).mkString(",\n  ")

    // NOTE(review): the impression table name ends in "_org_hour" while the
    // request table ends in "_orc_hour"; both are kept exactly as they were —
    // confirm against the metastore.
    s"""
       |select
       |  $selectList
       |from (
       |  select $dedupColumns
       |  from adn_dsp.log_adn_dsp_impression_org_hour
       |  where rg = 'cn' and concat(yr,mt,dt,hh) = '${endtime}'
       |  group by $dedupColumns
       |) impression
       |left join (
       |  select $dedupColumns
       |  from adn_dsp.log_adn_dsp_bid_request_orc_hour
       |  where rg = 'cn' and concat(yr,mt,dt,hh) >= '${starttime}' and concat(yr,mt,dt,hh) <= '${endtime}'
       |  group by $dedupColumns
       |) request
       |on (impression.requestid = request.requestid)
       |""".stripMargin
  }

  /**
   * Declares the command-line options understood by this job.
   *
   * @return options `starttime`, `endtime` and `output`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("starttime", true, "starttime Time")
    options.addOption("endtime", true, "endtime Time")
    options.addOption("output", true, "output dir")
    options
  }

}

/** Command-line entry point: creates the job and runs it with the given arguments. */
object DspImpressionHourFullCn {

  /**
   * Runs a new [[DspImpressionHourFullCn]] job.
   *
   * @param args command-line arguments, passed through to the job unchanged
   */
  def main(args: Array[String]): Unit = {
    val job = new DspImpressionHourFullCn
    // The status code returned by run is discarded; failures surface as exceptions.
    job.run(args)
  }
}