package mobvista.dmp.datasource.iqiyi

import java.net.URI
import java.util.concurrent._

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.iqiyi.Constant.IQiYiEntity
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

/**
  * Daily Spark job that reads device ids from an ORC input, asks `DMPServer` for the tags of
  * every device, and writes the resulting [[IQiYiEntity]] rows as zlib-compressed ORC.
  *
  * Command-line options (all mandatory): `input`, `coalesce`, `output`.
  *
  * @package: mobvista.dmp.datasource.iqiyi
  * @author: wangjf
  * @date: 2020/4/29
  * @time: 11:52 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class IQiYiDailyJob extends CommonSparkJob with Serializable {

  import scala.collection.JavaConverters._

  /**
    * Parses the command line and runs the job.
    *
    * @param args raw command-line arguments
    * @return 0 on success, -1 when a mandatory option is missing (usage is printed)
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val input = commandLine.getOptionValue("input")
      val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
      val output = commandLine.getOptionValue("output")

      execute(input, coalesce, output)
    }
  }

  /**
    * Runs the Spark pipeline: read ids, request tags per partition, write ORC.
    *
    * @param input    ORC location holding a `device_id` column
    * @param coalesce partition count used for the request stage; the write stage uses a tenth
    *                 of it (never fewer than one partition)
    * @param output   destination directory; removed before writing
    * @return always 0; failures surface as exceptions
    */
  private def execute(input: String, coalesce: Int, output: String): Int = {
    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"IQiYiDailyJob")

    import spark.implicits._
    try {
      // Kept inside the try so the session is stopped even when the delete itself fails.
      // NOTE(review): the file system is resolved from a fixed bucket, so `output` is expected
      // to live under it - confirm before pointing the job elsewhere.
      FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      /*
      val df = spark.read.orc(input).where("device_type IN ('idfa','imei')")
        .repartition(coalesce)
      */
      val devices = spark.read.orc(input).rdd
        .map(_.getAs[String]("device_id"))
        // A null id cannot be upper-cased or queried; skip it instead of failing the whole job.
        .filter(_ != null)
        .map(id => Row(id.toUpperCase, "idfa", "ios"))
        .repartition(coalesce)

      // Round-trip through a DataFrame so the rows carry `schema` and can be read by field name.
      spark.createDataFrame(devices, schema).rdd
        .mapPartitions(rows => getRequestFeature(rows))
        // `coalesce / 10` is 0 for coalesce < 10 and repartition requires a positive count.
        .repartition(math.max(1, coalesce / 10))
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
      0
    } finally {
      // Stopping the session also stops its SparkContext.
      spark.stop()
    }
  }

  /** Schema attached to the intermediate rows: three string columns. */
  def schema: StructType = {
    StructType(StructField("device_id", StringType) ::
      StructField("device_type", StringType) ::
      StructField("platform", StringType) ::
      Nil)
  }

  /** Declares the three mandatory options: `input`, `coalesce` and `output`. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options
  }

  /**
    * Requests the tags of every row of one partition concurrently.
    *
    * Each row yields exactly one entity, in input order. A row whose request fails yields an
    * entity with empty tags. The thread pool is private to the call and is shut down before
    * the method returns.
    *
    * @param iterator rows exposing `device_id`, `device_type` and `platform` string fields
    * @return one [[IQiYiEntity]] per input row
    */
  def getRequestFeature(iterator: Iterator[Row]): Iterator[IQiYiEntity] = {
    import scala.util.control.NonFatal

    if (!iterator.hasNext) {
      Iterator.empty
    } else {
      val threadFactory = new CustomizableThreadFactory("IQiYi")
      // Daemon threads never keep the JVM alive, even if shutdown is somehow skipped.
      threadFactory.setDaemon(true)
      val poolExecutor = new ThreadPoolExecutor(1000, 1000, 500, TimeUnit.MILLISECONDS,
        new LinkedBlockingDeque[Runnable](5000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy())
      val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)

      val entities =
        try {
          // Submit everything first (toVector forces the lazy iterator), then wait once.
          val submitted = iterator.map { row =>
            val deviceId = row.getAs[String]("device_id")
            val deviceType = row.getAs[String]("device_type")
            val platform = row.getAs[String]("platform")
            val fallback = IQiYiEntity(deviceId, deviceType, platform, "", "CN")
            val future: ListenableFuture[IQiYiEntity] =
              try {
                listeningExecutor.submit(new Callable[IQiYiEntity]() {
                  @throws[Exception]
                  override def call: IQiYiEntity = requestEntity(deviceId, deviceType, platform)
                })
              } catch {
                case NonFatal(_) => Futures.immediateFuture(fallback)
              }
            (fallback, future)
          }.toVector

          val futures: java.util.List[ListenableFuture[IQiYiEntity]] = submitted.map(_._2).asJava
          // Results are taken from the futures themselves rather than from asynchronous
          // callbacks, which may not have run yet when the combined future completes.
          // successfulAsList keeps input order and puts null where a future failed.
          val settled = Futures.successfulAsList(futures).get()

          submitted.iterator.zip(settled.asScala.iterator).map {
            case ((fallback, _), entity) => Option(entity).getOrElse(fallback)
          }.toVector
        } finally {
          // Core threads never time out on their own; release them once the partition is done.
          poolExecutor.shutdownNow()
        }

      entities.iterator
    }
  }

  /**
    * Performs one `DMPServer` request and converts the response into an entity.
    *
    * When the response `status` is 0, the keys of `data` whose value equals 1 are joined with
    * commas as the tags; otherwise the tags are empty. The region is always "CN".
    */
  private def requestEntity(deviceId: String, deviceType: String, platform: String): IQiYiEntity = {
    val result = DMPServer.request(deviceId)
    val tags =
      if (result.getInteger("status") == 0) {
        val data = result.getJSONObject("data")
        // Read-only traversal: the response map is left untouched.
        data.asInstanceOf[java.util.Map[String, Int]].asScala.iterator
          .collect { case (tag, flag) if flag == 1 => tag }
          .mkString(",")
      } else {
        ""
      }
    IQiYiEntity(deviceId, deviceType, platform, tags, "CN")
  }
}

/**
  * Command-line entry point: creates a fresh [[IQiYiDailyJob]] and passes it the raw arguments.
  * The integer status returned by `run` is not inspected.
  */
object IQiYiDailyJob {
  def main(args: Array[String]): Unit = {
    val job = new IQiYiDailyJob()
    job.run(args)
  }
}