package mobvista.dmp.datasource.iqiyi

import java.net.URI
import java.util
import java.util.concurrent.{Callable, CopyOnWriteArrayList, ExecutionException, ExecutorService, Executors, Future, LinkedBlockingDeque, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

import com.alibaba.fastjson.{JSONArray, JSONObject}
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, ListeningExecutorService, MoreExecutors, ThreadFactoryBuilder}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.SparkEnv
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import java.net.InetAddress

class EtlIQiYiActivitionDaily extends CommonSparkJob with Serializable{
  /**
    * Declares the command-line options understood by this job.
    *
    * Every option takes a value and carries the description "[must] <name>".
    *
    * @return the populated option set: outputdaily, today and last_req_day
    */
  override protected def buildOptions(): Options = {
    val opts = new Options
    // Register the options in the same order as before; each one expects an argument.
    Seq("outputdaily", "today", "last_req_day").foreach { name =>
      opts.addOption(name, true, s"[must] $name")
    }
    opts
  }



  //  def buildRes(iterator: Iterator[Row]): Iterator[ArrayBuffer[DeviceIdStatus]] = {
  //    println("executorId=="+SparkEnv.get.executorId)
  //    println("threadname=="+Thread.currentThread().getName())
  //    var ip: String = null
  //    var host: String = null
  //    val ia: InetAddress = InetAddress.getLocalHost
  //    host = ia.getHostName // get the machine's host name
  //    ip = ia.getHostAddress // get the IP address
  //    println("host=="+host)
  //    println("ip=="+ip)
  //
  //    // val poolExecutor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque[Runnable](500),
  //    //   new CustomizableThreadFactory("AliRequest"), new ThreadPoolExecutor.CallerRunsPolicy())
  //    // val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)
  //    // val newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor)
  //    // MoreExecutors.addDelayedShutdownHook(poolExecutor, 2, TimeUnit.SECONDS)
  //
  //    val threadFactory: ThreadFactory = new ThreadFactoryBuilder().setNameFormat("EtlIQiYiActivitionDaily-async-pool-%d").build
  //    val threadPoolExecutor = new ThreadPoolExecutor(2, 2, 2, TimeUnit.MINUTES, new LinkedBlockingQueue[Runnable](), threadFactory)
  //    val listeningExecutor: ListeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor)
  //    val newPoolExecutor: ExecutorService = MoreExecutors.getExitingExecutorService(threadPoolExecutor)
  //    MoreExecutors.addDelayedShutdownHook(threadPoolExecutor, 2, TimeUnit.SECONDS)
  //
  //
  //    val res = new CopyOnWriteArrayList[ArrayBuffer[DeviceIdStatus]]()
  //    val futures = new CopyOnWriteArrayList[ListenableFuture[ArrayBuffer[DeviceIdStatus]]]()
  //    var tmpresult = ArrayBuffer[String]()
  //    while(iterator.hasNext) {
  //      if (tmpresult.length < 50) {
  //        val device_id_md5= iterator.next().getAs[String]("device_id_md5")
  //        tmpresult +=device_id_md5
  //      }
  //      if (tmpresult.length == 50) {
  //        val JoinDeviceId: String = tmpresult.mkString(",")
  //        val listenableFuture: ListenableFuture[ArrayBuffer[DeviceIdStatus]] = listeningExecutor.submit(new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //          override def call(): ArrayBuffer[DeviceIdStatus] = {
  //            var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //            val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //            //    println("22jsonObject22="+jsonObject)
  //            val data: JSONArray = jsonObject.getJSONArray("data")
  //            import scala.collection.JavaConversions._
  //            for (element <- data) {
  //              val element1: JSONObject = element.asInstanceOf[JSONObject]
  //              val deviceId: String = element1.getString("id")
  //              val status: String = element1.getString("status")
  //              tmpres += DeviceIdStatus(deviceId, status)
  //            }
  //            //            println("res3333==="+tmpres)
  //            tmpres
  //          }
  //        })
  //
  //        Futures.addCallback(listenableFuture, new FutureCallback[ArrayBuffer[DeviceIdStatus]] {
  //          override def onSuccess(result: ArrayBuffer[DeviceIdStatus]): Unit = {
  //            //    println("res4444==="+res)
  //            res.add(result)
  //            println("success=")
  //          }
  //          override def onFailure(t: Throwable): Unit = {
  //            println("t.getMessage==="+t.getMessage)
  //            println("fail=")}
  //        }, newPoolExecutor)
  //        futures.add(listenableFuture)
  //        tmpresult.clear()
  //      }
  //      if (!iterator.hasNext) {
  //        if (tmpresult.nonEmpty) {
  //          println("tmpresult.length=="+tmpresult.length)
  //          val JoinDeviceId: String = tmpresult.mkString(",")
  //          val listenableFuture: ListenableFuture[ArrayBuffer[DeviceIdStatus]] = listeningExecutor.submit(new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //            override def call(): ArrayBuffer[DeviceIdStatus] = {
  //              var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //              val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //              val data: JSONArray = jsonObject.getJSONArray("data")
  //              import scala.collection.JavaConversions._
  //              for (element <- data) {
  //                val element1: JSONObject = element.asInstanceOf[JSONObject]
  //                val deviceId: String = element1.getString("id")
  //                val status: String = element1.getString("status")
  //                tmpres += DeviceIdStatus(deviceId, status)
  //              }
  //              tmpres
  //            }
  //          })
  //
  //          Futures.addCallback(listenableFuture, new FutureCallback[ArrayBuffer[DeviceIdStatus]] {
  //            override def onSuccess(result: ArrayBuffer[DeviceIdStatus]): Unit = {
  //              println("onSuccess=")
  //              res.add(result)}
  //
  //            override def onFailure(t: Throwable): Unit = {
  //              //          println("t.getMessage###"+t.getMessage)
  //              println("onFailure=")}
  //          }, newPoolExecutor)
  //          futures.add(listenableFuture)
  //          tmpresult.clear()
  //        }
  //      }
  //    }
  //    val allAsList = Futures.successfulAsList(futures)
  //    allAsList.get()
  //    //    println("res5555==="+res)
  //    res.iterator().asScala
  //  }

