package mobvista.dmp.datasource.iqiyi

import java.net.URI
import java.util
import java.util.concurrent.{Callable, CopyOnWriteArrayList, ExecutionException, ExecutorService, Executors, Future, LinkedBlockingDeque, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

import com.alibaba.fastjson.{JSONArray, JSONObject}
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, ListeningExecutorService, MoreExecutors, ThreadFactoryBuilder}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.SparkEnv
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import java.net.InetAddress

/**
  * Spark job that selects one row per `device_id_md5` from
  * `dwh.ods_dmp_user_info`, sends the ids in comma-joined batches to
  * `IQiYiLaHuo.requestHttpQps`, and writes each raw JSON response as one line
  * of text under the `outputdaily` path.
  */
class EtlIQiYiActivitionDaily extends CommonSparkJob with Serializable {

  /**
    * Declares the command-line options this job reads.
    *
    * @return options `outputdaily`, `today` and `last_req_day`, each taking a value
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputdaily", true, "[must] outputdaily")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options
  }

  // Two earlier, fully commented-out variants of buildRes used to live here
  // (one built on Guava ListenableFuture callbacks, one on invokeAll that parsed
  // each response into DeviceIdStatus records). They were dead code and have
  // been dropped; the active implementation below returns the raw JSON strings.

  /**
    * Builds the task that performs one batched request.
    *
    * @param joinedDeviceIds comma-separated device id MD5 values for one batch
    * @return a callable yielding the response JSON rendered as a string
    */
  private def requestTask(joinedDeviceIds: String): Callable[String] =
    new Callable[String] {
      @throws[Exception]
      override def call: String = {
        // NOTE(review): the meaning of the two "1" arguments is defined by
        // IQiYiLaHuo.requestHttpQps, which is not visible here — confirm.
        val jsonObject: JSONObject = IQiYiLaHuo.requestHttpQps(joinedDeviceIds, "1", "1")
        jsonObject.toString
      }
    }

  /**
    * Processes one partition: groups the `device_id_md5` column into batches,
    * issues one request per batch on a small fixed thread pool, and returns the
    * responses of the batches that succeeded.
    *
    * Failed batches are logged and skipped (best effort). If the calling thread
    * is interrupted while waiting, the interrupt flag is restored and the
    * responses collected so far (possibly none) are returned.
    *
    * @param iterator rows of the partition; each must expose a string column `device_id_md5`
    * @return one JSON string per successfully answered batch
    */
  def buildRes(iterator: Iterator[Row]): Iterator[String] = {
    // Diagnostic output identifying where this partition is being processed.
    println("executorId==" + SparkEnv.get.executorId)
    println("threadname==" + Thread.currentThread().getName())
    val ia: InetAddress = InetAddress.getLocalHost
    val host: String = ia.getHostName // host name of this machine
    val ip: String = ia.getHostAddress // IP address of this machine
    println("host==" + host)
    println("ip==" + ip)

    val batchSize = EtlIQiYiActivitionDaily.BatchSize
    val tasks: util.List[Callable[String]] = new util.ArrayList[Callable[String]]
    iterator
      .map(_.getAs[String]("device_id_md5"))
      .grouped(batchSize)
      .foreach { batch =>
        // Only the trailing batch can be short; log its size as before.
        if (batch.size < batchSize) {
          println("tmpresult.length==" + batch.size)
        }
        tasks.add(requestTask(batch.mkString(",")))
      }

    if (tasks.isEmpty) {
      // Nothing to request for an empty partition; avoid spinning up threads.
      Iterator.empty
    } else {
      val executorService: ExecutorService =
        Executors.newFixedThreadPool(EtlIQiYiActivitionDaily.RequestThreads)
      try {
        val futures: scala.collection.Seq[Future[String]] =
          try {
            executorService.invokeAll(tasks).asScala
          } catch {
            case e: InterruptedException =>
              e.printStackTrace()
              // Preserve the interrupt for the caller instead of swallowing it,
              // and fall back to "no results" rather than dereferencing null.
              Thread.currentThread().interrupt()
              Seq.empty[Future[String]]
          }

        val res = ArrayBuffer[String]()
        futures.foreach { future =>
          try {
            res += future.get
          } catch {
            case e: InterruptedException =>
              e.printStackTrace()
              Thread.currentThread().interrupt()
            case e: ExecutionException =>
              // The batch's request failed; skip it and keep the others.
              println("fail===")
              e.printStackTrace()
          }
        }
        res.iterator
      } finally {
        // Always release the pool threads, even when collecting results fails.
        executorService.shutdown()
      }
    }
  }

  /**
    * Job entry point: parses the arguments, clears the output path, runs the
    * selection query and writes the batched responses as text.
    *
    * @param args command-line arguments carrying `outputdaily`, `today` and `last_req_day`
    * @return -1 when the required options are missing, 0 otherwise
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val outputdaily = commandLine.getOptionValue("outputdaily")
      val today = commandLine.getOptionValue("today")
      val last_req_day = commandLine.getOptionValue("last_req_day")

      // NOTE(review): the app name says "Ali" although this is the IQiYi job;
      // looks like a copy-paste leftover — left unchanged, confirm before renaming.
      val spark = SparkSession.builder()
        .appName("EtlAliActivitionDaily")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      import spark.implicits._

      try {
        // Remove any previous output first. This sits inside the try so the
        // session is still stopped if the delete fails.
        FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
          .delete(new Path(outputdaily), true)

        // One row per device_id_md5; rows are ranked by device_type ascending
        // within each device_id_md5 and only rank 1 is kept.
        val sql1 =
          s"""
             |select XX.device_id_md5,XX.device_id,XX.device_type
             |FROM (select X.device_id_md5,X.device_id,X.device_type,
             |row_number() over(partition by X.device_id_md5 order by X.device_type asc) rk
             |from ( select device_id,device_type,
             |case when device_type = 'imei' then MD5(device_id) when device_type = 'imeimd5' then device_id end as device_id_md5
             |from dwh.ods_dmp_user_info where dt ='${today}'
             | and device_type in ('imei','imeimd5')
             | and last_req_day >='${last_req_day}'
             | and upper(country) = 'CN' ) X ) XX
             | WHERE XX.rk= 1 limit 10000000
        """.stripMargin

        spark.sql(sql1)
          .repartition(30)
          .rdd
          .mapPartitions(rows => buildRes(rows))
          .coalesce(30)
          .saveAsTextFile(outputdaily)
      } finally {
        spark.stop()
      }
      0
    }
  }
}

object EtlIQiYiActivitionDaily {

  /** Number of device ids joined into a single request. */
  private val BatchSize = 50

  /** Number of threads issuing requests concurrently per partition. */
  private val RequestThreads = 2

  /** Launches the job with the given command-line arguments. */
  def main(args: Array[String]): Unit = {
    new EtlIQiYiActivitionDaily().run(args)
  }
}