package mobvista.dmp.datasource.rtdmp

import com.datastax.spark.connector._
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * @package: mobvista.dmp.datasource.rtdmp
  * @author: wangjf
  * @date: 2020/7/13
  * @time: 11:25 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DeviceRegionWrite extends CommonSparkJob with Serializable {

  // Command-line options: -input points at the ORC dataset to load.
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val input = commandLine.getOptionValue("input")

    val spark = SparkSession
      .builder()
      .appName("DeviceRegionWrite")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Cassandra connection settings and write-throughput tuning for the
      // spark-cassandra-connector; the host is resolved from ip.properties.
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "32")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "64")
      .config("spark.cassandra.output.concurrent.writes", "512")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "60000")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      /*
      spark.udf.register("isDeviceMd5", Constant.isDeviceMd5 _)
      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
      val sql = Constant.read_sql.replace("@dt", date).replace("@update_date", update_date)
      val df = spark.sql(sql).rdd.map(r => {
        Row(r.getAs[String]("device_id"), r.getAs[String]("region").split(",", -1).toSet)
      }).repartition(1000)
      */
      val keyspace = "rtdmp"
      val tableName = "recent_device_region"
      val columns = SomeColumns("devid", "dev_type", "region")

      // Load the ORC input and map each record to (devid, dev_type, region);
      // the comma-delimited region string is split into a Set so it can be
      // written to a Cassandra set column.
      val df = spark.read.orc(input).rdd.map(r => {
        Row(r.getAs[String]("device_id"), r.getAs[String]("device_type"),
          r.getAs[String]("region").split(",", -1).toSet)
      })

      df.saveToCassandra(keyspace, tableName, columns)
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object DeviceRegionWrite {
  def main(args: Array[String]): Unit = {
    new DeviceRegionWrite().run(args)
  }
}
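
/*
 * Usage sketch: assuming the project is packaged as dmp.jar and the target
 * Cassandra table already exists, the job might be submitted like this.
 * The jar name, master, and deploy mode below are illustrative assumptions,
 * not taken from the source:
 *
 *   spark-submit \
 *     --class mobvista.dmp.datasource.rtdmp.DeviceRegionWrite \
 *     --master yarn --deploy-mode cluster \
 *     dmp.jar \
 *     -input s3://mob-emr-test/path/to/device_region_orc
 *
 * The write implies a table shaped roughly like
 * rtdmp.recent_device_region(devid text, dev_type text, region set<text>);
 * the exact DDL is an assumption inferred from SomeColumns and the Set value.
 */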