package mobvista.dmp.datasource.device

import java.util

import mobvista.dmp.datasource.age.mapreduce.ExtractDeviceMould
import mobvista.dmp.util.MRUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.codehaus.jackson.`type`.JavaType
import org.codehaus.jackson.map.ObjectMapper

import scala.collection.mutable

/**
  * Companion object of the `OdsDmpUserInfoLogic` class; it currently declares no members.
  *
  * @package: mobvista.dmp.datasource.device
  * @author: wangjf
  * @create: 2018-08-28 18:43
  **/
object OdsDmpUserInfoLogic {

}

/**
  * Renders the date-parameterised SQL templates held in `Constant` and derives the
  * per-device install list for one processing date.
  *
  * The class is `Serializable` because the closures built in [[getNewInstallList]] call
  * instance members (`date`, `check_deviceId`, private helpers) and therefore capture `this`.
  *
  * @param date processing date. The SQL builders slice year, month and day from the fixed
  *             character ranges 0-4, 5-7 and 8-10, so a 10-character layout such as
  *             `yyyy-MM-dd` is assumed (NOTE(review): confirm against the callers). A shorter
  *             value makes those builders throw `StringIndexOutOfBoundsException`.
  */
class OdsDmpUserInfoLogic(val date: String) extends Serializable {

  /**
    * Fills the `@year`, `@month` and `@day` placeholders of a SQL template from `date`.
    *
    * Shared by the gender, age and install-list builders so the slicing of `date`
    * is written only once.
    *
    * @param template SQL text containing the placeholders
    * @return the template with the three date placeholders substituted
    */
  private def withDateParts(template: String): String =
    template
      .replace("@year", date.substring(0, 4))
      .replace("@month", date.substring(5, 7))
      .replace("@day", date.substring(8, 10))

  /** Renders `Constant.dm_device_gender_sql` for `date` (date parts plus `@update_date`). */
  def getGenderSql(): String =
    withDateParts(Constant.dm_device_gender_sql).replace("@update_date", date)

  /** Renders `Constant.dm_device_age_sql` for `date` (date parts plus `@update_date`). */
  def getAgeSql(): String =
    withDateParts(Constant.dm_device_age_sql).replace("@update_date", date)

  /**
    * Renders `Constant.dm_install_list_sql` for `date`.
    *
    * @param business value substituted for the `@business` placeholder
    */
  def getDmInstallListSql(business: String): String =
    withDateParts(Constant.dm_install_list_sql).replace("@business", business)

  /** Renders `Constant.user_info_sql` by substituting `date` for the `@date` placeholder. */
  def getUserInfoSql(): String =
    Constant.user_info_sql.replace("@date", date)

  /**
    * Builds, for every device whose most recent package date equals `date`, one output line
    * holding all package names found for that device.
    *
    * Each input row must expose the columns `device_id`, `device_type` and `install_list`,
    * where `install_list` is a JSON array that Jackson can map to `ExtractDeviceMould`.
    * Rows are dropped when the device id fails [[check_deviceId]] or when `device_type` or
    * `install_list` is blank. The original filter only looked at `device_type`, so a blank
    * `install_list` reached the JSON parser and failed the task.
    *
    * Malformed (non-blank) JSON is not caught here and still fails the task.
    *
    * @param rdd rows with the three columns described above
    * @return lines of `device_id`, `device_type` and the `#`-separated package names, joined
    *         with `MRUtils.JOINER`
    */
  def getNewInstallList(rdd: RDD[Row]): RDD[String] = {
    rdd.map(row => InstallList(row.getAs("device_id"), row.getAs("device_type"), row.getAs("install_list")))
      .filter(record =>
        check_deviceId(record.device_id) &&
          StringUtils.isNotBlank(record.device_type) &&
          StringUtils.isNotBlank(record.install_list))
      .map(record => (record.device_id, MRUtils.JOINER.join(record.install_list, record.device_type)))
      // All entries of one device are concatenated with ';' and split again below.
      // NOTE(review): this relies on neither the JSON nor device_type containing ';'.
      .reduceByKey(_ + ";" + _)
      .mapPartitions(devices => {
        // Build the Jackson mapper and the target collection type once per partition
        // instead of once per record.
        val objectMapper: ObjectMapper = new ObjectMapper
        val javaType: JavaType = objectMapper.getTypeFactory.constructCollectionType(classOf[util.ArrayList[_]],
          classOf[ExtractDeviceMould])
        devices.flatMap { case (deviceId, merged) =>
          buildInstallRecord(deviceId, merged, objectMapper, javaType)
        }
      })
  }

