package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainDeal extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("partition", true, "partition")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val time = commandLine.getOptionValue("time").replace(".", " ")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val partition = commandLine.getOptionValue("partition")

    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainDeal.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "15")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "12")
      .config("spark.cassandra.input.fetch.sizeInRows", "4096")
      .config("spark.cassandra.concurrent.reads", "4096")
      .config("spark.cassandra.output.concurrent.writes", "32")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.output.batch.size.bytes", "2048")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)

      val keyspace = "rtdmp"
      val tableName = "audience_info"
      val columns = SomeColumns("devid", "audience_data", "update_time")
      val cassandraConnector = CassandraConnector(sc.getConf)

      // By default, process the previous hour's data
      val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")

      val audience_date_utime_start = 1577811600L
      val audience_date_utime_end = 4100731200L

      //  val update_time_start = "2000-01-01 00:00:00"
      //  val update_time_end = "2099-12-31 23:59:59"
      //  val audience_date_utime_start = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 28800
      //  val audience_date_utime_end = DateUtil.parse(data_utime, "yyyy-MM-dd HH").getTime / 1000 - 25200

      // Query the audience service for this hour's window and keep only the ids whose status flag is 1
      val set = ServerUtil.request(update_time_start, update_time_end,
        audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
        .retain((_, v) => v._2 == 1)
        .keySet

      println("audienceIds -->> " + set.mkString(","))

      import spark.implicits._
      // Enrich each input device with its existing audience info from Cassandra,
      // then merge it with the active audience ids via CustomIteratorAudienceInfoV2
      val df = spark.read.orc(input)
        .repartition(partition.toInt)
        .mapPartitions(it => cassandraConnector.withSessionDo(session => {
          Logic.getResultFeature(session, it)
        }))
        .mapPartitions(new CustomIteratorAudienceInfoV2(_, time, expire_time, set))
        .rdd

      df.persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Clear any previous output on S3 before writing
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  df.mapPartitions(Logic.writeResultV2(cassandraConnector, _))

      df.repartition(coalesce.toInt)
        .toDF
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      //  update audience_info
      df.saveToCassandra(keyspace, tableName, columns)

    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMainDeal {
  def main(args: Array[String]): Unit = {
    new RTDmpMainDeal().run(args)
  }
}