package mobvista.dmp.datasource.dsp import java.net.URI import mobvista.dmp.common.CommonSparkJob import mobvista.dmp.format.TextMultipleOutputFormat import org.apache.commons.cli.{BasicParser, Options} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.Text import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec} import org.apache.spark.sql.{Row, SparkSession} import scala.collection.mutable.ArrayBuffer /** * alter table adn_dsp.log_adn_dsp_impression_hour add partition (yr='2020',mt='03',dt='19',rg='virginia',hh='03') location 's3://mob-ad/adn/dsp/impression/2020/03/19/virginia/03'; * * set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.dynamic.partition=true; set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; set mapreduce.map.memory.mb=3072; set mapreduce.map.java.opts=-Xmx2458m ; set mapreduce.reduce.memory.mb=3072 ; set mapreduce.reduce.java.opts=-Xmx2458m ; insert overwrite table adn_dsp.log_adn_dsp_impression_hour partition(yr='2020',mt='03',dt='19',rg,hh) select impression.time, case when request.requestid is not null and impression.exchanges != 'nexage' then request.xforwardip else impression.xforwardip end as xforwardip, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ip else impression.ip end as ip, case when request.requestid is not null and impression.exchanges != 'nexage' then request.exchanges else impression.exchanges end as exchanges, case when request.requestid is not null and impression.exchanges != 'nexage' then request.elapsed else impression.elapsed end as elapsed, case when request.requestid is not null and impression.exchanges != 'nexage' then request.url else impression.url end as url, case when request.requestid is not null and impression.exchanges != 'nexage' then request.body else impression.body end as body, case when 
request.requestid is not null and impression.exchanges != 'nexage' then request.requestid else impression.requestid end as requestid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.bid else impression.bid end as bid, impression.price, case when request.requestid is not null and impression.exchanges != 'nexage' then request.`describe` else impression.`describe` end as `describe`, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext1 else impression.ext1 end as ext1, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext2 else impression.ext2 end as ext2, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext3 else impression.ext3 end as ext3, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext4 else impression.ext4 end as ext4, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext5 else impression.ext5 end as ext5, case when request.requestid is not null and impression.exchanges != 'nexage' then request.auctiontype else impression.auctiontype end as auctiontype , case when request.requestid is not null and impression.exchanges != 'nexage' then request.bidreqid else impression.bidreqid end as bidreqid , case when request.requestid is not null and impression.exchanges != 'nexage' then request.impid else impression.impid end as impid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.publisherid else impression.publisherid end as publisherid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.appid else impression.appid end as appid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.appname else impression.appname end as appname, case when request.requestid is not null and impression.exchanges != 'nexage' then 
request.posid else impression.posid end as posid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.category else impression.category end as category, case when request.requestid is not null and impression.exchanges != 'nexage' then request.intl else impression.intl end as intl, case when request.requestid is not null and impression.exchanges != 'nexage' then request.imagesize else impression.imagesize end as imagesize, case when request.requestid is not null and impression.exchanges != 'nexage' then request.deviceip else impression.deviceip end as deviceip, case when request.requestid is not null and impression.exchanges != 'nexage' then request.make else impression.make end as make, case when request.requestid is not null and impression.exchanges != 'nexage' then request.model else impression.model end as model, case when request.requestid is not null and impression.exchanges != 'nexage' then request.os else impression.os end as os, case when request.requestid is not null and impression.exchanges != 'nexage' then request.osv else impression.osv end as osv, case when request.requestid is not null and impression.exchanges != 'nexage' then request.devicetype else impression.devicetype end as devicetype, case when request.requestid is not null and impression.exchanges != 'nexage' then request.cncttype else impression.cncttype end as cncttype, case when request.requestid is not null and impression.exchanges != 'nexage' then request.countrycode else impression.countrycode end as countrycode, case when request.requestid is not null and impression.exchanges != 'nexage' then request.googleadid else impression.googleadid end as googleadid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.imeisha1 else impression.imeisha1 end as imeisha1, case when request.requestid is not null and impression.exchanges != 'nexage' then request.androididmd5 else impression.androididmd5 end as androididmd5, case 
when request.requestid is not null and impression.exchanges != 'nexage' then request.idfa else impression.idfa end as idfa, case when request.requestid is not null and impression.exchanges != 'nexage' then request.keywords else impression.keywords end as keywords, case when request.requestid is not null and impression.exchanges != 'nexage' then request.yob else impression.yob end as yob, case when request.requestid is not null and impression.exchanges != 'nexage' then request.gender else impression.gender end as gender, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext6 else impression.ext6 end as ext6, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext7 else impression.ext7 end as ext7, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext8 else impression.ext8 end as ext8, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext9 else impression.ext9 end as ext9, case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext10 else impression.ext10 end as ext10, case when request.requestid is not null and impression.exchanges != 'nexage' then request.campaignid else impression.campaignid end as campaignid, case when request.requestid is not null and impression.exchanges != 'nexage' then request.cinstallprice else impression.cinstallprice end as cinstallprice, case when request.requestid is not null and impression.exchanges != 'nexage' then request.cappname else impression.cappname end as cappname, case when request.requestid is not null and impression.exchanges != 'nexage' then request.cpackagename else impression.cpackagename end as cpackagename, case when request.requestid is not null and impression.exchanges != 'nexage' then request.cadvertiserid else impression.cadvertiserid end as cadvertiserid, case when request.requestid is not null and impression.exchanges != 
'nexage' then request.ccreativeid else impression.ccreativeid end as ccreativeid, case when request.requestid is not null and impression.exchanges != 'nexage' then 1 when request.requestid is null and impression.exchanges != 'nexage' then 0 end, coalesce(request.rg,impression.rg) rg,'03' hh from ( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg from adn_dsp.log_adn_dsp_impression_org_orc_hour where concat(yr,mt,dt,hh) = '2020031903' group by time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg ) impression left join ( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg from adn_dsp.log_adn_dsp_bid_request_orc_hour where concat(yr,mt,dt,hh) >= '2020031900' and concat(yr,mt,dt,hh) <= '2020031903' group by 
time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg ) request on (impression.requestid = request.requestid); */

