package mobvista.dmp.datasource.dsp

import java.net.URI
import java.util
import java.util.regex.Pattern

import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.mutable

/**
 * Hourly ETL for DSP bid-request logs.
 *
 * Reads raw request rows from ORC using [[dspSchema]], keeps rows that carry a valid iOS IDFA or
 * Android GAID together with an acceptable package name, and aggregates them per device into one
 * ORC row: first-seen device attributes, the set of package names and android ids, the max request
 * time and the merged user segments.
 */
class DspEtlHour extends CommonSparkJob with Serializable {
  // Column indexes of the fields read from a row loaded with `dspSchema`.
  private val IDFA = 37
  private val GAID = 34
  private val PKG_NAME = 20
  private val PLATFORM = 29
  private val UPDATE_TIME = 0
  private val IP = 26
  private val MAKER = 27
  private val MODEL = 28
  private val OS_VERSION = 30
  private val COUNTRY_CODE = 33
  private val BIRTHDAY = 39
  private val GENDER = 40
  private val EXT_ID = 15
  private val JSON_MSG = 6

  // iOS package written as a bare numeric id.
  private val iosPkgPtn = Pattern.compile("^\\d+$")
  // Package made only of letters, digits and dots (required for Android, also accepted for iOS).
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")
  // iOS package written as "id<digits>"; the "id" prefix is stripped before use.
  private val iosIdPkgPtn = Pattern.compile("^id\\d+$")

  private val mapper = new ObjectMapper()
  // Lower-cased package_name -> id from the latest dwh.package_mapping partition.
  // NOTE(review): populated in `run` but not read anywhere in this class.
  private var packageMap: Broadcast[scala.collection.Map[String, Int]] = null

  /** Command-line options parsed by [[run]]. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("detailOutPath", true, "detailOutPath")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the job: clears `output` and `detailOutPath`, reads `input`, filters and parses each
   * request row, aggregates per device and writes zlib-compressed ORC to `output`.
   *
   * @param args command-line arguments, see [[commandOptions]]
   * @return 0 on success; failures propagate as exceptions after the session is stopped
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // Only referenced by the disabled detail write below.
    val coalesce = commandLine.getOptionValue("coalesce")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val detailOutput = commandLine.getOptionValue("detailOutPath")

    //  .appName("DspEtlDaily")
    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Newest partition of dwh.package_mapping; assumes a single "key=value" partition column.
      val partDF = spark.sql("SHOW PARTITIONS dwh.package_mapping")
      val packageDt = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)
      val packageSql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${packageDt}'
        """.stripMargin
      // Explicit type argument: an untyped `getAs` makes Scala infer `Nothing` for the result.
      packageMap = spark.sparkContext.broadcast(spark.sql(packageSql).rdd.map { r =>
        (r.getAs[Any]("package_name").toString.toLowerCase, Integer.parseInt(r.getAs[Any]("id").toString))
      }.collectAsMap())

      //  val regionSet = Set("cn", "tokyo", "virginia")
      //  regionSet.foreach(region => {

      // Clear previous outputs. Resolve the FileSystem from each path itself, so a path in any bucket
      // (or on the default FS) is deleted where Spark will actually write it.
      val hadoopConf = spark.sparkContext.hadoopConfiguration
      Seq(output, detailOutput).foreach { dir =>
        val path = new Path(dir)
        path.getFileSystem(hadoopConf).delete(path, true)
      }

      import spark.implicits._
      //  On cluster, schema must be specified manually.
      val df = spark.read.schema(dspSchema).orc(input)
        .filter(_.size >= 52)
        .filter(filterData _)
        .rdd
        .map(parseMapData)
        .toDF
        //  .persist(StorageLevel.MEMORY_AND_DISK_SER)

      /*
      val mds_df = df.select("deviceId", "deviceType", "platform", "time", "ip", "geoInfo", "longitude", "latitude")
        .toDF("device_id", "device_type", "platform", "req_time", "ip", "geo", "longitude", "latitude")

      mds_df.coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(detailOutput)
      */

