package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

/**
 * Spark job that enriches one hour of DSP impressions with the fields of the
 * bid request that produced them and writes the result as gzip-compressed text.
 *
 * Impressions of the partition `concat(yr,mt,dt,hh) = endtime` are left-joined
 * (on `requestid`) with bid requests whose partition lies in
 * `[starttime, endtime]`. For every joined column except `time` and `price`,
 * the request-side value is used when a request matched and the impression's
 * exchange is not 'nexage'; otherwise the impression-side value is kept.
 * A trailing flag column records whether a request was matched.
 *
 * Command-line options (all required): `-starttime`, `-endtime`, `-region`,
 * `-output`.
 */
class DspImpressionHourFullOther extends CommonSparkJob with Serializable {

  /**
   * Parses the arguments, runs the join and writes the rows to `output`.
   *
   * @param args command-line arguments, see [[commandOptions]]
   * @return 0 on success; failures propagate as exceptions
   * @throws IllegalArgumentException if a required option is missing or blank
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // Fail fast: a missing option would otherwise be interpolated into the
    // query as the literal string "null" and silently produce wrong output.
    def requiredOption(name: String): String =
      Option(commandLine.getOptionValue(name))
        .filter(_.trim.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"Missing required option -$name"))

    val starttime = requiredOption("starttime")
    val endtime = requiredOption("endtime")
    val region = requiredOption("region")
    val output = requiredOption("output")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      //  .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Clear any previous output. The filesystem is resolved from the output
      // path itself so the path that is deleted is the one that is written
      // below, whatever bucket or filesystem it lives on. This happens inside
      // the try block so the session is stopped even if the delete fails.
      val outputPath = new Path(output)
      val outputFs: FileSystem = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
      outputFs.delete(outputPath, true)

      //  spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
      spark.sql("set hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.compress.output=true")
      spark.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec")

      val hql = DspImpressionHourFullOther.buildQuery(region, starttime, endtime)

      spark.sql(hql)
        .rdd
        .map(_.mkString(DATA_SPLIT))
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Declares the command-line options understood by this job.
   *
   * @return options `starttime`, `endtime`, `output` and `region`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("starttime", true, "starttime Time")
    options.addOption("endtime", true, "endtime Time")
    options.addOption("output", true, "output dir")
    options.addOption("region", true, "region info")
    options
  }
}

object DspImpressionHourFullOther {

  /** Columns carried through the join, in output order. */
  private val Columns: Seq[String] = Seq(
    "time", "xforwardip", "ip", "exchanges", "elapsed", "url", "body", "requestid", "bid", "price",
    "`describe`", "ext1", "ext2", "ext3", "ext4", "ext5", "auctiontype", "bidreqid", "impid",
    "publisherid", "appid", "appname", "posid", "category", "intl", "imagesize", "deviceip",
    "make", "model", "os", "osv", "devicetype", "cncttype", "countrycode", "googleadid",
    "imeisha1", "androididmd5", "idfa", "keywords", "yob", "gender", "ext6", "ext7", "ext8",
    "ext9", "ext10", "campaignid", "cinstallprice", "cappname", "cpackagename", "cadvertiserid",
    "ccreativeid"
  )

  /** Columns that are always read from the impression row, never from the request. */
  private val ImpressionOnlyColumns: Set[String] = Set("time", "price")

  /** Partition columns selected and grouped alongside [[Columns]] in both sub-queries. */
  private val PartitionColumns: Seq[String] = Seq("yr", "mt", "dt", "rg")

  /** Condition under which the request-side value replaces the impression-side value. */
  private val UseRequest: String =
    "request.requestid is not null and impression.exchanges != 'nexage'"

  /** Builds a sub-query that de-duplicates `table` rows matching `filter` by grouping on every selected column. */
  private def dedupedSubquery(table: String, filter: String): String = {
    val cols = (Columns ++ PartitionColumns).mkString(",")
    s"select $cols from $table where $filter group by $cols"
  }

  /**
   * Builds the impression/bid-request join query.
   *
   * The arguments are interpolated into the SQL text as quoted string
   * literals without escaping, so they must come from a trusted caller.
   *
   * @param region    value matched against the `rg` partition column of both tables
   * @param starttime inclusive lower bound for `concat(yr,mt,dt,hh)` of bid requests
   * @param endtime   inclusive upper bound for bid requests and the exact
   *                  `concat(yr,mt,dt,hh)` of the impressions that are read
   * @return the SQL text
   */
  private def buildQuery(region: String, starttime: String, endtime: String): String = {
    val projections = Columns.map { col =>
      if (ImpressionOnlyColumns.contains(col)) s"impression.$col"
      else s"case when $UseRequest then request.$col else impression.$col end as $col"
    }

    // 1 = request matched, 0 = no request matched. There is no ELSE branch, so
    // rows matching neither condition (e.g. exchanges = 'nexage') yield NULL.
    val matchedFlag =
      s"case when $UseRequest then 1 " +
        "when request.requestid is null and impression.exchanges != 'nexage' then 0 end"

    val impressions = dedupedSubquery(
      "adn_dsp.log_adn_dsp_impression_org_hour",
      s"rg = '$region' and concat(yr,mt,dt,hh) = '$endtime'"
    )
    val requests = dedupedSubquery(
      "adn_dsp.log_adn_dsp_bid_request_orc_hour",
      s"rg = '$region' and concat(yr,mt,dt,hh) >= '$starttime' and concat(yr,mt,dt,hh) <= '$endtime'"
    )

    Seq(
      "select " + (projections :+ matchedFlag).mkString(",\n       "),
      s"from ( $impressions ) impression",
      "left join",
      s"( $requests ) request",
      "on (impression.requestid = request.requestid)"
    ).mkString("\n")
  }

  /** Entry point: runs the job with the given command-line arguments. */
  def main(args: Array[String]): Unit = {
    new DspImpressionHourFullOther().run(args)
  }
}