package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainDeal extends CommonSparkJob with Serializable {

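  /**
   * Supported CLI options:
   *   -time       processing hour, passed as "yyyy-MM-dd.HH" (the '.' is replaced with a space)
   *   -input      ORC input path for the hour
   *   -output     ORC output path for the merged audience snapshot
   *   -coalesce   number of output partitions
   *   -partition  parallelism used when reading the input and querying Cassandra
   */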
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("partition", true, "partition")
    options
  }

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val time = commandLine.getOptionValue("time").replace(".", " ")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val partition = commandLine.getOptionValue("partition")

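    //  Spark session with spark-cassandra-connector settings; the Cassandra host is
    //  read from ip.properties (key "aws_host")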
    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainDeal.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "15")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "12")
      .config("spark.cassandra.input.fetch.sizeInRows", "4096")
      .config("spark.cassandra.concurrent.reads", "4096")
      .config("spark.cassandra.output.concurrent.writes", "32")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.output.batch.size.bytes", "2048")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .getOrCreate()
    val sc = spark.sparkContext

    try {

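      //  Records older than 7 days (relative to the processing hour) are treated as expired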
      val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)

      val keyspace = "rtdmp"
      val tableName = "audience_info"
      val columns = SomeColumns("devid", "audience_data", "update_time")
      val cassandraConnector = CassandraConnector(sc.getConf)

      //  By default, compute the previous hour's data
      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
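      //  Fixed epoch-second bounds for the audience data_utime filter, effectively an
      //  open-ended window (roughly 2020-01-01 up to the year 2099)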
      val audience_date_utime_start = 1577811600L
      val audience_date_utime_end = 4100731200L

      //  val update_time_start = "2000-01-01 00:00:00"
      //  val update_time_end = "2099-12-31 23:59:59"
      //  val audience_date_utime_start = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 28800
      //  val audience_date_utime_end = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 25200

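      //  Query the audience service for audience ids updated within this hour and keep
      //  only entries whose second field equals 1 (assumed to be an "active" status flag)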
      val set = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
        .retain((_, v) => v._2 == 1)
        .keySet

      println("audienceIds -->> " + set.mkString(","))

      import spark.implicits._

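      //  For each input partition, fetch the existing audience info from Cassandra
      //  (Logic.getResultFeature), then build the updated records with
      //  CustomIteratorAudienceInfoV2 using the processing hour, the expire time and
      //  the matched audience ids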
      val df = spark.read.orc(input)
        .repartition(partition.toInt)
        .mapPartitions(it => cassandraConnector.withSessionDo(session => {
          Logic.getResultFeature(session, it)
        }))
        .mapPartitions(new CustomIteratorAudienceInfoV2(_, time, expire_time, set))
        .rdd

      df.persist(StorageLevel.MEMORY_AND_DISK_SER)

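      //  Remove any previous output for this hour before writing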
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  df.mapPartitions(Logic.writeResultV2(cassandraConnector, _))

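      //  Persist the snapshot to S3 as zlib-compressed ORC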
      df.repartition(coalesce.toInt)
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      //  update audience_info
      df.saveToCassandra(keyspace, tableName, columns)

    } finally {
      //  spark.stop() also shuts down the underlying SparkContext
      spark.stop()
    }
    0
  }
}

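/*
 * Entry point. Illustrative submission only (jar name, paths and values are placeholders):
 *
 *   spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainDeal dmp.jar \
 *     -time 2020-07-13.11 \
 *     -input s3://mob-emr-test/.../input \
 *     -output s3://mob-emr-test/.../output \
 *     -coalesce 100 \
 *     -partition 1000
 */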
object RTDmpMainDeal {
  def main(args: Array[String]): Unit = {
    new RTDmpMainDeal().run(args)
  }
}