package mobvista.dmp.datasource.backflow

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.cassandra.{DataFrameReaderWrapper, writeTime}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.backflow
 * @author: wangjf
 * @date: 2021/4/25
 * @time: 18:35
 * @email: jinfeng.wang@mobvista.com
 */
class BackFlow extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("keyspace", true, "keyspace")
    options.addOption("table", true, "table")
    options.addOption("value_column", true, "value_column")
    options.addOption("writetime_start", true, "writetime_start")
    options.addOption("writetime_end", true, "writetime_end")
    options.addOption("system", true, "system")
    options.addOption("region", true, "region")
    options.addOption("output", true, "output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val keyspace = commandLine.getOptionValue("keyspace")
    val table = commandLine.getOptionValue("table")
    val value_column = commandLine.getOptionValue("value_column")
    val writetime_start = commandLine.getOptionValue("writetime_start")
    val writetime_end = commandLine.getOptionValue("writetime_end")
    val region = commandLine.getOptionValue("region")
    val system = commandLine.getOptionValue("system")
    val output = commandLine.getOptionValue("output")

    // Resolve the Cassandra contact points for this system/region from ip.properties.
    val ipStr = PropertyUtil.getProperty("ip.properties", s"$system.$region.host_map")
    val set = new mutable.HashSet[String]()
    ipStr.split(",").foreach(s => {
      set.add(s.split(":")(1))
    })
    val host = set.mkString(",")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"BackFlow.$keyspace.$table.$region")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.host", host)
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "16")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "8")
      .config("spark.cassandra.connection.timeoutMS", 30000)
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "1024")
      .config("spark.cassandra.concurrent.reads", "1024")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .config("spark.cassandra.auth.username", "chMjFHCD5J7NrXk3gKE8")
      .config("spark.cassandra.auth.password", "mAQRjmcTMtYRx6iz43FM")
      .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
      .config("spark.sql.catalog.casscatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
      .getOrCreate()

    val sc = spark.sparkContext

    try {
      // Load the Cassandra table and expose each row's writeTime for the value column.
      val df = spark.read.cassandraFormat(table, keyspace).load()
        .withColumn("write_time", writeTime(value_column))
      df.show

      // Filter the records that need to be backed up or cleaned, based on writeTime.
      val filterDF = df.where(s"write_time < '$writetime_end'")
      filterDF.show

      // Clear any previous output at the target path.
      val pathUri = new URI(output)
      FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      // Back up to S3 as zlib-compressed ORC.
      filterDF.drop("write_time")
        .repartition(200)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object BackFlow {
  def main(args: Array[String]): Unit = {
    new BackFlow().run(args)
  }
}
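
// Usage sketch (not from the source; all argument values below are hypothetical
// placeholders). commons-cli's BasicParser reads each option defined in
// commandOptions() as "-name value", so a submission could look roughly like:
//
//   spark-submit --class mobvista.dmp.datasource.backflow.BackFlow dmp.jar \
//     -keyspace some_keyspace -table some_table -value_column some_value_column \
//     -writetime_start <start_writetime> -writetime_end <end_writetime> \
//     -system <system> -region <region> -output s3://some-bucket/some/path/
//
// The host list is then looked up from ip.properties under "<system>.<region>.host_map",
// and rows whose Cassandra writeTime is older than writetime_end are written to the
// output path as ORC.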