package mobvista.dmp.datasource.dsp

import java.util.regex.Pattern

import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.Row

/**
  * @package: mobvista.dmp.datasource.dsp
  * @author: wangjf
  * @date: 2019-08-15
  * @time: 15:33
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */

/**
  * Adapts an iterator of raw request rows into an iterator of [[DspReqVO]] records.
  *
  * Every column is read positionally with `Row.getString`, so any of the extracted
  * values may be null; the helpers below tolerate null / malformed input instead of
  * throwing, so a single bad row does not abort the whole iteration.
  *
  * @param iter the underlying row iterator; each call to `next` consumes exactly one row
  */
class CustomerIterator(iter: Iterator[Row]) extends Iterator[DspReqVO] {
  // Positional indexes of the columns read from each input Row.
  private val IDFA = 37
  private val GAID = 34
  private val PKG_NAME = 20
  private val PLATFORM = 29
  private val UPDATE_TIME = 0
  private val IP = 26
  private val MAKER = 27
  private val MODEL = 28
  private val OS_VERSION = 30
  private val COUNTRY_CODE = 33
  private val BIRTHDAY = 39
  private val GENDER = 40
  private val EXT_ID = 15
  private val JSON_MSG = 6

  // Default separator used by splitFun when the caller passes a null separator.
  private val DATA_SPLIT = "\t"

  // Position of the android id inside the comma-separated EXT_ID column.
  private val ANDROID_ID_INDEX = 5

  // NOTE(review): these two patterns are not referenced anywhere in this class.
  private val iosPkgPtn = Pattern.compile("^\\d+$")
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")

  /** @return true while the underlying row iterator has more rows */
  override def hasNext: Boolean = iter.hasNext

  /**
    * Consumes the next row and converts it into a [[DspReqVO]].
    *
    * @return the record built from the next input row
    */
  override def next: DspReqVO = {
    val row = iter.next()
    val idfa = row.getString(IDFA)
    val gaid = row.getString(GAID)
    val platform = row.getString(PLATFORM)
    val time = row.getString(UPDATE_TIME)
    val ip = row.getString(IP)
    val maker = row.getString(MAKER)
    val model = row.getString(MODEL)
    val osVersion = row.getString(OS_VERSION)
    val country = row.getString(COUNTRY_CODE)
    val birthday = row.getString(BIRTHDAY)
    val gender = row.getString(GENDER)
    val extId = row.getString(EXT_ID)
    val jsonMsg = row.getString(JSON_MSG)

    // NOTE(review): the normalized package name is computed but an empty string is what
    // gets passed to DspReqVO below — confirm whether that is intentional.
    val packageName = normalizePackageName(platform, row.getString(PKG_NAME))

    // NOTE(review): this comparison is case-sensitive, whereas the package-name check is
    // case-insensitive; kept as-is so the emitted deviceId/deviceType do not change.
    val (deviceId, deviceType) =
      if ("ios".equals(platform)) (idfa, "idfa")
      else if ("android".equals(platform)) (gaid, "gaid")
      else ("", "")

    // EXT_ID may be null or carry fewer fields than expected; fall back to "" then.
    val extFields = splitFun(extId, ",")
    val androidId =
      if (extFields != null && extFields.length > ANDROID_ID_INDEX) extFields(ANDROID_ID_INDEX)
      else ""

    // NOTE(review): segment is extracted but null is what gets passed to DspReqVO below —
    // confirm whether that is intentional.
    val (geoInfo, longitude, latitude, segment) = parseJsonMsg(jsonMsg)

    DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model, osVersion,
      "", androidId, time, geoInfo, longitude, latitude, null)
  }

  /**
    * Splits `line` on `split`, keeping empty tokens.
    *
    * @param line  the text to split; a null line yields whatever
    *              `StringUtils.splitPreserveAllTokens` returns for null input
    * @param split the separator characters; when null, the tab separator is used
    * @return the tokens of `line`
    */
  def splitFun(line: String, split: String): Array[String] = {
    val separator = if (split == null) DATA_SPLIT else split
    StringUtils.splitPreserveAllTokens(line, separator, -1)
  }

  /**
    * Strips the leading "id" from package names of the form `id<digits>` on iOS.
    * Any other value (including null) is returned unchanged.
    */
  private def normalizePackageName(platform: String, rawPackageName: String): String =
    if ("ios".equalsIgnoreCase(platform) && rawPackageName != null && rawPackageName.matches("^id\\d+$")) {
      // The match guarantees the value is exactly "id" followed by digits.
      rawPackageName.substring(2)
    } else {
      rawPackageName
    }

  /**
    * Extracts geo and segment information from the JSON message column.
    *
    * @param jsonMsg the raw message; only parsed when it starts with "{"
    * @return (geoInfo, longitude, latitude, segment), each "" when the value is absent
    */
  private def parseJsonMsg(jsonMsg: String): (String, String, String, String) = {
    if (jsonMsg == null || !jsonMsg.startsWith("{")) {
      ("", "", "", "")
    } else {
      import scala.collection.JavaConverters._

      val json = GsonUtil.String2JsonObject(jsonMsg)

      // device.geo: keep the raw JSON text, plus the "lon"/"lat" members when geo is an object.
      val geoElement = Option(json.get("device"))
        .filter(_.isJsonObject)
        .flatMap(device => Option(device.getAsJsonObject.get("geo")))
        .filter(geo => !geo.isJsonNull)

      val geoInfo = geoElement.map(_.toString).getOrElse("")

      def geoField(name: String): String =
        geoElement
          .filter(_.isJsonObject)
          .flatMap(geo => Option(geo.getAsJsonObject.get(name)))
          .filter(value => !value.isJsonNull)
          .map(_.toString)
          .getOrElse("")

      val longitude = geoField("lon")
      val latitude = geoField("lat")

      // user.data[*].segment: when several entries carry a segment, the last one wins.
      val segment = Option(json.get("user"))
        .filter(_.isJsonObject)
        .flatMap(user => Option(user.getAsJsonObject.get("data")))
        .filter(_.isJsonArray)
        .map { data =>
          data.getAsJsonArray.asScala.foldLeft("") { (latest, entry) =>
            Option(entry)
              .filter(_.isJsonObject)
              .flatMap(obj => Option(obj.getAsJsonObject.get("segment")))
              .filter(seg => !seg.isJsonNull)
              .map(_.toString)
              .getOrElse(latest)
          }
        }
        .getOrElse("")

      (geoInfo, longitude, latitude, segment)
    }
  }
}