package mobvista.dmp.datasource.dsp

import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.dmp.util.{MD5Util, MRUtils}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer


/**
 * Hourly ETL over the DSP request log table `adn_dsp.log_adn_dsp_request_orc_hour`.
 *
 * For one `yyyymmddhh` partition the job:
 *  1. keeps only requests that carry at least one usable device id (see `filterData`),
 *  2. flattens each request into a `DspReqVODemo` (see `parseMapData`),
 *  3. groups the rows and writes them as zlib-compressed ORC to `output`, and
 *  4. expands each grouped row into one record per device id (see `parseMapDataMDS`) and writes
 *     a gzip-compressed text extract, joined with `MRUtils.JOINER`, to `outputmds`.
 */
class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {

  import scala.util.control.NonFatal

  /** Exchanges whose `ext3` field is inspected for deal / rule ids (matched case-insensitively). */
  private val DealIdExchanges = Seq("mopub", "oppocn", "inmobi", "bes", "iqiyi", "vivo")

  /** Command-line options read by `run`: `yyyymmddhh`, `output`, `outputmds` and `coalesce`. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("yyyymmddhh", true, "yyyymmddhh")
    options.addOption("output", true, "output")
    options.addOption("outputmds", true, "outputmds")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the job for the hour given by `yyyymmddhh`.
   *
   * Both output directories are deleted first and then rewritten.
   *
   * @param args command-line arguments, parsed with `commandOptions`
   * @return 0 once both outputs have been written
   * @throws IllegalArgumentException if a required option is missing or `coalesce` is not a
   *                                  positive integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val yyyymmddhh = commandLine.getOptionValue("yyyymmddhh")
    val output = commandLine.getOptionValue("output")
    val outputmds = commandLine.getOptionValue("outputmds")

    // Validate before starting Spark so a bad invocation fails fast with a readable message
    // instead of an NPE / NumberFormatException after the first query has already run.
    require(StringUtils.isNotBlank(yyyymmddhh), "missing required option: yyyymmddhh")
    require(StringUtils.isNotBlank(output), "missing required option: output")
    require(StringUtils.isNotBlank(outputmds), "missing required option: outputmds")
    require(StringUtils.isNotBlank(coalesce), "missing required option: coalesce")
    val numPartitions = coalesce.trim.toInt
    require(numPartitions > 0, s"coalesce must be a positive integer, got: $coalesce")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val hadoopConf = spark.sparkContext.hadoopConfiguration
      // Resolve the FileSystem from each output path itself instead of a FileSystem fixed to
      // s3://mob-emr-test: paths in other buckets/schemes would otherwise be rejected, and
      // scheme-less paths would be deleted on a different FileSystem than the one written to.
      Seq(output, outputmds).foreach { dir =>
        val path = new Path(dir)
        path.getFileSystem(hadoopConf).delete(path, true)
      }

      import spark.implicits._

      val sql =
        s"""
           |select idfa,googleadid gaid,
           |  os platform,countrycode country,
           |  deviceip ip,gender,yob birthday,
           |  make,model,osv osversion,ext6 ua,
           |  appid packagename,ext3,exchanges,ext5 exitid,`time`,
           |  body json_msg,rg region
           |  from adn_dsp.log_adn_dsp_request_orc_hour where concat(yr,mt,dt,hh)='${yyyymmddhh}'
        """.stripMargin
      val df = spark.sql(sql).filter(filterData _)
        .rdd
        .map(parseMapData).toDF()
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      df.select("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osVersion", "ua", "packageName", "exitId", "time", "geoInfo", "longitude", "latitude", "segment", "dealerid", "exchanges", "region")
        .toDF("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osversion", "ua", "packagename", "exitid", "time", "geoinfo", "longitude", "latitude", "segment", "dealerid", "exchanges", "region")
        .createOrReplaceTempView("dsp_org_etl_hours")

      val etl_sql =
        """
          |select idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,ua,concat_ws('#',collect_set(packagename)) packagename,exitid,max(`time`) datetime,segment,dealerid,exchanges,concat_ws('#',collect_set(region)) region
          |from dsp_org_etl_hours
          |group by idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,ua,exitid,segment,dealerid,exchanges
        """.stripMargin
      spark.sql(etl_sql).repartition(numPartitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      df.select("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osVersion", "packageName", "exitId", "time", "geoInfo", "longitude", "latitude", "segment", "region")
        .toDF("idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model", "osversion", "packagename", "exitid", "time", "geoinfo", "longitude", "latitude", "segment", "region")
        .createOrReplaceTempView("dsp_org_mds_hours")

      val mds_sql =
        """
          |select idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion, concat_ws('#',collect_set(packagename)) packagename,exitid,`time`,geoinfo,longitude, latitude,segment,concat_ws('#',collect_set(region)) region
          |from dsp_org_mds_hours
          |group by idfa,gaid,platform,country,ip,gender,birthday,maker,model,osversion,exitid,`time`,geoinfo,longitude, latitude,segment
        """.stripMargin

      val midData = spark.sql(mds_sql).rdd.flatMap(parseMapDataMDS).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
        "model", "os_version", "package_list", "androidids", "datetime", "geoInfo", "longitude", "latitude", "segment")

      midData.orderBy("device_id", "device_type").rdd.map(line => {
        MRUtils.JOINER.join(line.getAs[String]("device_id"),
          line.getAs[String]("device_type"),
          line.getAs[String]("platform"),
          line.getAs[String]("datetime"),
          line.getAs[String]("ip"),
          line.getAs[String]("geoInfo"),
          line.getAs[String]("longitude"),
          line.getAs[String]("latitude")
        )
      }).coalesce(numPartitions).saveAsTextFile(outputmds, classOf[GzipCodec])
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Flattens one request row into a `DspReqVODemo`.
   *
   * The row columns are the aliases produced by the source query in `run`.
   *
   * - A blank `packagename` becomes "".
   * - `dealerid` is taken from the JSON in `ext3` (`dealids`, falling back to `ruleids`).
   *   This applies only when `exchanges` is one of `DealIdExchanges`; otherwise it is "".
   * - `geoInfo` / `longitude` / `latitude` / `segment` are extracted from the request body
   *   (`json_msg`).
   * - Missing or unparseable JSON leaves the derived fields empty instead of failing the task.
   */
  def parseMapData(row: Row): DspReqVODemo = {
    val rawPackageName = row.getAs[String]("packagename")
    val packageName = if (StringUtils.isNotBlank(rawPackageName)) rawPackageName else ""

    val exchanges = row.getAs[String]("exchanges")
    val dealerid =
      if (DealIdExchanges.exists(_.equalsIgnoreCase(exchanges))) extractDealerId(row.getAs[String]("ext3"))
      else ""

    val (geoInfo, longitude, latitude, segment) = extractGeoAndSegment(row.getAs[String]("json_msg"))

    DspReqVODemo(
      row.getAs[String]("idfa"),
      row.getAs[String]("gaid"),
      row.getAs[String]("platform"),
      row.getAs[String]("country"),
      row.getAs[String]("ip"),
      row.getAs[String]("gender"),
      row.getAs[String]("birthday"),
      row.getAs[String]("make"),
      row.getAs[String]("model"),
      row.getAs[String]("osversion"),
      row.getAs[String]("ua"),
      packageName,
      row.getAs[String]("exitid"),
      row.getAs[String]("time"),
      geoInfo,
      longitude,
      latitude,
      segment,
      dealerid,
      exchanges,
      row.getAs[String]("region"))
  }

