package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

class DspImpressionHourFullOther extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val starttime = commandLine.getOptionValue("starttime")
    val endtime = commandLine.getOptionValue("endtime")
    val region = commandLine.getOptionValue("region")
    val output = commandLine.getOptionValue("output")


    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
     // .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    try {
//
      spark.sql("set  hive.exec.dynamic.partition.mode=nonstrict")
      spark.sql("set  hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.compress.output=true")
      spark.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec")

      var  hql =
        s"""
           | select   impression.time,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.xforwardip  else impression.xforwardip   end as xforwardip,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ip  else impression.ip   end as ip,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.exchanges  else impression.exchanges   end as exchanges,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.elapsed  else impression.elapsed   end as elapsed,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.url  else impression.url   end as url,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.body  else impression.body   end as body,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then  request.requestid else  impression.requestid  end as requestid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.bid  else impression.bid   end as bid,
           |           impression.price,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.`describe`  else impression.`describe`   end as `describe`,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext1  else impression.ext1   end as ext1,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext2  else impression.ext2   end as ext2,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext3  else impression.ext3   end as ext3,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext4  else impression.ext4   end as ext4,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext5  else impression.ext5   end as ext5,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.auctiontype  else impression.auctiontype   end as auctiontype ,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.bidreqid  else impression.bidreqid   end as bidreqid ,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.impid  else impression.impid   end as impid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.publisherid  else impression.publisherid   end as publisherid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.appid  else impression.appid   end as appid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.appname  else impression.appname   end as appname,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.posid  else impression.posid   end as posid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.category  else impression.category   end as category,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.intl  else impression.intl   end as intl,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.imagesize  else impression.imagesize   end as imagesize,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.deviceip  else impression.deviceip   end as deviceip,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.make  else impression.make   end as make,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.model  else impression.model   end as model,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.os  else impression.os   end as os,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.osv  else impression.osv   end as osv,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.devicetype  else impression.devicetype   end as devicetype,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.cncttype  else impression.cncttype   end as cncttype,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.countrycode  else impression.countrycode   end as countrycode,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.googleadid  else impression.googleadid   end as googleadid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.imeisha1  else impression.imeisha1   end as imeisha1,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.androididmd5  else impression.androididmd5   end as androididmd5,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.idfa  else impression.idfa   end as idfa,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.keywords  else impression.keywords   end as keywords,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.yob  else impression.yob   end as yob,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.gender  else impression.gender   end as gender,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext6  else impression.ext6   end as ext6,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext7  else impression.ext7   end as ext7,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext8  else impression.ext8   end as ext8,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext9  else impression.ext9   end as ext9,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ext10  else impression.ext10   end as ext10,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.campaignid  else impression.campaignid   end as campaignid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.cinstallprice  else impression.cinstallprice   end as cinstallprice,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.cappname  else impression.cappname   end as cappname,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.cpackagename  else impression.cpackagename   end as cpackagename,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.cadvertiserid  else impression.cadvertiserid   end as cadvertiserid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then request.ccreativeid  else impression.ccreativeid   end as ccreativeid,
           |           case when request.requestid is not null and impression.exchanges != 'nexage'  then 1  when request.requestid is null and impression.exchanges != 'nexage' then  0 end
           |           from ( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg
           |           from adn_dsp.log_adn_dsp_impression_org_hour  where  rg = '${region}' and concat(yr,mt,dt,hh) = '${endtime}'
           |           group by time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg
           |           ) impression
           |           left join
           |           ( select time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg
           |           from adn_dsp.log_adn_dsp_bid_request_orc_hour where  rg = '${region}' and concat(yr,mt,dt,hh) >= '${starttime}' and  concat(yr,mt,dt,hh) <= '${endtime}'
           |           group by time,xforwardip,ip,exchanges,elapsed,url,body,requestid,bid,price,`describe`,ext1,ext2,ext3,ext4,ext5,auctiontype,bidreqid,impid,publisherid,appid,appname,posid,category,intl,imagesize,deviceip,make,model,os,osv,devicetype,cncttype,countrycode,googleadid,imeisha1,androididmd5,idfa,keywords,yob,gender,ext6,ext7,ext8,ext9,ext10,campaignid,cinstallprice,cappname,cpackagename,cadvertiserid,ccreativeid,yr,mt,dt,rg
           |           ) request
           |           on (impression.requestid = request.requestid)
            """.stripMargin
      spark.sql(hql)
        .rdd
        .map(_.mkString(DATA_SPLIT))
        .saveAsTextFile(output, classOf[GzipCodec])

    } finally {

      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  def commandOptions(): Options ={
    val options = new Options()
    options.addOption("starttime", true, "starttime Time")
    options.addOption("endtime", true, "endtime Time")
    options.addOption("output", true, "output dir")
    options.addOption("region", true, "region info")
    options
  }

}

object DspImpressionHourFullOther {
  def main(args: Array[String]): Unit = {
    new DspImpressionHourFullOther().run(args)
  }
}