package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.MultipleOrcOutputFormat
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.{Row, SparkSession}


class DspClickImpressionCombineOrc extends CommonSparkJob with Serializable {
  // ORC output schema for the combined click+impression record.
  // Field order here is the contract: `fieldNames` below and genRow's
  // positional writes must stay in exactly this order.
  val schema = "struct<time:string,xforwardip:string,ip:string,exchanges:string,elapsed:string,url:string,body:string,requestid:string,bid:string,price:string,describe:string,ext1:string,ext2:string,ext3:string,ext4:string,ext5:string,auctiontype:string,bidreqid:string,impid:string,publisherid:string,appid:string,appname:string,posid:string,category:string,intl:string,imagesize:string,deviceip:string,make:string,model:string,os:string,osv:string,devicetype:string,cncttype:string,countrycode:string,googleadid:string,imeisha1:string,androididmd5:string,idfa:string,keywords:string,yob:string,gender:string,ext6:string,ext7:string,ext8:string,ext9:string,ext10:string,campaignid:string,cinstallprice:string,cappname:string,cpackagename:string,cadvertiserid:string,ccreativeid:string>"

  // Column names in the exact order they appear in `schema`; genRow writes
  // them positionally into the OrcStruct.
  private val fieldNames: IndexedSeq[String] = Vector(
    "time", "xforwardip", "ip", "exchanges", "elapsed", "url", "body",
    "requestid", "bid", "price", "describe", "ext1", "ext2", "ext3", "ext4",
    "ext5", "auctiontype", "bidreqid", "impid", "publisherid", "appid",
    "appname", "posid", "category", "intl", "imagesize", "deviceip", "make",
    "model", "os", "osv", "devicetype", "cncttype", "countrycode",
    "googleadid", "imeisha1", "androididmd5", "idfa", "keywords", "yob",
    "gender", "ext6", "ext7", "ext8", "ext9", "ext10", "campaignid",
    "cinstallprice", "cappname", "cpackagename", "cadvertiserid",
    "ccreativeid")

  /**
   * Job entry point. Joins the day's DSP clicks (ods_dsp_click_v4) with the
   * impression log (log_adn_dsp_impression_hour) on requestid, preferring the
   * click-side value for the fields both sides carry, converts each row to an
   * OrcStruct and writes it out, keyed by `${output}/${billing_type}` so that
   * MultipleOrcOutputFormat splits the output per billing type.
   *
   * @param args command-line arguments; see commandOptions() for the options
   * @return 0 on success (exceptions propagate to the caller)
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val cur_date = commandLine.getOptionValue("cur_date")
    val begin_date = commandLine.getOptionValue("begin_date")
    val output = commandLine.getOptionValue("output")
    val cpcoutput = commandLine.getOptionValue("cpcoutput")
    val cpmoutput = commandLine.getOptionValue("cpmoutput")
    val cpioutput = commandLine.getOptionValue("cpioutput")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .enableHiveSupport()
      .getOrCreate()

    // Resolve the FileSystem once and reuse it for all three deletes
    // (the original re-resolved the same URI three times).
    val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    Seq(cpcoutput, cpmoutput, cpioutput).foreach(dir => fs.delete(new Path(dir), true))
    println(s"<====>$cpcoutput;$cpmoutput;$cpioutput")

    try {

      // For shared columns, coalesce() prefers the click record (t1) and
      // falls back to the impression record (t2).
      val hql =
        s"""
           |select
           |coalesce(t1.time,t2.time) time,
           |t2.xforwardip   ,
           |t2.ip           ,
           |coalesce(t1.exchanges,t2.exchanges) exchanges,
           |t2.elapsed      ,
           |t2.url          ,
           |t2.body         ,
           |t2.requestid    ,
           |t2.bid          ,
           |t2.price        ,
           |t2.`describe`     ,
           |coalesce(t1.algorithm,t2.ext1) ext1,
           |t2.ext2         ,
           |t2.ext3         ,
           |t2.ext4         ,
           |t2.ext5         ,
           |t2.auctiontype  ,
           |t2.bidreqid     ,
           |t2.impid        ,
           |coalesce(t1.publisher_id,t2.publisherid)  publisherid,
           |coalesce(t1.app_id,t2.appid)  appid,
           |coalesce(t1.package,t2.appname ) appname,
           |t2.posid        ,
           |t2.category     ,
           |t2.intl         ,
           |t2.imagesize    ,
           |t2.deviceip     ,
           |t2.make         ,
           |t2.model        ,
           |coalesce(t1.platform, t2.os) os,
           |coalesce(t1.os_version,t2.osv) osv,
           |t2.devicetype   ,
           |t2.cncttype     ,
           |coalesce(t1.country_code,t2.countrycode) countrycode  ,
           |coalesce(t1.gaid,t2.googleadid) googleadid,
           |t2.imeisha1     ,
           |t2.androididmd5 ,
           |coalesce(t1.idfa,t2.idfa) idfa,
           |t2.keywords     ,
           |t2.yob          ,
           |t2.gender       ,
           |t2.ext6         ,
           |t2.ext7         ,
           |t2.ext8         ,
           |t2.ext9         ,
           |t2.ext10        ,
           |coalesce(t1.campaign_id,t2.campaignid )  campaignid ,
           |t2.cinstallprice,
           |t2.cappname     ,
           |t2.cpackagename ,
           |coalesce(t1.advertiser_id,t2.cadvertiserid) cadvertiserid,
           |t2.ccreativeid,
           |t1.day,
           |t1.billing_type
           |from (select * from adn_dsp.ods_dsp_click_v4 where day ='${cur_date}' ) t1
           |left join (select * from adn_dsp.log_adn_dsp_impression_hour where concat(yr,mt,dt) >= '${begin_date}' and concat(yr,mt,dt) <= '${cur_date}') t2
           |on(t1.requestid = t2.requestid)
            """.stripMargin
      // FIX: the upper bound on the impression partitions was previously
      // ">= '${cur_date}'", which made the begin_date lower bound redundant
      // and scanned the wrong window; the intended range is
      // [begin_date, cur_date].
      spark.sql(hql)
        .rdd
        .map { line =>
          // The routing key selects the physical output directory inside
          // MultipleOrcOutputFormat, splitting rows per billing type.
          val billing_type = line.getAs[String]("billing_type")
          (new Text(s"${output}/${billing_type}"), genRow(line, schema))
        }
        .coalesce(coalesce.toInt)
        .saveAsNewAPIHadoopFile(
          s"${cpioutput}", classOf[Text], classOf[OrcStruct], classOf[MultipleOrcOutputFormat[Text, OrcStruct]],
          initConfig(spark.sparkContext.hadoopConfiguration))

    } finally {

      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /** Returns `str` unchanged, or "" when it is null, empty, or whitespace-only. */
  def genVal(str: String): String =
    if (StringUtils.isNotBlank(str)) str else ""