/**
  * Spark job that combines one hour of DSP impression logs with the matching bid requests.
  *
  * Impressions of the hour `endtime` are left-joined on `requestid` with the bid requests
  * logged between `starttime` and `endtime` (both inclusive, compared as `yyyyMMddHH` strings).
  * When a matching request exists and the impression's exchange is not 'nexage', most columns
  * are taken from the request; otherwise the impression's own values are kept.
  *
  * Every result row is emitted as a (key, value) pair of `Text` and written through
  * `TextMultipleOutputFormat` with gzip output compression configured.
  */
class DspImpressionHourCombine extends CommonSparkJob with Serializable {

  /**
    * Runs the job.
    *
    * @param args command line arguments, see [[commandOptions]]
    * @return 0 when the job completes; failures propagate as exceptions
    */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val starttime = commandLine.getOptionValue("starttime")
    val endtime = commandLine.getOptionValue("endtime")
    val output = commandLine.getOptionValue("output")
    val cnoutput = commandLine.getOptionValue("cnoutput")
    val vgoutput = commandLine.getOptionValue("vgoutput")
    val tkoutput = commandLine.getOptionValue("tkoutput")
    val hhpath = commandLine.getOptionValue("hhpath")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .enableHiveSupport()
      .getOrCreate()

    // Everything that can fail after the session exists runs inside try/finally so the
    // session is always stopped, including when clearing the output directories fails.
    try {
      // Enable gzip compression for the Hadoop output format used by the final write.
      val conf = spark.sparkContext.hadoopConfiguration
      // conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec");
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      // Remove results of a previous run; the file system handle is resolved once and reused.
      // NOTE(review): the handle is bound to the s3://mob-emr-test bucket - confirm that all
      // three output paths live on a file system this handle can delete from.
      val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), conf)
      Seq(cnoutput, vgoutput, tkoutput).foreach(dir => fs.delete(new Path(dir), true))

      /*
      spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
      spark.sql("set hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.compress.output=true")
      spark.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec")
      */

      // Distinct request ids of the impressions logged in the target hour.
      val requestids =
        s"""
           |select requestid
           | from adn_dsp.log_adn_dsp_impression_org_orc_hour where concat(yr,mt,dt,hh) = '${endtime}'
           |group by requestid
        """.stripMargin
      spark.sql(requestids).createOrReplaceTempView("log_adn_dsp_impression_org_orc_hour")

      // Keep only the bid requests whose requestid appears among the impression request ids,
      // de-duplicated by grouping on every selected column.
      val request_orc =
        s"""
           |select time,xforwardip,ip,exchanges,elapsed,url,body,t1.requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg from
           |(select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg
           | from adn_dsp.log_adn_dsp_bid_request_orc_hour where concat(yr,mt,dt,hh) >= '${starttime}' and concat(yr,mt,dt,hh) <= '${endtime}' ) t1 join log_adn_dsp_impression_org_orc_hour t2 on(t1.requestid = t2.requestid)
           | group by time,xforwardip,ip,exchanges,elapsed,url,body,t1.requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg
        """.stripMargin
      spark.sql(request_orc).createOrReplaceTempView("log_adn_dsp_bid_request_orc_hour")

      // Final projection. Column order matters: buildResult reads the region from index 54
      // (the trailing `rg` column) and strips that last column from the written record.
      val hql =
        s""" select impression.time,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.xforwardip else impression.xforwardip end as xforwardip,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ip else impression.ip end as ip,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.exchanges else impression.exchanges end as exchanges,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.elapsed else impression.elapsed end as elapsed,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.url else impression.url end as url,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.body else impression.body end as body,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.requestid else impression.requestid end as requestid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.bid else impression.bid end as bid,
           | impression.price,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.`describe` else impression.`describe` end as `describe`,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext1 else impression.ext1 end as ext1,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext2 else impression.ext2 end as ext2,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext3 else impression.ext3 end as ext3,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext4 else impression.ext4 end as ext4,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext5 else impression.ext5 end as ext5,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.auctiontype else impression.auctiontype end as auctiontype ,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.bidreqid else impression.bidreqid end as bidreqid ,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.impid else impression.impid end as impid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.publisherid else impression.publisherid end as publisherid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.appid else impression.appid end as appid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.appname else impression.appname end as appname,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.posid else impression.posid end as posid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.category else impression.category end as category,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.intl else impression.intl end as intl,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.imagesize else impression.imagesize end as imagesize,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.deviceip else impression.deviceip end as deviceip,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.make else impression.make end as make,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.model else impression.model end as model,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.os else impression.os end as os,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.osv else impression.osv end as osv,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.devicetype else impression.devicetype end as devicetype,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.cncttype else impression.cncttype end as cncttype,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.countrycode else impression.countrycode end as countrycode,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.googleadid else impression.googleadid end as googleadid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.imeisha1 else impression.imeisha1 end as imeisha1,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.androididmd5 else impression.androididmd5 end as androididmd5,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.idfa else impression.idfa end as idfa,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.keywords else impression.keywords end as keywords,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.yob else impression.yob end as yob,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.gender else impression.gender end as gender,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext6 else impression.ext6 end as ext6,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext7 else impression.ext7 end as ext7,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext8 else impression.ext8 end as ext8,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext9 else impression.ext9 end as ext9,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ext10 else impression.ext10 end as ext10,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.campaignid else impression.campaignid end as campaignid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.cinstallprice else impression.cinstallprice end as cinstallprice,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.cappname else impression.cappname end as cappname,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.cpackagename else impression.cpackagename end as cpackagename,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.cadvertiserid else impression.cadvertiserid end as cadvertiserid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then request.ccreativeid else impression.ccreativeid end as ccreativeid,
           | case when request.requestid is not null and impression.exchanges != 'nexage' then 1 when request.requestid is null and impression.exchanges != 'nexage' then 0 end,
           | impression.impext,
           | coalesce(request.rg,impression.rg) rg
           | from ( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,impext,yr,mt,dt,rg
           | from adn_dsp.log_adn_dsp_impression_org_orc_hour where concat(yr,mt,dt,hh) = '${endtime}'
           | group by time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,impext,yr,mt,dt,rg
           | ) impression
           | left join
           | log_adn_dsp_bid_request_orc_hour request
           | on (impression.requestid = request.requestid)
        """.stripMargin

      spark.sql(hql)
        .rdd
        .flatMap(buildResult(_, output, hhpath))
        .repartition(200)
        .saveAsNewAPIHadoopFile(cnoutput, classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], conf)
      /*
        .map(_.mkString(DATA_SPLIT))
        .saveAsTextFile(output, classOf[GzipCodec])
      */
    } finally {
      spark.stop()
    }
    0
  }

  /**
    * Converts one result row into the (key, value) pair handed to the output format.
    *
    * The key is `outputPrefix/region/hhpath` followed by `", "`; the value is the row joined
    * with `DATA_SPLIT` with its last field removed.
    * NOTE(review): the key presumably tells `TextMultipleOutputFormat` where to write the
    * record - confirm against that class.
    *
    * @param row          result row; column 54 must hold the region and be the last column
    * @param outputPrefix base output directory placed at the start of the key
    * @param hhpath       hour-specific path segment placed after the region in the key
    * @return a single-element array holding the pair for this row
    */
  def buildResult(row: Row, outputPrefix: String, hhpath: String): Array[Tuple2[Text, Text]] = {
    val region = row.getString(54)
    val rowContent = row.mkString(DATA_SPLIT)
    // The region is the trailing field; it only selects the destination and is not written.
    val rowData = rowContent.substring(0, rowContent.lastIndexOf(DATA_SPLIT))
    Array(Tuple2(new Text(s"${outputPrefix}/${region}/${hhpath}, "), new Text(rowData)))
  }

  /**
    * Declares the command line options understood by this job. Every option takes a value.
    *
    * @return the option definitions used to parse the job arguments
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("starttime", true, "starttime Time")
    options.addOption("endtime", true, "endtime Time")
    options.addOption("output", true, "output dir")
    options.addOption("cnoutput", true, "cnoutput dir")
    options.addOption("vgoutput", true, "vgoutput dir")
    options.addOption("tkoutput", true, "tkoutput dir")
    options.addOption("hhpath", true, "hhpath")
    options
  }
}

/** Entry point: creates the job and runs it with the given command line arguments. */
object DspImpressionHourCombine {
  def main(args: Array[String]): Unit = {
    new DspImpressionHourCombine().run(args)
  }
}