  /**
   * Reads the deal id from an `ext3` JSON object.
   *
   * Returns `dealids` when present and non-null. Otherwise it returns `ruleids` when present and
   * non-null, and "" in all other cases. Parse/convert failures are logged and yield "", so one
   * bad record does not fail the task.
   */
  private def extractDealerId(ext3: String): String =
    if (StringUtils.isNotBlank(ext3) && ext3.startsWith("{")) {
      try {
        val ext3Json = GsonUtil.String2JsonObject(ext3)
        Option(ext3Json.get("dealids")).filterNot(_.isJsonNull)
          .orElse(Option(ext3Json.get("ruleids")).filterNot(_.isJsonNull))
          .map(_.getAsString)
          .getOrElse("")
      } catch {
        case NonFatal(e) =>
          e.printStackTrace()
          ""
      }
    } else {
      ""
    }

  /**
   * Extracts the request body's geo fields and segment.
   *
   * Returns `(geoInfo, longitude, latitude, segment)`:
   *  - `device.geo` is stored as its JSON text, with its `lon` / `lat` members as JSON text;
   *  - `segment` is the JSON text of the last `user.data[].segment` that is an array literal.
   *
   * Absent, null, non-object or unparseable input leaves the corresponding fields as "".
   */
  private def extractGeoAndSegment(jsonMsg: String): (String, String, String, String) = {
    import scala.collection.JavaConverters._

    var geoInfo = ""
    var longitude = ""
    var latitude = ""
    var segment = ""
    // The body column may be null; only JSON objects are parsed.
    if (jsonMsg != null && jsonMsg.startsWith("{")) {
      try {
        val json = GsonUtil.String2JsonObject(jsonMsg)

        // Geo attributes: device.geo, plus its lon / lat members.
        val device = json.get("device")
        if (device != null && device.isJsonObject) {
          val geo = device.getAsJsonObject.get("geo")
          if (geo != null && geo.isJsonObject) {
            geoInfo = geo.toString
            val geoJson = geo.getAsJsonObject
            val lon = geoJson.get("lon")
            if (lon != null && !lon.isJsonNull) longitude = lon.toString
            val lat = geoJson.get("lat")
            if (lat != null && !lat.isJsonNull) latitude = lat.toString
          }
        }

        // Segment info: the last user.data[] entry whose segment serializes to "[...]" wins.
        val user = json.get("user")
        if (user != null && user.isJsonObject) {
          val data = user.getAsJsonObject.get("data")
          if (data != null && data.isJsonArray) {
            data.getAsJsonArray.asScala.foreach { dataEle =>
              if (dataEle.isJsonObject) {
                val seg = dataEle.getAsJsonObject.get("segment")
                if (seg != null && !seg.isJsonNull) {
                  val segText = seg.toString
                  if (segText.startsWith("[") && segText.endsWith("]")) segment = segText
                }
              }
            }
          }
        }
      } catch {
        // Best-effort, matching the ext3 handling: a malformed body must not fail the whole job.
        case NonFatal(e) => e.printStackTrace()
      }
    }
    (geoInfo, longitude, latitude, segment)
  }

