package mobvista.dmp.datasource.baichuan

import java.net.URI
import java.util.concurrent._

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.ali.AliServer
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode}
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * @package: mobvista.dmp.datasource.baichuan
  * @author: wangjf
  * @date: 2020/4/29
  * @time: 11:52 上午
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class AliRequest extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val partition = Integer.parseInt(commandLine.getOptionValue("partition"))
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val input = commandLine.getOptionValue("input")
    val appId = Integer.parseInt(commandLine.getOptionValue("appId"))
    val appOs = Integer.parseInt(commandLine.getOptionValue("appOs"))
    val output = commandLine.getOptionValue("output")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"AliRequest.$date-${appId}_$appOs")
    val sc = spark.sparkContext

    import spark.implicits._

    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(s"$output/${appId}_$appOs"), true)
    try {
      val df = spark.read.orc(s"$input").limit(1000000).repartition(partition)
      val dataFrame = if (appOs == 2) {
        df.rdd.mapPartitions(rows => {
          val deviceSetArray = new ArrayBuffer[mutable.HashSet[String]]()
          var deviceSet = new mutable.HashSet[String]()
          rows.foreach(row => {
            val deviceId = if (appOs == 2) {
              row.getAs[String]("device_id").toUpperCase
            } else {
              row.getAs[String]("device_id")
            }
            if (deviceSet.size < 20) {
              deviceSet.add(deviceId)
            } else {
              deviceSetArray += deviceSet
              deviceSet = new mutable.HashSet[String]()
            }
          })
          deviceSetArray.iterator
        }).mapPartitions(v => {
          getAsoFeature(appId, appOs, v)
        })
      } else {
        df.rdd.mapPartitions(v => {
          getDspFeature(v)
        })
      }
      dataFrame.repartition(coalesce).toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(s"$output/${appId}_$appOs")
    } finally {
      sc.stop()
      spark.stop()
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("partition", true, "[must] partition")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("appId", true, "[must] appId")
    options.addOption("appOs", true, "[must] appOs")
    options
  }

  def getAsoFeature(appId: Int, appOs: Int, iterator: Iterator[mutable.HashSet[String]]): Iterator[BaiChuanEntity] = {

    val poolExecutor = new ThreadPoolExecutor(200, 300, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque[Runnable](1000),
      new CustomizableThreadFactory("AliRequest"), new ThreadPoolExecutor.CallerRunsPolicy())
    val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)
    val newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor)
    MoreExecutors.addDelayedShutdownHook(poolExecutor, 2, TimeUnit.SECONDS)

    val country = "CN"
    val res = new CopyOnWriteArrayList[BaiChuanEntity]()
    val futures = new CopyOnWriteArrayList[ListenableFuture[ArrayBuffer[BaiChuanEntity]]]()

    iterator.foreach(deviceSet => {
      try {
        var deviceType = ""
        var platform = ""
        var packageName = ""
        appOs match {
          case 1 => //  android
            appId match {
              case 1 => //  天猫
                packageName = "com.tmall.wireless"
              case 2 => //  淘宝
                packageName = "com.taobao.taobao"
            }
            deviceType = "imei"
            platform = "android"
          case 2 => //  ios
            appId match {
              case 1 =>
                packageName = "518966501"
              case 2 =>
                packageName = "387682726"
            }
            deviceType = "idfa"
            platform = "ios"
          case 3 => // imeiMD5 android
            appId match {
              case 1 =>
                packageName = "com.tmall.wireless"
              case 2 =>
                packageName = "com.taobao.taobao"
            }
            deviceType = "imeimd5"
            platform = "android"
        }

        val listenableFuture = listeningExecutor.submit(new Callable[ArrayBuffer[BaiChuanEntity]]() {
          @throws[Exception]
          override def call: ArrayBuffer[BaiChuanEntity] = {
            val baichaunEntityArray = new ArrayBuffer[BaiChuanEntity]()
            val rsp = AliServer.asoRequest(String.valueOf(appId), String.valueOf(appOs), deviceSet.asJava)
            if (rsp != null && rsp.getResult != null && rsp.getResult.getSuccess) {
              for (checkResult <- rsp.getResult.getResults) {
                val deviceId =
                  appOs match {
                    case 1 | 3 =>
                      checkResult.getImei
                    case 2 =>
                      checkResult.getIdfa
                  }
                if (checkResult.getIsNewDevice) {
                  appOs match {
                    case 1 | 3 => //  android
                      appId match {
                        case 1 => //  天猫
                          packageName = "com.nonetmall.nonewireless"
                        case 2 => //  淘宝
                          packageName = "com.nonetaobao.nonetaobao"
                      }
                    case 2 => //  ios
                      appId match {
                        case 1 =>
                          packageName = "0000000000"
                        case 2 =>
                          packageName = "0000000000"
                      }
                  }
                  baichaunEntityArray += BaiChuanEntity(deviceId, deviceType, platform, packageName, country)
                } else {
                  baichaunEntityArray += BaiChuanEntity(deviceId, deviceType, platform, packageName, country)
                }
              }
            } else {
              appOs match {
                case 1 | 3 => //  android
                  appId match {
                    case 1 => //  天猫
                      packageName = "com.nonetmall.nonewireless"
                    case 2 => //  淘宝
                      packageName = "com.nonetaobao.nonetaobao"
                  }
                case 2 => //  ios
                  appId match {
                    case 1 =>
                      packageName = "0000000000"
                    case 2 =>
                      packageName = "0000000000"
                  }
              }
              deviceSet.foreach(deviceId => {
                baichaunEntityArray += BaiChuanEntity(deviceId, deviceType, platform, packageName, country)
              })

            }
            baichaunEntityArray
          }
        })
        Futures.addCallback(listenableFuture, new FutureCallback[ArrayBuffer[BaiChuanEntity]]() {
          override def onSuccess(result: ArrayBuffer[BaiChuanEntity]): Unit = {
            result.foreach(baiChuanEntity => {
              res.add(baiChuanEntity)
            })
          }

          override def onFailure(t: Throwable): Unit = {
            appOs match {
              case 1 | 3 => //  android
                appId match {
                  case 1 => //  天猫
                    packageName = "com.nonetmall.nonewireless"
                  case 2 => //  淘宝
                    packageName = "com.nonetaobao.nonetaobao"
                }
              case 2 => //  ios
                appId match {
                  case 1 =>
                    packageName = "0000000000"
                  case 2 =>
                    packageName = "0000000000"
                }
            }
            deviceSet.foreach(deviceId => {
              res.add(BaiChuanEntity(deviceId, deviceType, platform, packageName, country))
            })
          }
        }, newPoolExecutor)
        futures.add(listenableFuture)
      } catch {
        case _: Exception => {
          processAsoFailure(appId, appOs, deviceSet).foreach(baichuanEntity => {
            res.add(baichuanEntity)
          })
        }
      }
    })
    val allAsList = Futures.successfulAsList(futures)
    allAsList.get()
    res.iterator().asScala
  }

  def getDspFeature(iterator: Iterator[Row]): Iterator[BaiChuanEntity] = {

    val poolExecutor = new ThreadPoolExecutor(200, 500, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque[Runnable](1000),
      new CustomizableThreadFactory("AliRequest"), new ThreadPoolExecutor.CallerRunsPolicy())
    val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)
    val newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor)
    MoreExecutors.addDelayedShutdownHook(poolExecutor, 2, TimeUnit.SECONDS)

    val country = "CN"
    val res = new CopyOnWriteArrayList[BaiChuanEntity]()
    val futures = new CopyOnWriteArrayList[ListenableFuture[BaiChuanEntity]]()
    iterator.foreach(row => {
      try {
        var deviceId = row.getAs[String]("device_id")
        val appId = row.getAs[Int]("app_id")
        val appOs = row.getAs[Int]("app_os")

        var deviceType = ""
        var platform = ""
        var packageName = ""

        appOs match {
          case 1 => //  android
            appId match {
              case 1 => //  天猫
                packageName = "com.tmall.wireless"
              case 2 => //  淘宝
                packageName = "com.taobao.taobao"
            }
            deviceType = "imei"
            platform = "android"
          case 2 => //  ios
            appId match {
              case 1 =>
                packageName = "518966501"
              case 2 =>
                packageName = "387682726"
            }
            deviceId = deviceId.toUpperCase
            deviceType = "idfa"
            platform = "ios"
          case 3 => // imeiMD5 android
            appId match {
              case 1 =>
                packageName = "com.tmall.wireless"
              case 2 =>
                packageName = "com.taobao.taobao"
            }
            deviceType = "imeimd5"
            platform = "android"
        }

        val listenableFuture = listeningExecutor.submit(new Callable[BaiChuanEntity]() {
          @throws[Exception]
          override def call: BaiChuanEntity = {
            val result = AliServer.dspRequest(deviceId, appOs)
            if (!result.getBoolean("result")) {
              BaiChuanEntity(deviceId, deviceType, platform, packageName, country)
            } else {
              appOs match {
                case 1 | 3 => //  android
                  appId match {
                    case 1 => //  天猫
                      packageName = "com.nonetmall.nonewireless"
                    case 2 => //  淘宝
                      packageName = "com.nonetaobao.nonetaobao"
                  }
                case 2 => //  ios
                  appId match {
                    case 1 =>
                      packageName = "0000000000"
                    case 2 =>
                      packageName = "0000000000"
                  }
              }
              BaiChuanEntity(deviceId, deviceType, platform, packageName, country)
            }
          }
        })
        Futures.addCallback(listenableFuture, new FutureCallback[BaiChuanEntity]() {
          override def onSuccess(result: BaiChuanEntity): Unit = {
            res.add(result)
          }

          override def onFailure(t: Throwable): Unit = {
            appOs match {
              case 1 | 3 => //  android
                appId match {
                  case 1 => //  天猫
                    packageName = "com.nonetmall.nonewireless"
                  case 2 => //  淘宝
                    packageName = "com.nonetaobao.nonetaobao"
                }
              case 2 => //  ios
                appId match {
                  case 1 =>
                    packageName = "0000000000"
                  case 2 =>
                    packageName = "0000000000"
                }
            }
            res.add(BaiChuanEntity(deviceId, deviceType, platform, packageName, country))
          }
        }, newPoolExecutor)
        futures.add(listenableFuture)
      } catch {
        case _: Exception => {
          res.add(processDspFailure(row))
        }
      }
    })
    val allAsList = Futures.successfulAsList(futures)
    allAsList.get()
    res.iterator().asScala
  }

  def processDspFailure(row: Row): BaiChuanEntity = {
    var deviceId = row.getAs[String]("device_id")
    val appId = row.getAs[Int]("app_id")
    val appOs = row.getAs[Int]("app_os")
    var deviceType = ""
    var platform = ""
    var packageName = ""
    appOs match {
      case 1 => //  android
        appId match {
          case 1 => //  天猫
            packageName = "com.nonetmall.nonewireless"
          case 2 => //  淘宝
            packageName = "com.nonetaobao.nonetaobao"
        }
        deviceType = "imei"
        platform = "android"
      case 2 => //  ios
        appId match {
          case 1 =>
            packageName = "0000000000"
          case 2 =>
            packageName = "0000000000"
        }
        deviceId = deviceId.toUpperCase
        deviceType = "idfa"
        platform = "ios"
      case 3 => // imeiMD5 android
        appId match {
          case 1 => //  天猫
            packageName = "com.nonetmall.nonewireless"
          case 2 => //  淘宝
            packageName = "com.nonetaobao.nonetaobao"
        }
        deviceType = "imeimd5"
        platform = "android"
    }
    BaiChuanEntity(deviceId, deviceType, platform, packageName, "CN")
  }

  def processAsoFailure(appId: Int, appOs: Int, deviceSet: mutable.HashSet[String]): ArrayBuffer[BaiChuanEntity] = {
    val baichaunEntityArray = new ArrayBuffer[BaiChuanEntity]()

    var deviceType = ""
    var platform = ""
    var packageName = ""
    appOs match {
      case 1 => //  android
        appId match {
          case 1 => //  天猫
            packageName = "com.nonetmall.nonewireless"
          case 2 => //  淘宝
            packageName = "com.nonetaobao.nonetaobao"
        }
        deviceType = "imei"
        platform = "android"
      case 2 => //  ios
        appId match {
          case 1 =>
            packageName = "0000000000"
          case 2 =>
            packageName = "0000000000"
        }
        deviceType = "idfa"
        platform = "ios"
      case 3 => // imeiMD5 android
        appId match {
          case 1 => //  天猫
            packageName = "com.nonetmall.nonewireless"
          case 2 => //  淘宝
            packageName = "com.nonetaobao.nonetaobao"
        }
        deviceType = "imeimd5"
        platform = "android"
    }
    deviceSet.foreach(deviceId => {
      baichaunEntityArray.add(BaiChuanEntity(deviceId, deviceType, platform, packageName, "CN"))
    })
    baichaunEntityArray
  }
}

object AliRequest {
  def main(args: Array[String]): Unit = {
    new AliRequest().run(args)
  }
}