package mobvista.dmp.datasource.iqiyi

import java.net.URI
import java.util.concurrent._

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.iqiyi.Constant.IQiYiEntity
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

/**
 * Daily Spark job: reads device ids from ORC, queries DMPServer for each device's
 * tags and writes one IQiYiEntity per device as zlib-compressed ORC.
 *
 * @package: mobvista.dmp.datasource.iqiyi
 * @author: wangjf
 * @date: 2020/4/29
 * @time: 11:52 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class IQiYiDailyJob extends CommonSparkJob with Serializable {

  /**
   * Job entry point.
   *
   * Options: `input` (ORC path with a `device_id` column), `coalesce` (partition count for
   * the lookup stage; the output is written with `coalesce / 10` partitions, at least 1) and
   * `output` (ORC path, deleted first and then overwritten).
   *
   * @return 0 on success, -1 when a required option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val output = commandLine.getOptionValue("output")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"IQiYiDailyJob")
    val sc = spark.sparkContext

    import spark.implicits._

    try {
      // Resolve the file system from the output path itself: a file system bound to one
      // fixed bucket rejects paths that live in any other bucket ("Wrong FS").
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      // Every input device is treated as an upper-cased iOS IDFA. Rows without a device_id
      // are skipped instead of failing the task with a NullPointerException.
      val devices = spark.read.orc(input).rdd
        .filter(r => r.getAs[String]("device_id") != null)
        .map(r => Row(r.getAs[String]("device_id").toUpperCase(java.util.Locale.ROOT), "idfa", "ios"))
        .repartition(coalesce)

      // createDataFrame attaches `schema`, which getRequestFeature relies on for by-name access.
      spark.createDataFrame(devices, schema).rdd
        .mapPartitions(rows => getRequestFeature(rows))
        // repartition(0) is rejected by Spark, so keep at least one partition for small coalesce.
        .repartition(math.max(1, coalesce / 10))
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      // SparkSession.stop also stops the underlying SparkContext.
      spark.stop()
    }
    0
  }

  /** Row schema of the lookup stage: device_id, device_type and platform, all strings. */
  def schema: StructType =
    StructType(
      StructField("device_id", StringType) ::
        StructField("device_type", StringType) ::
        StructField("platform", StringType) :: Nil)

  /** Declares the three required command-line options: input, coalesce and output. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options
  }

  import scala.collection.JavaConverters._

  /**
   * Looks up tags for every row of one partition concurrently and returns one entity per
   * row, in input order.
   *
   * Lookups run on a partition-local pool of up to 1000 threads with a 5000-task queue.
   * When the queue is full, the submitting thread runs the lookup itself. A row whose lookup
   * throws gets an entity with empty tags. The pool is shut down before returning, so its
   * threads do not outlive the partition.
   *
   * @param iterator rows carrying the string fields device_id, device_type and platform
   * @return one IQiYiEntity per input row
   */
  def getRequestFeature(iterator: Iterator[Row]): Iterator[IQiYiEntity] = {
    val threadFactory = new CustomizableThreadFactory("IQiYi")
    // Daemon threads, so a pool can never keep the executor JVM alive.
    threadFactory.setDaemon(true)
    val poolExecutor = new ThreadPoolExecutor(1000, 1000, 500, TimeUnit.MILLISECONDS,
      new LinkedBlockingDeque[Runnable](5000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy())
    try {
      // toVector forces every submission before the first get(), so lookups run concurrently.
      val pending = iterator.map { row =>
        val deviceId = row.getAs[String]("device_id")
        val deviceType = row.getAs[String]("device_type")
        val platform = row.getAs[String]("platform")
        val future = poolExecutor.submit(new Callable[IQiYiEntity] {
          override def call(): IQiYiEntity = requestEntity(deviceId, deviceType, platform)
        })
        (future, IQiYiEntity(deviceId, deviceType, platform, "", "CN"))
      }.toVector

      // Read each result straight from its future. Unlike an asynchronous completion
      // callback, get() cannot return before the value exists, so no row is lost.
      pending.map { case (future, fallback) =>
        try future.get() catch {
          case _: ExecutionException => fallback
        }
      }.iterator
    } finally {
      poolExecutor.shutdownNow()
    }
  }

  /**
   * Queries DMPServer for a single device.
   *
   * When the response status is 0, tags are the comma-joined keys of the `data` object whose
   * value is numerically 1. Any other status, or a null `data` object, yields empty tags.
   * Exceptions from the request propagate to the caller.
   */
  private def requestEntity(deviceId: String, deviceType: String, platform: String): IQiYiEntity = {
    val result = DMPServer.request(deviceId)
    val tags =
      if (result.getInteger("status") == 0) {
        val data = result.getJSONObject("data")
        if (data == null) {
          ""
        } else {
          // Read-only filtering: the response map is not mutated, and non-numeric values are
          // skipped instead of failing the whole lookup with a ClassCastException.
          data.asInstanceOf[java.util.Map[String, AnyRef]].asScala.collect {
            case (key, value: Number) if value.doubleValue == 1 => key
          }.mkString(",")
        }
      } else {
        ""
      }
    IQiYiEntity(deviceId, deviceType, platform, tags, "CN")
  }
}

object IQiYiDailyJob {
  /** Runs the job and exits with its status when it reports failure (e.g. missing options). */
  def main(args: Array[String]): Unit = {
    val exitCode = new IQiYiDailyJob().run(args)
    if (exitCode != 0) sys.exit(exitCode)
  }
}