  /**
   * Options declared by this override of the parent's option builder.
   *
   * Note: `run` parses its arguments with `commandOptions`, not with these.
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("detailOutPath", true, "[must] detailOutPath")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  /**
   * Keeps a request row only when it can be attributed to at least one device.
   *
   * `platform` must be "ios" or "android", and the row must carry one of:
   *  - a valid, non-all-zero `idfa` (ios) or `gaid` (android),
   *  - an android id from `exitid`,
   *  - an imei or imei md5 from `exitid` (CN traffic only).
   */
  def filterData(row: Row): Boolean = {
    val platform = row.getAs[String]("platform")
    val isIos = "ios".equals(platform)
    if (!isIos && !"android".equals(platform)) {
      false
    } else {
      val candidateId = if (isIos) row.getAs[String]("idfa") else row.getAs[String]("gaid")
      val hasDeviceId =
        StringUtils.isNotBlank(candidateId) && candidateId.matches(didPtn) && !allZero.equals(candidateId)

      val (androidId, imei, imeimd5) =
        extractExitIds(row.getAs[String]("exitid"), row.getAs[String]("country"), imeiAllowed = true)

      hasDeviceId ||
        StringUtils.isNotBlank(imei) ||
        StringUtils.isNotBlank(imeimd5) ||
        StringUtils.isNotBlank(androidId)
    }
  }

