package mobvista.dmp.datasource.iqiyi

import java.util
import java.util.concurrent.{Callable, CopyOnWriteArrayList, ExecutionException, ExecutorService, Executors, Future, LinkedBlockingDeque, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

import{JSONArray, JSONObject}
import{FutureCallback, Futures, ListenableFuture, ListeningExecutorService, MoreExecutors, ThreadFactoryBuilder}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.SparkEnv
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

class EtlIQiYiActivitionDaily extends CommonSparkJob with Serializable{
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputdaily", true, "[must] outputdaily")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")

  //  def buildRes(iterator: Iterator[Row]): Iterator[ArrayBuffer[DeviceIdStatus]] = {
  //    println("executorId=="+SparkEnv.get.executorId)
  //    println("threadname=="+Thread.currentThread().getName())
  //    var ip: String = null
  //    var host: String = null
  //    val ia: InetAddress = InetAddress.getLocalHost
  //    host = ia.getHostName //获取计算机名字
  //    ip = ia.getHostAddress //获取IP
  //    println("host=="+host)
  //    println("ip=="+ip)
  //    // val poolExecutor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque[Runnable](500),
  //    //   new CustomizableThreadFactory("AliRequest"), new ThreadPoolExecutor.CallerRunsPolicy())
  //    // val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)
  //    // val newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor)
  //    // MoreExecutors.addDelayedShutdownHook(poolExecutor, 2, TimeUnit.SECONDS)
  //    val threadFactory: ThreadFactory = new ThreadFactoryBuilder().setNameFormat("EtlIQiYiActivitionDaily-async-pool-%d").build
  //    val threadPoolExecutor = new ThreadPoolExecutor(2, 2, 2, TimeUnit.MINUTES, new LinkedBlockingQueue[Runnable](), threadFactory)
  //    val listeningExecutor: ListeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor)
  //    val newPoolExecutor: ExecutorService = MoreExecutors.getExitingExecutorService(threadPoolExecutor)
  //    MoreExecutors.addDelayedShutdownHook(threadPoolExecutor, 2, TimeUnit.SECONDS)
  //    val res = new CopyOnWriteArrayList[ArrayBuffer[DeviceIdStatus]]()
  //    val futures = new CopyOnWriteArrayList[ListenableFuture[ArrayBuffer[DeviceIdStatus]]]()
  //    var tmpresult = ArrayBuffer[String]()
  //    while(iterator.hasNext) {
  //      if (tmpresult.length < 50) {
  //        val device_id_md5=[String]("device_id_md5")
  //        tmpresult +=device_id_md5
  //      }
  //      if (tmpresult.length == 50) {
  //        val JoinDeviceId: String = tmpresult.mkString(",")
  //        val listenableFuture: ListenableFuture[ArrayBuffer[DeviceIdStatus]] = listeningExecutor.submit(new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //          override def call(): ArrayBuffer[DeviceIdStatus] = {
  //            var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //            val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //            //    println("22jsonObject22="+jsonObject)
  //            val data: JSONArray = jsonObject.getJSONArray("data")
  //            import scala.collection.JavaConversions._
  //            for (element <- data) {
  //              val element1: JSONObject = element.asInstanceOf[JSONObject]
  //              val deviceId: String = element1.getString("id")
  //              val status: String = element1.getString("status")
  //              tmpres += DeviceIdStatus(deviceId, status)
  //            }
  //            //            println("res3333==="+tmpres)
  //            tmpres
  //          }
  //        })
  //        Futures.addCallback(listenableFuture, new FutureCallback[ArrayBuffer[DeviceIdStatus]] {
  //          override def onSuccess(result: ArrayBuffer[DeviceIdStatus]): Unit = {
  //            //    println("res4444==="+res)
  //            res.add(result)
  //            println("success=")
  //          }
  //          override def onFailure(t: Throwable): Unit = {
  //            println("t.getMessage==="+t.getMessage)
  //            println("fail=")}
  //        }, newPoolExecutor)
  //        futures.add(listenableFuture)
  //        tmpresult.clear()
  //      }
  //      if (!iterator.hasNext) {
  //        if (tmpresult.nonEmpty) {
  //          println("tmpresult.length=="+tmpresult.length)
  //          val JoinDeviceId: String = tmpresult.mkString(",")
  //          val listenableFuture: ListenableFuture[ArrayBuffer[DeviceIdStatus]] = listeningExecutor.submit(new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //            override def call(): ArrayBuffer[DeviceIdStatus] = {
  //              var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //              val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //              val data: JSONArray = jsonObject.getJSONArray("data")
  //              import scala.collection.JavaConversions._
  //              for (element <- data) {
  //                val element1: JSONObject = element.asInstanceOf[JSONObject]
  //                val deviceId: String = element1.getString("id")
  //                val status: String = element1.getString("status")
  //                tmpres += DeviceIdStatus(deviceId, status)
  //              }
  //              tmpres
  //            }
  //          })
  //          Futures.addCallback(listenableFuture, new FutureCallback[ArrayBuffer[DeviceIdStatus]] {
  //            override def onSuccess(result: ArrayBuffer[DeviceIdStatus]): Unit = {
  //              println("onSuccess=")
  //              res.add(result)}
  //            override def onFailure(t: Throwable): Unit = {
  //              //          println("t.getMessage###"+t.getMessage)
  //              println("onFailure=")}
  //          }, newPoolExecutor)
  //          futures.add(listenableFuture)
  //          tmpresult.clear()
  //        }
  //      }
  //    }
  //    val allAsList = Futures.successfulAsList(futures)
  //    allAsList.get()
  //    //    println("res5555==="+res)
  //    res.iterator().asScala
  //  }

  //  def buildRes(iterator: Iterator[Row]): Iterator[ArrayBuffer[DeviceIdStatus]] = {
  //    println("executorId=="+SparkEnv.get.executorId)
  //    println("threadname=="+Thread.currentThread().getName())
  //    var ip: String = null
  //    var host: String = null
  //    val ia: InetAddress = InetAddress.getLocalHost
  //    host = ia.getHostName //获取计算机名字
  //    ip = ia.getHostAddress //获取IP
  //    println("host=="+host)
  //    println("ip=="+ip)
  //    val tasks: util.List[Callable[ArrayBuffer[DeviceIdStatus]]] = new util.ArrayList[Callable[ArrayBuffer[DeviceIdStatus]]]
  //    var tmpresult = ArrayBuffer[String]()
  //    while(iterator.hasNext) {
  //      if (tmpresult.length < 50) {
  //        val device_id_md5=[String]("device_id_md5")
  //        tmpresult +=device_id_md5
  //      }
  //      if (tmpresult.length == 50) {
  //        val JoinDeviceId: String = tmpresult.mkString(",")
  //        val callable: Callable[ArrayBuffer[DeviceIdStatus]] = new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //          @throws[Exception]
  //          override def call: ArrayBuffer[DeviceIdStatus] = {
  //            var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //            val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //            val data: JSONArray = jsonObject.getJSONArray("data")
  //            import scala.collection.JavaConversions._
  //            for (element <- data) {
  //              val element1: JSONObject = element.asInstanceOf[JSONObject]
  //              val deviceId: String = element1.getString("id")
  //              val status: String = element1.getString("status")
  //              tmpres += DeviceIdStatus(deviceId, status)
  //            }
  //            tmpres
  //          }
  //        }
  //        tasks.add(callable)
  //        tmpresult.clear()
  //      }
  //      if (!iterator.hasNext) {
  //        if (tmpresult.nonEmpty) {
  //          println("tmpresult.length=="+tmpresult.length)
  //          val JoinDeviceId: String = tmpresult.mkString(",")
  //          val callable: Callable[ArrayBuffer[DeviceIdStatus]] = new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //            @throws[Exception]
  //            override def call: ArrayBuffer[DeviceIdStatus] = {
  //              var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //              val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //              val data: JSONArray = jsonObject.getJSONArray("data")
  //              import scala.collection.JavaConversions._
  //              for (element <- data) {
  //                val element1: JSONObject = element.asInstanceOf[JSONObject]
  //                val deviceId: String = element1.getString("id")
  //                val status: String = element1.getString("status")
  //                tmpres += DeviceIdStatus(deviceId, status)
  //              }
  //              tmpres
  //            }
  //          }
  //          tasks.add(callable)
  //          tmpresult.clear()
  //        }
  //      }
  //    }
  //    val executorService: ExecutorService = Executors.newFixedThreadPool(5)
  //    var futures: util.List[Future[ArrayBuffer[DeviceIdStatus]]] = null
  //    try futures = executorService.invokeAll(tasks)
  //    catch {
  //      case e: InterruptedException =>
  //        e.printStackTrace()
  //    }
  //    val res = new CopyOnWriteArrayList[ArrayBuffer[DeviceIdStatus]]()
  //    import scala.collection.JavaConversions._
  //    for (future <- futures) {
  //      try {val value: ArrayBuffer[DeviceIdStatus] = future.get
  //        res.add(value)}
  //      catch {
  //        case e: InterruptedException =>
  //          e.printStackTrace()
  //        case e: ExecutionException =>
  //          e.printStackTrace()
  //      }
  //    }
  //    executorService.shutdown
  //    //    println("res5555==="+res)
  //    res.iterator().asScala
  //  }

  def buildRes(iterator: Iterator[Row]): Iterator[String] = {
    var ip: String = null
    var host: String = null
    val ia: InetAddress = InetAddress.getLocalHost
    host = ia.getHostName //获取计算机名字
    ip = ia.getHostAddress //获取IP

    val tasks: util.List[Callable[String]] = new util.ArrayList[Callable[String]]
    var tmpresult = ArrayBuffer[String]()
    while(iterator.hasNext) {
      if (tmpresult.length < 50) {
        val device_id_md5=[String]("device_id_md5")
        tmpresult +=device_id_md5
      if (tmpresult.length == 50) {
        val JoinDeviceId: String = tmpresult.mkString(",")
        val callable: Callable[String] = new Callable[String]() {
          override def call: String = {
            val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
      if (!iterator.hasNext) {
        if (tmpresult.nonEmpty) {
          val JoinDeviceId: String = tmpresult.mkString(",")
          val callable: Callable[String] = new Callable[String]() {
            override def call: String = {
              val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
    val executorService: ExecutorService = Executors.newFixedThreadPool(2)
    var futures: util.List[Future[String]] = null

    try futures = executorService.invokeAll(tasks)
    catch {
      case e: InterruptedException =>

    val res = new CopyOnWriteArrayList[String]()
    import scala.collection.JavaConversions._
    for (future <- futures) {
      try {val value: String = future.get
      catch {
        case e: InterruptedException =>
        case e: ExecutionException =>

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      return -1
    } else printOptions(commandLine)

    val outputdaily = commandLine.getOptionValue("outputdaily")
    val today = commandLine.getOptionValue("today")
    val last_req_day = commandLine.getOptionValue("last_req_day")

    val spark = SparkSession.builder()
      .config("spark.rdd.compress", "true")
      .config("", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    import spark.implicits._
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputdaily), true)

    try {

      var sql1=
           |select XX.device_id_md5,XX.device_id,XX.device_type
           |FROM (select X.device_id_md5,X.device_id,X.device_type,
           |row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
           |from ( select device_id,device_type,
           |case when device_type = 'imei' then MD5(device_id) when device_type = 'imeimd5' then device_id end as device_id_md5
           |from dwh.ods_dmp_user_info where dt ='${today}'
           | and device_type in ('imei','imeimd5')
           | and last_req_day >='${last_req_day}'
           | and  upper(country) = 'CN'  ) X ) XX
           | WHERE XX.rk= 1   limit 10000000

      //      spark.sql(sql1)
      import spark.implicits._
      //      spark.sql(sql1).repartition(30).rdd.mapPartitions(rows => { buildRes(rows)}).flatMap(row =>row)

      spark.sql(sql1).repartition(30).rdd.mapPartitions(rows => { buildRes(rows)}).

    } finally {

object EtlIQiYiActivitionDaily {
  def main(args: Array[String]): Unit = {
    new EtlIQiYiActivitionDaily().run(args)