  //  def buildRes(iterator: Iterator[Row]): Iterator[ArrayBuffer[DeviceIdStatus]] = {
  //    println("executorId=="+SparkEnv.get.executorId)
  //    println("threadname=="+Thread.currentThread().getName())
  //    var ip: String = null
  //    var host: String = null
  //    val ia: InetAddress = InetAddress.getLocalHost
  //    host = ia.getHostName // get the machine's host name
  //    ip = ia.getHostAddress // get the IP address
  //    println("host=="+host)
  //    println("ip=="+ip)
  //
  //    val tasks: util.List[Callable[ArrayBuffer[DeviceIdStatus]]] = new util.ArrayList[Callable[ArrayBuffer[DeviceIdStatus]]]
  //    var tmpresult = ArrayBuffer[String]()
  //    while(iterator.hasNext) {
  //      if (tmpresult.length < 50) {
  //        val device_id_md5= iterator.next().getAs[String]("device_id_md5")
  //        tmpresult +=device_id_md5
  //      }
  //      if (tmpresult.length == 50) {
  //        val JoinDeviceId: String = tmpresult.mkString(",")
  //        val callable: Callable[ArrayBuffer[DeviceIdStatus]] = new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //          @throws[Exception]
  //          override def call: ArrayBuffer[DeviceIdStatus] = {
  //            var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //            val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //            val data: JSONArray = jsonObject.getJSONArray("data")
  //            import scala.collection.JavaConversions._
  //            for (element <- data) {
  //              val element1: JSONObject = element.asInstanceOf[JSONObject]
  //              val deviceId: String = element1.getString("id")
  //              val status: String = element1.getString("status")
  //              tmpres += DeviceIdStatus(deviceId, status)
  //            }
  //            tmpres
  //          }
  //        }
  //        tasks.add(callable)
  //        tmpresult.clear()
  //      }
  //      if (!iterator.hasNext) {
  //        if (tmpresult.nonEmpty) {
  //          println("tmpresult.length=="+tmpresult.length)
  //          val JoinDeviceId: String = tmpresult.mkString(",")
  //          val callable: Callable[ArrayBuffer[DeviceIdStatus]] = new Callable[ArrayBuffer[DeviceIdStatus]]() {
  //            @throws[Exception]
  //            override def call: ArrayBuffer[DeviceIdStatus] = {
  //              var tmpres: ArrayBuffer[DeviceIdStatus] = ArrayBuffer[DeviceIdStatus]()
  //              val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(JoinDeviceId,"1","1")
  //              val data: JSONArray = jsonObject.getJSONArray("data")
  //              import scala.collection.JavaConversions._
  //              for (element <- data) {
  //                val element1: JSONObject = element.asInstanceOf[JSONObject]
  //                val deviceId: String = element1.getString("id")
  //                val status: String = element1.getString("status")
  //                tmpres += DeviceIdStatus(deviceId, status)
  //              }
  //              tmpres
  //            }
  //          }
  //          tasks.add(callable)
  //          tmpresult.clear()
  //        }
  //      }
  //    }
  //    val executorService: ExecutorService = Executors.newFixedThreadPool(5)
  //    var futures: util.List[Future[ArrayBuffer[DeviceIdStatus]]] = null
  //
  //    try futures = executorService.invokeAll(tasks)
  //    catch {
  //      case e: InterruptedException =>
  //        e.printStackTrace()
  //    }
  //
  //    val res = new CopyOnWriteArrayList[ArrayBuffer[DeviceIdStatus]]()
  //    import scala.collection.JavaConversions._
  //    for (future <- futures) {
  //      try {val value: ArrayBuffer[DeviceIdStatus] = future.get
  //        res.add(value)}
  //      catch {
  //        case e: InterruptedException =>
  //          e.printStackTrace()
  //        case e: ExecutionException =>
  //          e.printStackTrace()
  //      }
  //    }
  //    executorService.shutdown
  //    //    println("res5555==="+res)
  //    res.iterator().asScala
  //  }

