package mobvista.dmp.datasource.dsp

import java.net.URI
import java.util
import java.util.regex.Pattern

import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.dmp.util.MD5Util
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class DspOrgLogEtlHours extends CommonSparkJob with Serializable {
  // Positional indexes into a row read with `dspSchema`; each constant names the
  // schema column found at that position.
  // Column "idfa".
  private val IDFA = 37
  // Column "googleadid".
  private val GAID = 34
  // Column "appid" (used as the package name).
  private val PKG_NAME = 20
  // Column "os" (compared against "ios" / "android").
  private val PLATFORM = 29
  // Column "time".
  private val UPDATE_TIME = 0
  // Column "deviceip".
  private val IP = 26
  // Column "make".
  private val MAKER = 27
  // Column "model".
  private val MODEL = 28
  // Column "osv".
  private val OS_VERSION = 30
  // Column "countrycode".
  private val COUNTRY_CODE = 33
  // Column "yob".
  private val BIRTHDAY = 39
  // Column "gender".
  private val GENDER = 40
  // Column "ext5" (comma-separated extra device ids, see parseMapData).
  private val EXT_ID = 15
  // Column "body" (raw request JSON, see parseMapData).
  private val JSON_MSG = 6

  // Digits only.
  private val iosPkgPtn = Pattern.compile("^\\d+$")
  // ASCII letters, digits and dots only.
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")


  /**
   * Builds the command-line options understood by this job.
   *
   * Every option takes a value and uses its own name as the description.
   *
   * @return the option set used by `run` to parse the arguments
   */
  def commandOptions(): Options = {
    val opts = new Options()
    // Registration order is kept: detailOutPath, input, output, coalesce.
    Seq("detailOutPath", "input", "output", "coalesce")
      .foreach(name => opts.addOption(name, true, name))
    opts
  }

  /**
   * Job entry point: reads the DSP ORC log, keeps valid rows, expands them into
   * per-device records and writes two ORC outputs.
   *
   *  - `output`: one row per (deviceId, deviceType) with aggregated attributes.
   *  - `detailOutPath`: one row per expanded record with request time and geo fields.
   *
   * @param args command-line arguments (`input`, `output`, `detailOutPath`, `coalesce`)
   * @return 0 when the job finishes without throwing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val detailOutput = commandLine.getOptionValue("detailOutPath")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Remove both target directories up front (output first, then detail output).
      Seq(output, detailOutput).foreach { target =>
        FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
          .delete(new Path(target), true)
      }

      import spark.implicits._
      // On cluster, schema must be specified manually.
      val validRows = spark.read.schema(dspSchema).orc(input)
        .filter(_.size >= 52)
        .filter(filterData _)
      val df = validRows.rdd
        .flatMap(parseMapData)
        .toDF()
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Merges several JSON-array strings into one de-duplicated JSON-array string.
      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        val merged: util.Set[String] = new util.HashSet[String]()
        segmentArrays.foreach { rawArray =>
          GsonUtil.String2JsonArray(rawArray.toString)
            .foreach(item => merged.add(item.toString))
        }
        "[" + merged.mkString(",") + "]"
      })

      val aggregated = df.groupBy("deviceId","deviceType")
        .agg(first("platform"),
          first("country"),
          first("ip"),
          first("gender"),
          first("birthday"),
          first("maker"),
          first("model"),
          first("osVersion"),
          collect_set("packageName"),
          collect_set("androidId"), //  ["942da5a0819d803d","1c52635982b7b999",""]
          max("time"),
          mergeUdf(collect_set("segment")).alias("segment_ids")
        )
      val agg_df = aggregated.toDF("device_id", "device_type", "platform", "country_code", "ip",
        "gender", "birthday", "maker", "model", "os_version", "package_list", "androidids",
        "datetime", "segment_ids")

      // Parsed here (inside the try, after the deletes) so a bad value still stops Spark.
      val partitions = coalesce.toInt

      agg_df.coalesce(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      val mds_df = df
        .select("deviceId", "deviceType", "platform", "time", "ip", "geoInfo", "longitude", "latitude")
        .toDF("device_id", "device_type", "platform", "req_time", "ip", "geo", "longitude", "latitude")

      mds_df.coalesce(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(detailOutput)
    } finally {
      if (spark != null) spark.stop()
    }
    0
  }

  /**
   * Expands one DSP request row into one record per device identifier it carries.
   *
   * Records are emitted in this order, each only when the identifier is present:
   *  1. the platform id (`idfa` for "ios", `gaid` for "android"),
   *  2. the Android id taken from position 7 of the comma-separated `ext5` column,
   *  3. the IMEI (position 4, country "CN" only) followed by its MD5 (`imeimd5`),
   *  4. the IMEI MD5 reported at position 5 (country "CN" only).
   * Items 3 and 4 can both yield an `imeimd5` record for the same row.
   *
   * All records of a row share the same attributes, geo fields and segment string.
   * A null `body` column is treated as "no JSON" instead of raising a
   * NullPointerException, and a null platform id yields no platform-id record.
   *
   * @param row a row laid out according to `dspSchema`
   * @return the expanded records; empty when no identifier is usable
   */
  def parseMapData(row: Row): Array[DspReqVO] = {
    val idfa = row.getString(IDFA)
    val gaid = row.getString(GAID)
    val rawPackageName = row.getString(PKG_NAME)
    val platform = row.getString(PLATFORM)
    val time = row.getString(UPDATE_TIME)
    val ip = row.getString(IP)
    val maker = row.getString(MAKER)
    val model = row.getString(MODEL)
    val osVersion = row.getString(OS_VERSION)
    val country = row.getString(COUNTRY_CODE)
    val birthday = row.getString(BIRTHDAY)
    val gender = row.getString(GENDER)
    val exitId = row.getString(EXT_ID)
    val jsonMsg = row.getString(JSON_MSG)

    // iOS package names of the form "id<digits>" are reduced to the digits.
    val packageName =
      if ("ios".equalsIgnoreCase(platform) && rawPackageName != null && rawPackageName.matches("^id\\d+$")) {
        rawPackageName.replaceAll("id", "")
      } else {
        rawPackageName
      }

    val (deviceId, deviceType) =
      if ("ios".equals(platform)) (idfa, "idfa")
      else if ("android".equals(platform)) (gaid, "gaid")
      else ("", "")

    var androidId = ""
    var imei = ""
    var reportedImeiMd5 = ""
    if (StringUtils.isNotBlank(exitId)) {
      val devIds = splitFun(exitId, ",")
      if (devIds.length >= 8) {
        // equalsIgnoreCase on the literal is null-safe, so no separate blank check is needed.
        val isCn = "CN".equalsIgnoreCase(country)

        if (StringUtils.isNotBlank(devIds(7)) && devIds(7).matches(CommonMapReduce.andriodIdPtn)) {
          androidId = devIds(7)
        }
        if (isCn && StringUtils.isNotBlank(devIds(4)) && devIds(4).matches(CommonMapReduce.imeiPtn)) {
          imei = devIds(4)
        }
        if (isCn && StringUtils.isNotBlank(devIds(5)) && devIds(5).matches(CommonMapReduce.imeiMd5Ptn)) {
          reportedImeiMd5 = devIds(5)
        }
      }
    }

    var geoInfo = ""
    var longitude = ""
    var latitude = ""
    var segment = ""
    // Read geo and segment attributes from the request JSON when it looks like an object.
    if (jsonMsg != null && jsonMsg.startsWith("{")) {
      val json = GsonUtil.String2JsonObject(jsonMsg)

      // device.geo, plus its "lon" / "lat" members when present.
      val deviceElement = json.get("device")
      if (deviceElement != null && !deviceElement.isJsonNull) {
        val geoElement = deviceElement.getAsJsonObject.get("geo")
        if (geoElement != null && !geoElement.isJsonNull) {
          geoInfo = geoElement.toString
          val geoJson = geoElement.getAsJsonObject
          val lonElement = geoJson.get("lon")
          if (lonElement != null && !lonElement.isJsonNull) {
            longitude = lonElement.toString
          }
          val latElement = geoJson.get("lat")
          if (latElement != null && !latElement.isJsonNull) {
            latitude = latElement.toString
          }
        }
      }

      // user.data[*].segment: if several entries carry a JSON array, the last one wins.
      val userElement = json.get("user")
      if (userElement != null && !userElement.isJsonNull) {
        val dataElement = userElement.getAsJsonObject.get("data")
        if (dataElement != null && !dataElement.isJsonNull) {
          dataElement.getAsJsonArray.foreach { dataEle =>
            val segElement = dataEle.getAsJsonObject.get("segment")
            if (segElement != null && !segElement.isJsonNull) {
              val segText = segElement.toString
              if (segText.startsWith("[") && segText.endsWith("]")) {
                segment = segText
              }
            }
          }
        }
      }
    }

    // Every record of this row differs only in its identifier and identifier type.
    def record(id: String, idType: String): DspReqVO =
      DspReqVO(id, idType, platform, country, ip, gender, birthday, maker, model, osVersion,
        packageName, androidId, time, geoInfo, longitude, latitude, segment)

    val records = new ArrayBuffer[DspReqVO]()

    if (StringUtils.isNotEmpty(deviceId)) {
      records += record(deviceId, deviceType)
    }
    if (!androidId.isEmpty) {
      records += record(androidId, "androidid")
    }
    if (!imei.isEmpty) {
      records += record(imei, "imei")
      // Also emit the MD5 derived from the raw IMEI.
      records += record(MD5Util.getMD5Str(imei), "imeimd5")
    }
    if (!reportedImeiMd5.isEmpty) {
      records += record(reportedImeiMd5, "imeimd5")
    }

    records.toArray
  }

  /*def parseData(rows: Iterator[Row]): Iterator[DspReqVO] = {
    val res = new util.ArrayList[DspReqVO]()
    while (rows.hasNext) {
      val row = rows.next
      val idfa = row.getString(IDFA)
      val gaid = row.getString(GAID)
      var packageName = row.getString(PKG_NAME)
      val platform = row.getString(PLATFORM)
      val time = row.getString(UPDATE_TIME)
      val ip = row.getString(IP)
      val maker = row.getString(MAKER)
      val model = row.getString(MODEL)
      val osVersion = row.getString(OS_VERSION)
      val country = row.getString(COUNTRY_CODE)
      val birthday = row.getString(BIRTHDAY)
      val gender = row.getString(GENDER)
      val exitId = row.getString(EXT_ID)
      val jsonMsg = row.getString(JSON_MSG)

      if ("ios".equalsIgnoreCase(platform) && packageName.matches("^id\\\\d+$")) {
        packageName = packageName.replaceAll("id", "")
      }

      var deviceId = ""
      var deviceType = ""
      if ("ios".equals(platform)) {
        deviceId = idfa
        deviceType = "idfa"
      } else if ("android".equals(platform)) {
        deviceId = gaid
        deviceType = "gaid"
      }

      val androidId = splitFun(exitId, ",")(5)

      var geoInfo = ""
      var longitude = ""
      var latitude = ""
      //  var segment = ""
      val segmentMap: mutable.HashMap[String, SegmentVO] = new mutable.HashMap[String, SegmentVO]()
      //处理jsonMsg,获取geo属性值
      if (jsonMsg.startsWith("{")) {
        val json = GsonUtil.String2JsonObject(jsonMsg)
        val element = json.get("device")
        if (element != null && !element.isJsonNull) {
          val geoElement = element.getAsJsonObject.get("geo")
          if (geoElement != null && !geoElement.isJsonNull) {
            geoInfo = geoElement.toString
            val geoJson = geoElement.getAsJsonObject
            val lonElement = geoJson.get("lon")
            if (lonElement != null && !lonElement.isJsonNull) {
              longitude = lonElement.toString
            }
            val latElement = geoJson.get("lat")
            if (latElement != null && !latElement.isJsonNull) {
              latitude = latElement.toString
            }
          }
        }

        // 获取segment信息
        val userElement = json.get("user")
        if (userElement != null && !userElement.isJsonNull) {
          val dataElement = userElement.getAsJsonObject.get("data")
          if (dataElement != null && !dataElement.isJsonNull) dataElement.getAsJsonArray
            .foreach(dataEle => {
              val segElement = dataEle.getAsJsonObject.get("segment")
              if (segElement != null && !segElement.isJsonNull) {
                val segmentVO = GsonUtil.fromJson(segElement, classOf[SegmentVO])
                //  segment = segElement.toString
                  segmentMap.put(segmentVO.getId, segmentVO)
              }
            })
        }
      }


      res.add(DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
        "0", androidId, time, geoInfo, longitude, latitude, ""))
    }
    import scala.collection.JavaConverters._
    res.asScala.iterator
  }*/

  /**
   * Declares the options of this job; each takes a value and is described as
   * `[must] <name>`.
   *
   * @return options `input`, `output`, `detailOutPath` and `coalesce`, added in that order
   */
  override protected def buildOptions(): Options =
    Seq("input", "output", "detailOutPath", "coalesce")
      .foldLeft(new Options) { (opts, name) =>
        opts.addOption(name, true, s"[must] $name")
      }

  /**
   * Decides whether a raw DSP row is usable.
   *
   * A row is kept when one of the following holds:
   *  - platform is exactly "ios", the IDFA is a valid non-zero device id, and the
   *    package name is `id<digits>` or `<digits>` with more than 8 characters
   *    once the "id" prefix is removed;
   *  - platform is exactly "android", the GAID is a valid non-zero device id, and
   *    the package name passes the Android package-name check.
   *
   * A null package name rejects the row instead of raising a NullPointerException.
   *
   * @param row a row laid out according to `dspSchema`
   * @return true when the row should be processed further
   */
  def filterData(row: Row): Boolean = {
    val platform = row.getString(PLATFORM)
    val packageName = row.getString(PKG_NAME)

    // Non-blank, well-formed and not the all-zero placeholder.
    def isValidDeviceId(id: String): Boolean =
      StringUtils.isNotBlank(id) && id.matches(didPtn) && !allZero.equals(id)

    if (packageName == null) {
      false
    } else if ("ios".equals(platform)) {
      val looksLikeStoreId = packageName.matches("^id\\d+$") || packageName.matches("^\\d+$")
      isValidDeviceId(row.getString(IDFA)) && looksLikeStoreId && {
        val appId = packageName.replace("id", "")
        checkPkgName("ios", appId) && appId.length > 8
      }
    } else if ("android".equals(platform)) {
      isValidDeviceId(row.getString(GAID)) && checkPkgName("android", packageName)
    } else {
      false
    }
  }

  /**
   * Checks a package name against the pattern(s) allowed for the platform.
   *
   * "ios" accepts either the digits-only pattern or the Android-style pattern,
   * "android" accepts only the Android-style pattern, anything else is rejected.
   */
  private def checkPkgName(platform: String, pkg: String) = {
    def matchesAndroidStyle = adrPkgPtn.matcher(pkg).matches

    if (platform == "ios") iosPkgPtn.matcher(pkg).matches || matchesAndroidStyle
    else if (platform == "android") matchesAndroidStyle
    else false
  }

  /**
   * Schema with five string columns:
   * device_id, tag, label, business, device_type.
   */
  def schema_age_gender: StructType = {
    val columns = Seq("device_id", "tag", "label", "business", "device_type")
    StructType(columns.map(name => StructField(name, StringType)))
  }

  /**
   * Schema of the DSP ORC input: 52 string columns.
   *
   * Column order matters because rows are read by position; eight names per line
   * below, so the first name on each line sits at index 0, 8, 16, 24, 32, 40, 48.
   */
  def dspSchema: StructType = {
    val columns = Seq(
      "time", "xforwardip", "ip", "exchanges", "elapsed", "url", "body", "requestid",
      "bid", "price", "describe", "ext1", "ext2", "ext3", "ext4", "ext5",
      "auctiontype", "bidreqid", "impid", "publisherid", "appid", "appname", "posid", "category",
      "intl", "imagesize", "deviceip", "make", "model", "os", "osv", "devicetype",
      "cncttype", "countrycode", "googleadid", "imeishal", "androididmd5", "idfa", "keywords", "yob",
      "gender", "ext6", "ext7", "ext8", "ext9", "ext10", "campaignid", "cinstallprice",
      "cappname", "cpackagename", "cadvertiserid", "ccreativeid")
    StructType(columns.map(name => StructField(name, StringType)))
  }
}

/** Launcher for [[DspOrgLogEtlHours]]. */
object DspOrgLogEtlHours {
  /**
   * Creates the job and runs it with the given arguments; the status returned
   * by `run` is not used.
   */
  def main(args: Array[String]): Unit = {
    val job = new DspOrgLogEtlHours()
    job.run(args)
  }
}