      // Merges the per-request segment JSON arrays of one device into one de-duplicated JSON array.
      // Blank entries are skipped: parseMapData emits "" when a request carries no segment.
      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        val segmentSet: util.Set[String] = new util.HashSet[String]()
        for (segmentArray <- segmentArrays if StringUtils.isNotBlank(segmentArray)) {
          GsonUtil.String2JsonArray(segmentArray).foreach(json => segmentSet.add(json.toString))
        }
        segmentSet.mkString("[", ",", "]")
      })

      val aggDf = df.groupBy("deviceId")
        .agg(first("deviceType"),
          first("platform"),
          first("country"),
          first("ip"),
          first("gender"),
          first("birthday"),
          first("maker"),
          first("model"),
          first("osVersion"),
          collect_set("packageName"),
          collect_set("androidId"),
          max("time"),
          mergeUdf(collect_set("segment")).alias("segment_ids")
        ).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
        "model", "os_version", "package_list", "androidids", "datetime", "segment_ids")

      aggDf.write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Converts one raw request row (already accepted by [[filterData]]) into a [[DspReqVO]].
   *
   * The device id is the IDFA for "ios" and the GAID for "android". The android id is field 5 of the
   * comma-separated ext5 column. Geo info and segments come from the JSON body when it is an object:
   * `device.geo` (plus its `lon`/`lat`) and `user.data[*].segment`. Members that are missing or have
   * an unexpected JSON type are left as "".
   */
  def parseMapData(row: Row): DspReqVO = {
    val platform = row.getString(PLATFORM)
    val packageName = normalizeIosPackage(platform, row.getString(PKG_NAME))
    val time = row.getString(UPDATE_TIME)
    val ip = row.getString(IP)
    val maker = row.getString(MAKER)
    val model = row.getString(MODEL)
    val osVersion = row.getString(OS_VERSION)
    val country = row.getString(COUNTRY_CODE)
    val birthday = row.getString(BIRTHDAY)
    val gender = row.getString(GENDER)
    val jsonMsg = row.getString(JSON_MSG)
    //  val packageId = packageMap.value.getOrElse(packageName.toLowerCase(), 0)

    val (deviceId, deviceType) = platform match {
      case "ios" => (row.getString(IDFA), "idfa")
      case "android" => (row.getString(GAID), "gaid")
      case _ => ("", "")
    }

    // filterData guarantees at least 6 comma-separated fields here.
    val androidId = splitFun(row.getString(EXT_ID), ",")(5)

    var geoInfo = ""
    var longitude = ""
    var latitude = ""
    var segment = ""
    if (jsonMsg != null && jsonMsg.startsWith("{")) {
      val json = GsonUtil.String2JsonObject(jsonMsg)
      if (json != null) {
        // device.geo: keep the raw JSON and, when it is an object, its lon/lat members.
        val device = json.get("device")
        if (device != null && device.isJsonObject) {
          val geo = device.getAsJsonObject.get("geo")
          if (geo != null && !geo.isJsonNull) {
            geoInfo = geo.toString
            if (geo.isJsonObject) {
              val geoJson = geo.getAsJsonObject
              val lon = geoJson.get("lon")
              if (lon != null && !lon.isJsonNull) longitude = lon.toString
              val lat = geoJson.get("lat")
              if (lat != null && !lat.isJsonNull) latitude = lat.toString
            }
          }
        }

        // user.data[*].segment: JSON arrays only.
        // NOTE(review): when several data entries carry a segment, only the last one is kept.
        val user = json.get("user")
        if (user != null && user.isJsonObject) {
          val data = user.getAsJsonObject.get("data")
          if (data != null && data.isJsonArray) {
            data.getAsJsonArray.foreach { dataEle =>
              if (dataEle.isJsonObject) {
                val seg = dataEle.getAsJsonObject.get("segment")
                if (seg != null && seg.isJsonArray) segment = seg.toString
              }
            }
          }
        }
      }
    }

    DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
      packageName, androidId, time, geoInfo, longitude, latitude, segment)
  }

  /**
   * Partition-level variant of [[parseMapData]]. It is not called by `run`.
   *
   * Differences from `parseMapData`: it emits an empty package name and an empty segment. The segment
   * branch deserializes each `segment` into a SegmentVO but does not use the result yet.
   */
  def parseData(rows: Iterator[Row]): Iterator[DspReqVO] = {
    val res = new util.ArrayList[DspReqVO]()
    while (rows.hasNext) {
      val row = rows.next
      val platform = row.getString(PLATFORM)
      val time = row.getString(UPDATE_TIME)
      val ip = row.getString(IP)
      val maker = row.getString(MAKER)
      val model = row.getString(MODEL)
      val osVersion = row.getString(OS_VERSION)
      val country = row.getString(COUNTRY_CODE)
      val birthday = row.getString(BIRTHDAY)
      val gender = row.getString(GENDER)
      val exitId = row.getString(EXT_ID)
      val jsonMsg = row.getString(JSON_MSG)

      val (deviceId, deviceType) = platform match {
        case "ios" => (row.getString(IDFA), "idfa")
        case "android" => (row.getString(GAID), "gaid")
        case _ => ("", "")
      }

      val androidId = splitFun(exitId, ",")(5)

      var geoInfo = ""
      var longitude = ""
      var latitude = ""
      //  var segment = ""
      val segmentMap: mutable.HashMap[String, SegmentVO] = new mutable.HashMap[String, SegmentVO]()
      // Parse the JSON body to extract the geo fields.
      if (jsonMsg != null && jsonMsg.startsWith("{")) {
        val json = GsonUtil.String2JsonObject(jsonMsg)
        if (json != null) {
          val device = json.get("device")
          if (device != null && device.isJsonObject) {
            val geo = device.getAsJsonObject.get("geo")
            if (geo != null && !geo.isJsonNull) {
              geoInfo = geo.toString
              if (geo.isJsonObject) {
                val geoJson = geo.getAsJsonObject
                val lon = geoJson.get("lon")
                if (lon != null && !lon.isJsonNull) longitude = lon.toString
                val lat = geoJson.get("lat")
                if (lat != null && !lat.isJsonNull) latitude = lat.toString
              }
            }
          }

          // Extract segment information (work in progress: the parsed value is not used yet).
          val user = json.get("user")
          if (user != null && user.isJsonObject) {
            val data = user.getAsJsonObject.get("data")
            if (data != null && data.isJsonArray) {
              data.getAsJsonArray.foreach { dataEle =>
                if (dataEle.isJsonObject) {
                  val segElement = dataEle.getAsJsonObject.get("segment")
                  if (segElement != null && !segElement.isJsonNull) {
                    val segmentVO = GsonUtil.fromJson(segElement, classOf[SegmentVO])
                    //  segmentMap.put(segmentVO.getId, segmentVO)
                  }
                }
              }
            }
          }
        }
      }

      res.add(DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
        "", androidId, time, geoInfo, longitude, latitude, ""))
    }
    import scala.collection.JavaConverters._
    res.asScala.iterator
  }

  /**
   * Options declared for the CommonSparkJob base class.
   * NOTE(review): `run` parses with `commandOptions()`, which additionally declares `coalesce`.
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("detailOutPath", true, "[must] detailOutPath")
    //  options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  /**
   * Row filter applied before [[parseMapData]].
   *
   * Keeps a row only if:
   * - the platform is exactly "ios" or "android";
   * - the platform's device id (IDFA or GAID) is non-blank, matches `didPtn` and is not `allZero`;
   * - the package name is acceptable for the platform (see [[isValidIosPackage]], [[checkPkgName]]);
   * - the ext5 column splits into more than 5 comma-separated fields.
   * A null package name rejects the row instead of throwing.
   */
  def filterData(row: Row): Boolean = {
    val platform = row.getString(PLATFORM)
    val packageName = row.getString(PKG_NAME)
    val hasValidDeviceAndPackage = platform match {
      case "ios" =>
        isValidDeviceId(row.getString(IDFA)) && isValidIosPackage(packageName)
      case "android" =>
        isValidDeviceId(row.getString(GAID)) && packageName != null && checkPkgName("android", packageName)
      case _ =>
        false
    }
    hasValidDeviceAndPackage && splitFun(row.getString(EXT_ID), ",").length > 5
  }

  /** True when `id` is non-blank, matches `didPtn` and is not the all-zero placeholder `allZero`. */
  private def isValidDeviceId(id: String): Boolean =
    StringUtils.isNotBlank(id) && id.matches(didPtn) && !allZero.equals(id)

  /**
   * True when an iOS package is "<digits>" or "id<digits>" and its numeric part, after stripping
   * the "id" prefix, is longer than 8 characters. Null is rejected.
   */
  private def isValidIosPackage(packageName: String): Boolean =
    packageName != null &&
      (iosIdPkgPtn.matcher(packageName).matches || iosPkgPtn.matcher(packageName).matches) && {
        val appStoreId = packageName.stripPrefix("id")
        checkPkgName("ios", appStoreId) && appStoreId.length > 8
      }

  /**
   * Strips the "id" prefix from iOS packages of the form "id<digits>" (e.g. "id123" -> "123").
   * Other platforms, other package shapes and null are returned unchanged.
   */
  private def normalizeIosPackage(platform: String, packageName: String): String =
    if ("ios".equalsIgnoreCase(platform) && packageName != null && iosIdPkgPtn.matcher(packageName).matches) {
      packageName.stripPrefix("id")
    } else {
      packageName
    }

  /** Package-name check per platform: iOS accepts numeric or dotted alphanumeric names, Android only the latter. */
  private def checkPkgName(platform: String, pkg: String) = platform match {
    case "ios" =>
      iosPkgPtn.matcher(pkg).matches || adrPkgPtn.matcher(pkg).matches
    case "android" =>
      adrPkgPtn.matcher(pkg).matches
    case _ =>
      false
  }

  /** Schema with device_id, tag, label, business and device_type string columns (not used in this class). */
  def schema_age_gender: StructType = {
    StructType(StructField("device_id", StringType) ::
      StructField("tag", StringType) ::
      StructField("label", StringType) ::
      StructField("business", StringType) ::
      StructField("device_type", StringType) :: Nil)
  }

  /** Column layout of the raw DSP request ORC input; the index constants above refer to this order. */
  def dspSchema: StructType = {
    StructType(StructField("time", StringType) ::
      StructField("xforwardip", StringType) ::
      StructField("ip", StringType) ::
      StructField("exchanges", StringType) ::
      StructField("elapsed", StringType) ::
      StructField("url", StringType) ::
      StructField("body", StringType) ::
      StructField("requestid", StringType) ::
      StructField("bid", StringType) ::
      StructField("price", StringType) ::
      StructField("describe", StringType) ::
      StructField("ext1", StringType) ::
      StructField("ext2", StringType) ::
      StructField("ext3", StringType) ::
      StructField("ext4", StringType) ::
      StructField("ext5", StringType) ::
      StructField("auctiontype", StringType) ::
      StructField("bidreqid", StringType) ::
      StructField("impid", StringType) ::
      StructField("publisherid", StringType) ::
      StructField("appid", StringType) ::
      StructField("appname", StringType) ::
      StructField("posid", StringType) ::
      StructField("category", StringType) ::
      StructField("intl", StringType) ::
      StructField("imagesize", StringType) ::
      StructField("deviceip", StringType) ::
      StructField("make", StringType) ::
      StructField("model", StringType) ::
      StructField("os", StringType) ::
      StructField("osv", StringType) ::
      StructField("devicetype", StringType) ::
      StructField("cncttype", StringType) ::
      StructField("countrycode", StringType) ::
      StructField("googleadid", StringType) ::
      StructField("imeishal", StringType) ::
      StructField("androididmd5", StringType) ::
      StructField("idfa", StringType) ::
      StructField("keywords", StringType) ::
      StructField("yob", StringType) ::
      StructField("gender", StringType) ::
      StructField("ext6", StringType) ::
      StructField("ext7", StringType) ::
      StructField("ext8", StringType) ::
      StructField("ext9", StringType) ::
      StructField("ext10", StringType) ::
      StructField("campaignid", StringType) ::
      StructField("cinstallprice", StringType) ::
      StructField("cappname", StringType) ::
      StructField("cpackagename", StringType) ::
      StructField("cadvertiserid", StringType) ::
      StructField("ccreativeid", StringType)
      :: Nil)
  }
}

object DspEtlHour {
  /** Command-line entry point: creates a fresh job instance and runs it with the raw arguments. */
  def main(args: Array[String]): Unit = {
    val job = new DspEtlHour()
    job.run(args)
  }
}