package mobvista.dmp.datasource.rtdmp

import java.net.URI
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.rtdmp.Constant.NewAudienceInfo
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer


/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainSpe extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("flag", true, "flag")
    options
  }

  override protected def run(args: Array[String]): Int = {

    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val time = commandLine.getOptionValue("time").replace(".", " ")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val flag = Integer.parseInt(commandLine.getOptionValue("flag"))
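    // flag == 1 runs the full Cassandra merge below; any other value only clears the
    // output path and writes an empty ORC dataset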

    val system = "rtdmp"
    val region = "vg"
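    // SparkSession wired to the rtdmp Cassandra cluster in the "vg" region; the
    // spark.cassandra.* options below are standard spark-cassandra-connector settings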
    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainSpe.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", s"$system.$region.host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "64")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "32")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .config("spark.cassandra.auth.username", "U&6zBV$*wBuYUpJRq$hp")
      .config("spark.cassandra.auth.password", "Z8tzjTMBe^M2#hat$nAJ")
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      import spark.implicits._

      if (flag == 1) {
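        // cutoff 7 days before the processing hour, presumably used by CustomMapPartition
        // to expire stale Cassandra entries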
        val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)

        val keyspace = "rtdmp"
        val tableName = "audience_info"
        val columns = SomeColumns("devid", "audience_data", "update_time")
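        // read configuration for the full scan of rtdmp.audience_info below; the tuned
        // ReadConf overrides the connector defaults (split count, split size, fetch size,
        // consistency level)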

        object ReadConfigurationOne {
          implicit val readConf: ReadConf = ReadConf(Option(10000), 5, 2048, ConsistencyLevel.LOCAL_ONE, true)
        }
        import ReadConfigurationOne._

        val cassandraDF = sc.cassandraTable(keyspace, tableName)
          .mapPartitions(irs => {
            val res = new ArrayBuffer[(String, String, String)]()
            irs.foreach(r => {
              res += ((r.getString("devid"), r.getString("audience_data"), r.getString("update_time")))
            })
            res.iterator
          }).toDF("devid", "audience_data", "update_time")

        val selectDF = spark.read.orc(input)
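        // selectDF: hourly input produced by the upstream step; expected to carry at
        // least devid, audience_ids and device_type (see the select below)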

        //  Only update the previous hour's data
        val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
        val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
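        // same window expressed as epoch seconds; the 28800 s (8 h) offset presumably
        // converts the local UTC+8 timestamps to UTC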
        val audience_date_utime_start = DateUtil.parse(time + ":00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
        val audience_date_utime_end = DateUtil.parse(time + ":59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800

        val update_ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
          .asScala.keySet
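        // audience ids whose definitions were updated inside this window, as returned by
        // the RTDMP server API; presumably used by CustomMapPartition to decide which
        // audience ids to refresh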

        val df = selectDF.join(cassandraDF, Seq("devid"), "leftouter")
          .select("devid", "audience_ids", "audience_data", "device_type")
          .rdd
          .mapPartitions(v => new CustomMapPartition(v, update_time = time, expire_time, update_ids))
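        // per-device rows where the freshly computed audience ids are merged with the
        // audience_data already stored in Cassandra; CustomMapPartition presumably also
        // stamps update_time and drops entries older than expire_time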

        df.persist(StorageLevel.MEMORY_AND_DISK_SER)
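        // cached because df is materialised twice: once as ORC on S3 and once to Cassandra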

        // clear any previous output before the Overwrite write below
        FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

        df.repartition(coalesce.toInt)
          .toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)

        df.saveToCassandra(keyspace, tableName, columns)
      } else {
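        // flag != 1: still clear the output path and emit an empty ORC dataset with the
        // NewAudienceInfo schema, so downstream consumers presumably always find output
        // for this hour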
        FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

        Seq.empty[NewAudienceInfo].toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)
      }
    } finally {
      // stopping the session also stops its underlying SparkContext
      spark.stop()
    }
    0
  }
}

object RTDmpMainSpe {
  def main(args: Array[String]): Unit = {
    new RTDmpMainSpe().run(args)
  }
}