package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSONObject
import com.datastax.spark.connector._
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class AudienceInfoWrite extends CommonSparkJob with Serializable {

  //  Source timestamps arrive as yyyyMMddHH; Cassandra rows store "yyyy-MM-dd HH".
  val slf1 = "yyyyMMddHH"
  val slf2 = "yyyy-MM-dd HH"

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val datetime = commandLine.getOptionValue("datetime")

    val spark = SparkSession
      .builder()
      .appName("AudienceInfoWrite")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", "aws_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.remoteConnectionsPerExecutor", "15")
      .config("spark.cassandra.connection.localConnectionsPerExecutor", "12")
      .config("spark.cassandra.query.retry.count", "10")
      .config("spark.cassandra.connection.compression", "LZ4")
      .config("spark.cassandra.input.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
      .config("spark.cassandra.input.fetch.sizeInRows", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.concurrent.writes", "16")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "1024")
      .config("spark.cassandra.output.batch.size.bytes", "1024")
      .config("spark.cassandra.connection.keepAliveMS", "60000")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val keyspace = "rtdmp"
      val tableName = "audience_info"
      val columns = SomeColumns("devid", "audience_data", "update_time")

      val sql =
        s"""
           |SELECT devid, audience_map, update_time FROM dwh.audience_merge WHERE dt = '$datetime'
           |""".stripMargin

      //  Map each Hive row to a (devid, audience_data, update_time) tuple. Tuples have a
      //  built-in RowWriterFactory in the Cassandra connector (unlike org.apache.spark.sql.Row),
      //  and their elements are matched positionally against SomeColumns.
      val df = spark.sql(sql).rdd.map(r => {
        val devid = r.getAs[String]("devid")
        val audienceMap = MobvistaConstant.String2JSONObject(r.getAs[String]("audience_map"))
        //  Re-render every audience timestamp from yyyyMMddHH to "yyyy-MM-dd HH". Iterating
        //  the key set and using getString avoids the unchecked cast to java.util.Map[String, String].
        val audienceData = new JSONObject()
        audienceMap.keySet().foreach(key => {
          audienceData.put(key, DateUtil.format(DateUtil.parse(audienceMap.getString(key), slf1), slf2))
        })
        val updateTime = DateUtil.format(DateUtil.parse(r.getAs[String]("update_time"), slf1), slf2)
        (devid, audienceData.toJSONString, updateTime)
      })

      df.saveToCassandra(keyspace, tableName, columns)
    } finally {
      //  spark.stop() also stops the underlying SparkContext, so a separate sc.stop() is redundant.
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object AudienceInfoWrite {
  def main(args: Array[String]): Unit = {
    new AudienceInfoWrite().run(args)
  }
}
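
// ---------------------------------------------------------------------------
// Usage sketch. The jar name and resource flags below are illustrative
// assumptions, not this repo's actual build artifacts; adjust to the real
// deployment. commons-cli's BasicParser strips leading hyphens when matching
// option names, so the job is driven by "-datetime <value>", where the value
// must match both the dt partition of dwh.audience_merge and the yyyyMMddHH
// timestamp format parsed above:
//
//   spark-submit \
//     --class mobvista.dmp.datasource.rtdmp.AudienceInfoWrite \
//     --master yarn --deploy-mode cluster \
//     dmp.jar -datetime 2020071311
// ---------------------------------------------------------------------------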