package mobvista.dmp.datasource.rtdmp

import java.net.URI
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.rtdmp.Constant.NewAudienceInfo
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer


/**
 * Hourly RTDmp job: joins the hour's device list against the Cassandra
 * audience_info table and writes the refreshed audience data back to both
 * S3 (ORC) and Cassandra.
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainSpe extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("flag", true, "flag")
    options
  }
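
  // Example invocation (illustrative only; the jar name and S3 paths below
  // are assumptions, not taken from this repo's deployment scripts):
  //
  //   spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainSpe dmp.jar \
  //     -time 2020-07-13.11 -input <s3 input path> -output <s3 output path> \
  //     -coalesce 100 -flag 1
  //
  // Note the '.' between date and hour in "time"; run() converts it back to
  // the "yyyy-MM-dd HH" form the job uses internally.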

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val time = commandLine.getOptionValue("time").replace(".", " ")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    //  flag == 1 runs the full Cassandra merge; any other value only clears
    //  the output path and writes an empty ORC dataset (else branch below)
    val flag = commandLine.getOptionValue("flag").toInt

    val system = "rtdmp"
    val region = "vg"
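
    // The Cassandra connector settings below favor throughput over strict
    // consistency: LOCAL_ONE reads and writes, LZ4 transport compression,
    // large per-executor connection pools, and small write batches. The
    // connection factory class name is assembled from $system/$region, and
    // the contact point is resolved from ip.properties.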
    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainSpe.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", s"$system.$region.host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "64")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "32")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .config("spark.cassandra.auth.username", "U&6zBV$*wBuYUpJRq$hp")
      .config("spark.cassandra.auth.password", "Z8tzjTMBe^M2#hat$nAJ")
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      import spark.implicits._

      if (flag == 1) {
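        // Cutoff for stale entries: 7 days before the run hour; it is passed
        // to CustomMapPartition below, which presumably drops expired records.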
        val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)

        val keyspace = "rtdmp"
        val tableName = "audience_info"
        val columns = SomeColumns("devid", "audience_data", "update_time")

        //  The implicit ReadConf is wrapped in an object so its scope is
        //  limited to the cassandraTable scan below. Positional arguments:
        //  split count, split size (MB), fetch size in rows, read consistency
        //  level, task metrics enabled
        object ReadConfigurationOne {
          implicit val readConf: ReadConf = ReadConf(Option(10000), 5, 2048, ConsistencyLevel.LOCAL_ONE, true)
        }
        import ReadConfigurationOne._

        //  Full scan of audience_info into a DataFrame keyed by devid, later
        //  joined against this hour's devices from `input`
        val cassandraDF = sc.cassandraTable(keyspace, tableName)
          .mapPartitions(irs => {
            val res = new ArrayBuffer[(String, String, String)]()
            irs.foreach(r => {
              res += ((r.getString("devid"), r.getString("audience_data"), r.getString("update_time")))
            })
            res.iterator
          }).toDF("devid", "audience_data", "update_time")

        val selectDF = spark.read.orc(input)

        //  Only update the previous hour's data
        val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
        val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
        //  28800 s = 8 h; presumably shifts local UTC+8 timestamps to UTC epoch seconds
        val audience_date_utime_start = DateUtil.parse(time + ":00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
        val audience_date_utime_end = DateUtil.parse(time + ":59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800

        //  Ask the RTDmp service for the audience ids updated inside this
        //  hour's window (the trailing 0, 0, 2 arguments are internal to
        //  ServerUtil.request)
        val update_ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
          .asScala.keySet

        //  Left join keeps every device from this hour's input even when it
        //  has no existing audience_data row in Cassandra
        val df = selectDF.join(cassandraDF, Seq("devid"), "leftouter")
          .select("devid", "audience_ids", "audience_data", "device_type")
          .rdd
          .mapPartitions(v => new CustomMapPartition(v, update_time = time, expire_time, update_ids))

        //  df feeds two sinks (ORC on S3 and Cassandra); cache it so the join
        //  is not recomputed for the second action
        df.persist(StorageLevel.MEMORY_AND_DISK_SER)

        FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

        df.repartition(coalesce.toInt)
          .toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)

        //  Write the processed rows back to audience_info so the next hourly
        //  run reads the refreshed audience_data
        df.saveToCassandra(keyspace, tableName, columns)
      } else {
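        // Presumably this keeps downstream consumers working on off hours:
        // the output path is reset and a valid (empty) ORC dataset is written.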
        FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

        Seq.empty[NewAudienceInfo].toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)
      }
    } finally {
      //  spark.stop() also stops the underlying SparkContext, so a separate
      //  sc.stop() is redundant
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMainSpe {
  def main(args: Array[String]): Unit = {
    new RTDmpMainSpe().run(args)
  }
}