package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.rtdmp.Constant.NewAudienceInfo
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMainSpe extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("time", true, "time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("flag", true, "flag")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val time = commandLine.getOptionValue("time").replace(".", " ")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val flag = Integer.parseInt(commandLine.getOptionValue("flag"))

    val system = "rtdmp"
    val region = "vg"
    val spark: SparkSession = SparkSession.builder()
      .appName(s"RTDmpMainSpe.$time")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.$system.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", s"$system.$region.host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "64")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "32")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .config("spark.cassandra.auth.username", "U&6zBV$*wBuYUpJRq$hp")
      .config("spark.cassandra.auth.password", "Z8tzjTMBe^M2#hat$nAJ")
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      import spark.implicits._
      if (flag == 1) {
        val expire_time = DateUtil.getDayByString(time, "yyyy-MM-dd HH", -7)

        val keyspace = "rtdmp"
        val tableName = "audience_info"
        val columns = SomeColumns("devid", "audience_data", "update_time")

        object ReadConfigurationOne {
          implicit val readConf = ReadConf(Option(10000), 5, 2048, ConsistencyLevel.LOCAL_ONE, true)
        }
        import ReadConfigurationOne._

        //  Read the audience_info table from Cassandra as (devid, audience_data, update_time)
        val cassandraDF = sc.cassandraTable(keyspace, tableName)
          .mapPartitions(irs => {
            val res = new ArrayBuffer[(String, String, String)]()
            irs.foreach(r => {
              res += ((r.getString("devid"), r.getString("audience_data"), r.getString("update_time")))
            })
            res.iterator
          }).toDF("devid", "audience_data", "update_time")

        val selectDF = spark.read.orc(input)

        //  Only update data from the previous hour
        val update_time_start = DateUtil.format(time + ":00:00", "yyyy-MM-dd HH:mm:ss")
        val update_time_end = DateUtil.format(time + ":59:59", "yyyy-MM-dd HH:mm:ss")
        val audience_date_utime_start = DateUtil.parse(time + ":00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
        val audience_date_utime_end = DateUtil.parse(time + ":59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800

        val update_ids = ServerUtil.request(update_time_start, update_time_end, audience_date_utime_start, audience_date_utime_end, 0, 0, 2)
          .asScala.keySet

        //  Merge the hourly input with the existing Cassandra audience data
        val df = selectDF.join(cassandraDF, Seq("devid"), "leftouter")
          .select("devid", "audience_ids", "audience_data", "device_type")
          .rdd
          .mapPartitions(v => new CustomMapPartition(v, update_time = time, expire_time, update_ids))

        df.persist(StorageLevel.MEMORY_AND_DISK_SER)

        FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

        df.repartition(coalesce.toInt)
          .toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)

        df.saveToCassandra(keyspace, tableName, columns)
      } else {
        FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

        //  No update requested: write an empty result set so downstream jobs still find output
        Seq.empty[NewAudienceInfo].toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)
      }
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpMainSpe {
  def main(args: Array[String]): Unit = {
    new RTDmpMainSpe().run(args)
  }
}
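// -----------------------------------------------------------------------------
// Hypothetical invocation sketch (the jar name and S3 paths below are
// placeholders, not taken from this repository); the options follow the
// commons-cli definitions in commandOptions() above:
//
//   spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainSpe dmp.jar \
//     -time 2020-07-13.11 -input s3://mob-emr-test/.../input \
//     -output s3://mob-emr-test/.../output -coalesce 100 -flag 1
//
// With -flag 1 the job joins the hourly ORC input against the Cassandra
// audience_info table, writes the merged result to -output and back to
// Cassandra; any other -flag value only writes an empty ORC dataset to -output.
// -----------------------------------------------------------------------------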