package mobvista.dmp.datasource.iqiyi

import java.net.URI
import java.util.concurrent._

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.iqiyi.Constant.IQiYiEntity
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

/**
  * @package: mobvista.dmp.datasource.iqiyi
  * @author: wangjf
  * @date: 2020/4/29
  * @time: 11:52 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class IQiYiDailyJob extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
    val output = commandLine.getOptionValue("output")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("IQiYiDailyJob")
    val sc = spark.sparkContext

    import spark.implicits._
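    // Clear any previous output so reruns of the daily job start from a clean path.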
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
    try {
      /*
      val df = spark.read.orc(input).where("device_type IN ('idfa','imei')")
        .repartition(coalesce)
      */
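      // Every input record is treated as an iOS IDFA: uppercase the id and attach
      // the fixed device_type/platform columns that `schema` expects.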
      val rowRdd = spark.read.orc(input).rdd.map(r => {
        Row(r.getAs[String]("device_id").toUpperCase, "idfa", "ios")
      }).repartition(coalesce)

      // Query the IQiYi DMP service for each device within the partition, then
      // write the tagged results as zlib-compressed ORC.
      spark.createDataFrame(rowRdd, schema).rdd
        .mapPartitions(getRequestFeature)
        .repartition(math.max(1, coalesce / 10)) // guard: keep the partition count positive when coalesce < 10
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      sc.stop()
      spark.stop()
    }
    0
  }

  def schema: StructType = {
    StructType(StructField("device_id", StringType) ::
      StructField("device_type", StringType) ::
      StructField("platform", StringType) ::
      Nil)
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options
  }

  import scala.collection.JavaConverters._

  def getRequestFeature(iterator: Iterator[Row]): Iterator[IQiYiEntity] = {
    val res = new CopyOnWriteArrayList[IQiYiEntity]()
    val futures = new CopyOnWriteArrayList[ListenableFuture[IQiYiEntity]]()
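    // Per-partition pool: up to 1000 concurrent requests, a bounded queue of 5000
    // tasks, and CallerRunsPolicy so the Spark task thread provides backpressure
    // instead of rejecting work when the queue fills up.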
    val poolExecutor = new ThreadPoolExecutor(1000, 1000, 500, TimeUnit.MILLISECONDS, new LinkedBlockingDeque[Runnable](5000),
      new CustomizableThreadFactory("IQiYi"), new ThreadPoolExecutor.CallerRunsPolicy())
    val listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor)
    val newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor)
    MoreExecutors.addDelayedShutdownHook(listeningExecutor, 2, TimeUnit.SECONDS)
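    // Submit one asynchronous DMP lookup per device; callbacks collect the results.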
    iterator.foreach(row => {
      try {
        val deviceId = row.getAs[String]("device_id")
        val deviceType = row.getAs[String]("device_type")
        val platform = row.getAs[String]("platform")
        val listenableFuture = listeningExecutor.submit(new Callable[IQiYiEntity]() {
          @throws[Exception]
          override def call: IQiYiEntity = {
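            // DMPServer.request is expected to return a fastjson-style JSON object
            // in which status == 0 signals a successful lookup.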
            val result = DMPServer.request(deviceId)
            if (result.getInteger("status") == 0) {
              val data = result.getJSONObject("data")
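              // Keep only the tags the service flagged with value 1.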
              val tags = data.asInstanceOf[java.util.Map[String, Int]].asScala
                .retain((_, v) => v == 1).keys.mkString(",")
              IQiYiEntity(deviceId, deviceType, platform, tags, "CN")
            } else {
              IQiYiEntity(deviceId, deviceType, platform, "", "CN")
            }
          }
        })
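        // On success record the entity; on failure fall back to an empty-tag entry
        // so the device is never dropped from the output.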
        Futures.addCallback(listenableFuture, new FutureCallback[IQiYiEntity]() {
          override def onSuccess(result: IQiYiEntity): Unit = {
            res.add(result)
          }

          override def onFailure(t: Throwable): Unit = {
            res.add(IQiYiEntity(deviceId, deviceType, platform, "", "CN"))
          }
        }, newPoolExecutor)
        futures.add(listenableFuture)
      } catch {
        case _: Exception =>
          res.add(IQiYiEntity(row.getAs[String]("device_id"), row.getAs[String]("device_type"),
            row.getAs[String]("platform"), "", "CN"))
      }
    })
    val allAsList = Futures.successfulAsList(futures)
    allAsList.get()
    // Drain queued callbacks so `res` is fully populated before the iterator is
    // returned, and free the partition's threads instead of leaking them until JVM exit.
    listeningExecutor.shutdown()
    listeningExecutor.awaitTermination(10, TimeUnit.MINUTES)
    res.iterator().asScala
  }
}

object IQiYiDailyJob {
  def main(args: Array[String]): Unit = {
    new IQiYiDailyJob().run(args)
  }
}
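
// A minimal sketch of how the job might be launched; the jar name, paths, and
// coalesce value below are hypothetical and only illustrate the expected options:
//
//   spark-submit --class mobvista.dmp.datasource.iqiyi.IQiYiDailyJob dmp.jar \
//     -input s3://mob-emr-test/path/to/daily/input \
//     -output s3://mob-emr-test/path/to/daily/output \
//     -coalesce 1000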