  /**
   * Converts one joined SQL row into an OrcStruct matching `schema`.
   * Null/blank column values are written as empty strings.
   *
   * NOTE(review): the columns are written positionally from `fieldNames`,
   * so the `schema` argument is assumed to be this class's `schema`
   * (same 52 fields, same order) — confirm before passing anything else.
   */
  def genRow(row: Row, schema: String): OrcStruct = {
    val struct = OrcStruct.createValue(TypeDescription.fromString(schema)).asInstanceOf[OrcStruct]
    fieldNames.zipWithIndex.foreach { case (name, idx) =>
      struct.setFieldValue(idx, new Text(genVal(row.getAs[String](name))))
    }
    struct
  }

  /** Declares the command-line options this job accepts (all take a value). */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_date", true, "cur_date Time")
    options.addOption("begin_date", true, "begin_date Time")
    options.addOption("output", true, "output dir")
    options.addOption("cpcoutput", true, "cpcoutput dir")
    options.addOption("cpmoutput", true, "cpmoutput dir")
    options.addOption("cpioutput", true, "cpioutput dir")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Prepares the Hadoop configuration for the ORC output format.
   *
   * NOTE(review): both the legacy (mapreduce.output.compress/-codec=Gzip)
   * and the new (mapreduce.output.fileoutputformat.compress/-codec=Snappy)
   * MR keys are set with *different* codecs, while ORC itself compresses
   * with orc.compress=ZLIB. The MR codec keys are most likely ignored by
   * the ORC writer — confirm before cleaning them up; left as-is to
   * preserve behavior.
   */
  def initConfig(conf: Configuration): Configuration = {
    conf.set("orc.mapred.output.schema", schema)
    conf.setBoolean("mapreduce.output.compress", true)
    conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
    conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    conf.set("orc.compress", "ZLIB")
    conf
  }
}

object DspClickImpressionCombineOrc {
  /** JVM entry point: builds the job and delegates straight to run(). */
  def main(args: Array[String]): Unit = {
    val job = new DspClickImpressionCombineOrc
    job.run(args)
  }
}