// DspImpressionHourFull.scala — committed by wang-jinfeng
package mobvista.dmp.datasource.dsp


import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

/**
  * Spark job that fills in hourly DSP impression rows with values from the matching DSP
  * request rows and writes the result as gzip-compressed text files.
  *
  * Impression rows are read for `rg = 'cn'` and the single hour equal to `endtime`; request rows
  * are read for `rg = 'cn'` and every hour between `starttime` and `endtime` (inclusive). The two
  * sides are left-joined on a derived key, and for most columns an impression value of `'-'` or
  * `''` is replaced by the request-side value.
  *
  * Command-line options (all required): `-starttime`, `-endtime`, `-output`.
  */
class DspImpressionHourFull extends CommonSparkJob with Serializable {

  /**
    * Parses the options, clears the output directory, runs the join and writes the result.
    *
    * @param args raw command-line arguments; see [[commandOptions]] for the accepted options
    * @return `0` when the job completes
    * @throws IllegalArgumentException if a required option is missing or empty
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // getOptionValue returns null for an absent option. Fail before touching Spark or the
    // existing output: otherwise the text "null" would be interpolated into the query and the
    // previous output would be deleted and replaced by an empty result.
    def required(name: String): String =
      Option(commandLine.getOptionValue(name))
        .filter(_.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"Missing required option: -$name"))

    val starttime = required("starttime")
    val endtime = required("endtime")
    val output = required("output")

    // NOTE(review): the application name looks copied from another job; it is kept unchanged
    // because external monitoring may key on it — confirm before renaming.
    val spark = SparkSession
      .builder()
      .appName("OdsDmpUserInfoAll_job")
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      // .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Resolve the file system from the output path itself rather than from a fixed bucket, so
      // the directory that is cleared is always the one saveAsTextFile writes to. This runs
      // inside the try so the session is stopped even if the delete fails.
      val outputPath = new Path(output)
      val fs: FileSystem = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
      fs.delete(outputPath, true)

      spark.sql("set  hive.exec.dynamic.partition.mode=nonstrict")
      spark.sql("set  hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.compress.output=true")
      spark.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec")

      // Each result row becomes one text line with its columns joined by DATA_SPLIT.
      spark.sql(buildQuery(starttime, endtime))
        .rdd
        .map(_.mkString(DATA_SPLIT))
        .saveAsTextFile(output, classOf[GzipCodec])
      0
    } finally {
      spark.stop()
    }
  }

  /**
    * Builds the HiveQL text that joins impressions to requests.
    *
    * The join key is `concat(bidreqid, exchanges)` when `exchanges = 'tencent'` and `requestid`
    * otherwise. `time`, `price`, `bidreqid` and the partition columns (`yr`, `mt`, `dt`, `rg`,
    * `hh`) always come from the impression side; `requestid` prefers the request-side value when
    * it is not null; every other column falls back to the request side when the impression value
    * is `'-'` or `''`. Because this is a left join, impressions without a matching request are
    * still emitted.
    *
    * NOTE(review): both values are interpolated directly into the SQL text; they are expected to
    * come from a trusted scheduler, not from untrusted input.
    * NOTE(review): the impression table is named `..._org_hour` while the request table is
    * `..._orc_hour` — confirm the difference is intentional.
    *
    * @param starttime lower bound (inclusive) compared against `concat(yr,mt,dt,hh)` on requests
    * @param endtime   impression hour, and upper bound (inclusive) on requests
    * @return the query text
    */
  private def buildQuery(starttime: String, endtime: String): String =
    s"""
           |select   impression.time,
           |case when impression.xforwardip     in ('-','')  then request.xforwardip  else impression.xforwardip   end as xforwardip,
           |case when impression.ip             in ('-','')  then request.ip  else impression.ip   end as ip,
           |case when impression.exchanges      in ('-','')  then request.exchanges  else impression.exchanges   end as exchanges,
           |case when impression.elapsed        in ('-','')  then request.elapsed  else impression.elapsed   end as elapsed,
           |case when impression.url            in ('-','')  then request.url  else impression.url   end as url,
           |case when impression.body           in ('-','')  then request.body  else impression.body   end as body,
           |case when request.requestid is not null then  request.requestid else  impression.requestid  end as requestid,
           |case when impression.bid            in ('-','')  then request.bid  else impression.bid   end as bid,
           |impression.price,
           |case when impression.`describe`       in ('-','')  then request.`describe`  else impression.`describe`   end as `describe`,
           |case when impression.ext1           in ('-','')  then request.ext1  else impression.ext1   end as ext1,
           |case when impression.ext2           in ('-','')  then request.ext2  else impression.ext2   end as ext2,
           |case when impression.ext3           in ('-','')  then request.ext3  else impression.ext3   end as ext3,
           |case when impression.ext4           in ('-','')  then request.ext4  else impression.ext4   end as ext4,
           |case when impression.ext5           in ('-','')  then request.ext5  else impression.ext5   end as ext5,
           |case when impression.auctiontype    in ('-','')  then request.auctiontype  else impression.auctiontype   end as auctiontype ,
           |impression.bidreqid,
           |case when impression.impid          in ('-','')  then request.impid  else impression.impid   end as impid,
           |case when impression.publisherid    in ('-','')  then request.publisherid  else impression.publisherid   end as publisherid,
           |case when impression.appid          in ('-','')  then request.appid  else impression.appid   end as appid,
           |case when impression.appname        in ('-','')  then request.appname  else impression.appname   end as appname,
           |case when impression.posid          in ('-','')  then request.posid  else impression.posid   end as posid,
           |case when impression.category       in ('-','')  then request.category  else impression.category   end as category,
           |case when impression.intl           in ('-','')  then request.intl  else impression.intl   end as intl,
           |case when impression.imagesize      in ('-','')  then request.imagesize  else impression.imagesize   end as imagesize,
           |case when impression.deviceip       in ('-','')  then request.deviceip  else impression.deviceip   end as deviceip,
           |case when impression.make           in ('-','')  then request.make  else impression.make   end as make,
           |case when impression.model          in ('-','')  then request.model  else impression.model   end as model,
           |case when impression.os             in ('-','')  then request.os  else impression.os   end as os,
           |case when impression.osv            in ('-','')  then request.osv  else impression.osv   end as osv,
           |case when impression.devicetype     in ('-','')  then request.devicetype  else impression.devicetype   end as devicetype,
           |case when impression.cncttype       in ('-','')  then request.cncttype  else impression.cncttype   end as cncttype,
           |case when impression.countrycode    in ('-','')  then request.countrycode  else impression.countrycode   end as countrycode,
           |case when impression.googleadid     in ('-','')  then request.googleadid  else impression.googleadid   end as googleadid,
           |case when impression.imeisha1       in ('-','')  then request.imeisha1  else impression.imeisha1   end as imeisha1,
           |case when impression.androididmd5   in ('-','')  then request.androididmd5  else impression.androididmd5   end as androididmd5,
           |case when impression.idfa           in ('-','')  then request.idfa  else impression.idfa   end as idfa,
           |case when impression.keywords       in ('-','')  then request.keywords  else impression.keywords   end as keywords,
           |case when impression.yob            in ('-','')  then request.yob  else impression.yob   end as yob,
           |case when impression.gender         in ('-','')  then request.gender  else impression.gender   end as gender,
           |case when impression.ext6           in ('-','')  then request.ext6  else impression.ext6   end as ext6,
           |case when impression.ext7           in ('-','')  then request.ext7  else impression.ext7   end as ext7,
           |case when impression.ext8           in ('-','')  then request.ext8  else impression.ext8   end as ext8,
           |case when impression.ext9           in ('-','')  then request.ext9  else impression.ext9   end as ext9,
           |case when impression.ext10          in ('-','')  then request.ext10  else impression.ext10   end as ext10,
           |case when impression.campaignid     in ('-','')  then request.campaignid  else impression.campaignid   end as campaignid,
           |case when impression.cinstallprice  in ('-','')  then request.cinstallprice  else impression.cinstallprice   end as cinstallprice,
           |case when impression.cappname       in ('-','')  then request.cappname  else impression.cappname   end as cappname,
           |case when impression.cpackagename   in ('-','')  then request.cpackagename  else impression.cpackagename   end as cpackagename,
           |case when impression.cadvertiserid  in ('-','')  then request.cadvertiserid  else impression.cadvertiserid   end as cadvertiserid,
           |case when impression.ccreativeid    in ('-','')  then request.ccreativeid  else impression.ccreativeid   end as ccreativeid,
           |impression.yr,
           |impression.mt,
           |impression.dt,
           |impression.rg,
           |impression.hh
           |from ( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg,hh,case when exchanges='tencent' then concat(bidreqid,exchanges) else requestid end as join_key
           |      from adn_dsp.log_adn_dsp_impression_org_hour where  rg = 'cn' and concat(yr,mt,dt,hh) = '${endtime}' ) impression
           |left join
           |( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg,hh,case when exchanges='tencent' then concat(bidreqid,exchanges) else requestid end as join_key
           |      from adn_dsp.log_adn_dsp_request_orc_hour where  rg = 'cn' and concat(yr,mt,dt,hh) >= '${starttime}' and  concat(yr,mt,dt,hh) <= '${endtime}' ) request
           |on (impression.join_key = request.join_key)
        """.stripMargin

  /**
    * Declares the command-line options understood by this job.
    *
    * @return options `starttime`, `endtime` and `output`, each taking one argument
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("starttime", true, "starttime Time")
    options.addOption("endtime", true, "endtime Time")
    options.addOption("output", true, "output dir")
    options
  }

}

/**
  * Command-line entry point for the [[DspImpressionHourFull]] job.
  */
object DspImpressionHourFull {

  /**
    * Creates a job instance and runs it with the given arguments. The integer status returned
    * by `run` is discarded.
    *
    * @param args command-line arguments, forwarded unchanged to the job
    */
  def main(args: Array[String]): Unit = {
    val job = new DspImpressionHourFull()
    job.run(args)
  }
}