package mobvista.dmp.datasource.backflow

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra.DataFrameWriterWrapper

import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.backflow
 * @author: wangjf
 * @date: 2021/4/25
 * @time: 18:35
 * @email: jinfeng.wang@mobvista.com
 */
class BackWrite extends CommonSparkJob with Serializable {

  // Command-line options: target Cassandra keyspace/table, the system and region
  // used to resolve the connection hosts, and the ORC input path to back-write.
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("keyspace", true, "keyspace")
    options.addOption("table", true, "table")
    options.addOption("system", true, "system")
    options.addOption("region", true, "region")
    options.addOption("input", true, "input")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val keyspace = commandLine.getOptionValue("keyspace")
    val table = commandLine.getOptionValue("table")
    val region = commandLine.getOptionValue("region")
    val system = commandLine.getOptionValue("system")
    val input = commandLine.getOptionValue("input")

    // Resolve the Cassandra contact points for this system/region from ip.properties.
    // Each comma-separated entry has the form "name:host"; keep the host part and de-duplicate.
    val ipStr = PropertyUtil.getProperty("ip.properties", s"$system.$region.host_map")
    val set = new mutable.HashSet[String]()
    ipStr.split(",").foreach(s => {
      set.add(s.split(":")(1))
    })
    val host = set.mkString(",")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"BackWrite.$keyspace.$table.$region")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.host", host)
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "16")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "8")
      .config("spark.cassandra.connection.timeoutMS", "30000")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "1024")
      .config("spark.cassandra.concurrent.reads", "1024")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .config("spark.cassandra.auth.username", "chMjFHCD5J7NrXk3gKE8")
      .config("spark.cassandra.auth.password", "mAQRjmcTMtYRx6iz43FM")
      .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
      .config("spark.sql.catalog.casscatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      // Read the ORC input and append it into the target Cassandra table with a TTL of 10 seconds.
      val df = spark.read.orc(input)
      df.write
        .cassandraFormat(table, keyspace)
        .withTTL(10)
        .mode("APPEND")
        .save()
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object BackWrite {
  def main(args: Array[String]): Unit = {
    new BackWrite().run(args)
  }
}