  /**
   * Expands one grouped row into one `DspReqVO` per device id it carries.
   *
   * Records are emitted in this order:
   *  1. the platform id (`idfa` for ios / `gaid` for android), if valid and not all zeros;
   *  2. the android id from `exitid`;
   *  3. the imei, immediately followed by its computed MD5 (type "imeimd5");
   *  4. the imei md5 taken directly from `exitid`.
   *
   * Imei values are only taken for android rows from CN. Every record carries the row's
   * attributes and the android id (possibly "").
   */
  def parseMapDataMDS(row: Row): Array[DspReqVO] = {
    val idfa = validDidOrEmpty(row.getAs[String]("idfa"))
    val gaid = validDidOrEmpty(row.getAs[String]("gaid"))

    val platform = row.getAs[String]("platform")
    val country = row.getAs[String]("country")
    val ip = row.getAs[String]("ip")
    val gender = row.getAs[String]("gender")
    val birthday = row.getAs[String]("birthday")
    val maker = row.getAs[String]("maker")
    val model = row.getAs[String]("model")
    val osVersion = row.getAs[String]("osversion")
    val rawPackageName = row.getAs[String]("packagename")
    val packageName = if (StringUtils.isNotBlank(rawPackageName)) rawPackageName else ""
    val time = row.getAs[String]("time")
    val geoInfo = row.getAs[String]("geoinfo")
    val longitude = row.getAs[String]("longitude")
    val latitude = row.getAs[String]("latitude")
    val rawSegment = row.getAs[String]("segment")
    val segment = if (StringUtils.isNotBlank(rawSegment)) rawSegment else ""

    val (androidId, imei, imeimd5) =
      extractExitIds(row.getAs[String]("exitid"), country, imeiAllowed = "android".equals(platform))

    val (deviceId, deviceType) =
      if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) (idfa, "idfa")
      else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) (gaid, "gaid")
      else ("", "")

    // All records share the row attributes; only the id, its type and the platform vary.
    def record(id: String, idType: String, recordPlatform: String): DspReqVO =
      DspReqVO(id, idType, recordPlatform, country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, geoInfo, longitude, latitude, segment)

    val records = new ArrayBuffer[DspReqVO]()
    if (deviceId.nonEmpty) records += record(deviceId, deviceType, platform)
    if (androidId.nonEmpty) records += record(androidId, "androidid", "android")
    if (StringUtils.isNotBlank(imei)) {
      records += record(imei, "imei", "android")
      records += record(MD5Util.getMD5Str(imei), "imeimd5", "android")
    }
    if (StringUtils.isNotBlank(imeimd5)) records += record(imeimd5, "imeimd5", "android")
    records.toArray
  }

  /** Returns `value` when it is non-blank and matches `didPtn`, otherwise "". */
  private def validDidOrEmpty(value: String): String =
    if (StringUtils.isNotBlank(value) && value.matches(didPtn)) value else ""

  /** True when `value` is non-blank and fully matches the regex `pattern`. */
  private def matchesPattern(value: String, pattern: String): Boolean =
    StringUtils.isNotBlank(value) && value.matches(pattern)

  /**
   * Extracts `(androidId, imei, imeimd5)` from the comma-separated `exitid` field.
   *
   * Fields are read at indices 7, 4 and 5 respectively, and only when `splitFun` yields at
   * least 8 fields. A value that is blank or does not match its `CommonMapReduce` pattern
   * becomes "".
   *
   * @param imeiAllowed extra gate for the imei / imei md5 values; they additionally require
   *                    `country` to equal "CN" (case-insensitive)
   */
  private def extractExitIds(exitId: String, country: String, imeiAllowed: Boolean): (String, String, String) = {
    val noIds = ("", "", "")
    if (StringUtils.isBlank(exitId)) {
      noIds
    } else {
      val devIds = splitFun(exitId, ",")
      if (devIds.length < 8) {
        noIds
      } else {
        val imeiEligible = imeiAllowed && "CN".equalsIgnoreCase(country)
        val androidId = if (matchesPattern(devIds(7), CommonMapReduce.andriodIdPtn)) devIds(7) else ""
        val imei = if (imeiEligible && matchesPattern(devIds(4), CommonMapReduce.imeiPtn)) devIds(4) else ""
        val imeimd5 = if (imeiEligible && matchesPattern(devIds(5), CommonMapReduce.imeiMd5Ptn)) devIds(5) else ""
        (androidId, imei, imeimd5)
      }
    }
  }

  /** A schema of DSP request-log columns, all `StringType`. It is not referenced by this class. */
  def dspSchema: StructType = {
    StructType(StructField("time", StringType) ::
      StructField("xforwardip", StringType) ::
      StructField("ip", StringType) ::
      StructField("exchanges", StringType) ::
      StructField("elapsed", StringType) ::
      StructField("url", StringType) ::
      StructField("body", StringType) ::
      StructField("requestid", StringType) ::
      StructField("bid", StringType) ::
      StructField("price", StringType) ::
      StructField("describe", StringType) ::
      StructField("ext1", StringType) ::
      StructField("ext2", StringType) ::
      StructField("ext3", StringType) ::
      StructField("ext4", StringType) ::
      StructField("ext5", StringType) ::
      StructField("auctiontype", StringType) ::
      StructField("bidreqid", StringType) ::
      StructField("impid", StringType) ::
      StructField("publisherid", StringType) ::
      StructField("appid", StringType) ::
      StructField("appname", StringType) ::
      StructField("posid", StringType) ::
      StructField("category", StringType) ::
      StructField("intl", StringType) ::
      StructField("imagesize", StringType) ::
      StructField("deviceip", StringType) ::
      StructField("make", StringType) ::
      StructField("model", StringType) ::
      StructField("os", StringType) ::
      StructField("osv", StringType) ::
      StructField("devicetype", StringType) ::
      StructField("cncttype", StringType) ::
      StructField("countrycode", StringType) ::
      StructField("googleadid", StringType) ::
      StructField("imeishal", StringType) ::
      StructField("androididmd5", StringType) ::
      StructField("idfa", StringType) ::
      StructField("keywords", StringType) ::
      StructField("yob", StringType) ::
      StructField("gender", StringType) ::
      StructField("ext6", StringType) ::
      StructField("ext7", StringType) ::
      StructField("ext8", StringType) ::
      StructField("ext9", StringType) ::
      StructField("ext10", StringType) ::
      StructField("campaignid", StringType) ::
      StructField("cinstallprice", StringType) ::
      StructField("cappname", StringType) ::
      StructField("cpackagename", StringType) ::
      StructField("cadvertiserid", StringType) ::
      StructField("ccreativeid", StringType)
      :: Nil)
  }
}

object DspOrgLogEtlHoursDemo {

  /**
   * Command-line entry point.
   *
   * Builds a fresh job instance and runs it with `args`; the job's integer result is not used.
   */
  def main(args: Array[String]): Unit = {
    val job = new DspOrgLogEtlHoursDemo
    job.run(args)
  }
}