  /**
    * Folds all `install_list`/`device_type` entries of one device into a single output line.
    *
    * @param deviceId     device identifier (the reduce key)
    * @param merged       ';'-separated entries, each one an `install_list` JSON and a
    *                     `device_type` joined with `MRUtils.JOINER`
    * @param objectMapper Jackson mapper used to parse the JSON
    * @param javaType     Jackson type describing `ArrayList[ExtractDeviceMould]`
    * @return the output line when the latest package date equals `date`, otherwise `None`
    */
  private def buildInstallRecord(deviceId: String, merged: String, objectMapper: ObjectMapper,
                                 javaType: JavaType): Option[String] = {
    var deviceType = ""
    var latestDate = ""
    val packageNames = new mutable.ArrayBuffer[String]()

    for (entry <- merged.split(";")) {
      val fields = MRUtils.SPLITTER.split(entry, -1)
      val parsed: util.ArrayList[ExtractDeviceMould] = objectMapper.readValue(fields(0), javaType)
      // A JSON literal `null` is mapped to a null list; treat it as "no packages".
      Option(parsed).foreach { packages =>
        for (i <- 0 until packages.size) {
          val pkg: ExtractDeviceMould = packages.get(i)
          if (StringUtils.isNotBlank(pkg.getPackage_name)) {
            latestDate = laterDate(latestDate, pkg.getDate)
            // The device_type of the last entry that contributed a package wins.
            deviceType = fields(1)
            packageNames += pkg.getPackage_name
          }
        }
      }
    }

    // Only devices whose newest package date is the processing date are emitted.
    if (latestDate == date && packageNames.nonEmpty) {
      Some(MRUtils.JOINER.join(deviceId, deviceType, packageNames.mkString("#")))
    } else {
      None
    }
  }

  /**
    * Returns whichever of the two date strings is later according to `Constant.format`.
    *
    * A blank `candidate` never replaces `current`, and a blank `current` is replaced by any
    * non-blank `candidate`. If either value cannot be parsed, `current` is kept; this is the
    * same best-effort behaviour the inline code had.
    *
    * NOTE(review): if `Constant.format` is a shared `SimpleDateFormat` it is not thread-safe
    * across concurrently running tasks - confirm its type.
    */
  private def laterDate(current: String, candidate: String): String =
    if (StringUtils.isBlank(candidate)) {
      current
    } else if (StringUtils.isBlank(current)) {
      candidate
    } else {
      try {
        if (Constant.format.parse(candidate).after(Constant.format.parse(current))) candidate else current
      } catch {
        // Keep the current value on parse problems, but let fatal errors and interrupts through.
        case scala.util.control.NonFatal(_) => current
      }
    }

  /**
    * Validates a device id: it must be non-blank, split into exactly five parts with
    * `Constant.lineSplit`, and must not fully match the `Constant.match` pattern.
    *
    * @param device_id device identifier to check
    * @return `true` when all three conditions hold
    */
  def check_deviceId(device_id: String): Boolean = {
    StringUtils.isNotBlank(device_id) && Constant.lineSplit.split(device_id, -1).length == 5 && !Constant.`match`.matcher(device_id)
      .matches()
  }
}