  /**
    * Processes one partition of rows: reads the `device_id_md5` column, groups the
    * ids into batches of 50, and issues one `IQiYiLaHuo.requestHttpQps` call per
    * batch on a 2-thread pool.
    *
    * Each successful call contributes the `toString` of the returned JSON object to
    * the result. A batch whose request throws is logged and skipped, so the result
    * may contain fewer entries than there were batches.
    *
    * @param iterator rows of the partition; each row must expose a `device_id_md5` column
    * @return the response strings of the batches that completed successfully,
    *         in batch submission order
    */
  def buildRes(iterator: Iterator[Row]): Iterator[String] = {
    // Diagnostic output identifying where this partition is being processed.
    println("executorId=="+SparkEnv.get.executorId)
    println("threadname=="+Thread.currentThread().getName())
    val ia: InetAddress = InetAddress.getLocalHost
    println("host=="+ia.getHostName)
    println("ip=="+ia.getHostAddress)

    // Number of device ids sent in a single request.
    val batchSize = 50

    val tasks: util.List[Callable[String]] = new util.ArrayList[Callable[String]]
    iterator.map(_.getAs[String]("device_id_md5")).grouped(batchSize).foreach { batch =>
      // Only the final batch can be short; keep logging its size as before.
      if (batch.length < batchSize) println("tmpresult.length=="+batch.length)
      val joinedDeviceIds: String = batch.mkString(",")
      tasks.add(new Callable[String] {
        override def call(): String = {
          // NOTE(review): the meaning of the two "1" arguments is defined by IQiYiLaHuo — confirm there.
          val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(joinedDeviceIds, "1", "1")
          jsonObject.toString
        }
      })
    }

    val executorService: ExecutorService = Executors.newFixedThreadPool(2)
    val res = ArrayBuffer[String]()
    try {
      // invokeAll blocks until every task has completed (normally or exceptionally).
      val futures: util.List[Future[String]] = executorService.invokeAll(tasks)
      for (future <- futures.asScala) {
        try res += future.get
        catch {
          case e: InterruptedException =>
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt()
            e.printStackTrace()
          case e: ExecutionException =>
            println("fail===")
            e.printStackTrace()
        }
      }
    } catch {
      case e: InterruptedException =>
        // Interrupted while waiting for the batch requests: previously this left the
        // future list null and crashed with a NullPointerException. Now nothing is
        // collected and the interrupt flag is restored.
        Thread.currentThread().interrupt()
        e.printStackTrace()
    } finally {
      // Always release the pool threads, even when something above throws.
      executorService.shutdown()
    }
    res.iterator
  }


  /**
    * Job entry point.
    *
    * Steps:
    *  1. Parses and validates the command-line options (`outputdaily`, `today`, `last_req_day`).
    *  2. Recursively deletes the `outputdaily` path.
    *  3. Queries `dwh.ods_dmp_user_info` for partition `dt = today`, keeping rows with
    *     device type `imei`/`imeimd5`, `last_req_day >= last_req_day` and country `CN`,
    *     and retaining one row per `device_id_md5` (at most 10,000,000 rows).
    *  4. Repartitions to 30 partitions, runs [[buildRes]] on each partition and writes
    *     the resulting strings as text files to `outputdaily`.
    *
    * @param args raw command-line arguments
    * @return -1 when the required-option check fails, 0 otherwise
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val outputdaily = commandLine.getOptionValue("outputdaily")
    val today = commandLine.getOptionValue("today")
    val last_req_day = commandLine.getOptionValue("last_req_day")

    // NOTE(review): the app name says "Ali" although this is the iQiYi job; left unchanged
    // because external monitoring may key on it — confirm and rename if safe.
    val spark = SparkSession.builder()
      .appName("EtlAliActivitionDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Clear any previous output first. This now happens inside the try block so the
      // session is stopped even when the delete fails.
      FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputdaily), true)

      val sql1 =
        s"""
           |select XX.device_id_md5,XX.device_id,XX.device_type
           |FROM (select X.device_id_md5,X.device_id,X.device_type,
           |row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
           |from ( select device_id,device_type,
           |case when device_type = 'imei' then MD5(device_id) when device_type = 'imeimd5' then device_id end as device_id_md5
           |from dwh.ods_dmp_user_info where dt ='${today}'
           | and device_type in ('imei','imeimd5')
           | and last_req_day >='${last_req_day}'
           | and  upper(country) = 'CN'  ) X ) XX
           | WHERE XX.rk= 1   limit 10000000
        """.stripMargin

      // One buildRes call per partition; each call batches the ids and performs the HTTP requests.
      spark.sql(sql1).repartition(30).rdd.mapPartitions(rows => { buildRes(rows)}).
        coalesce(30).saveAsTextFile(outputdaily)

    } finally {
      spark.stop()
    }
    0
  }
}

/** Command-line launcher for the [[EtlIQiYiActivitionDaily]] job. */
object EtlIQiYiActivitionDaily {
  /**
    * Creates the job and runs it with the given arguments.
    * The integer status returned by `run` is discarded.
    *
    * @param args raw command-line arguments forwarded to the job
    */
  def main(args: Array[String]): Unit = {
    val job = new EtlIQiYiActivitionDaily
    job.run(args)
  }
}