package mobvista.dmp.clickhouse.tracking

import java.text.SimpleDateFormat

import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.clickhouse.tracking
  * @author: wangjf
  * @date: 2019-09-03
  * @time: 10:45
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class TrackingDaily extends Serializable {

  // Command-line options expected by this job.
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = SparkSession
      .builder()
      .appName(s"TrackingDaily.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
      val sdf2 = new SimpleDateFormat("yyyyMMdd")

      val clusterName: Option[String] = Some(cluster)
      // Convert the yyyyMMdd input date into the yyyy-MM-dd value used for the dt partition.
      val updateDate = sdf1.format(sdf2.parse(date))

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      // Drop any existing partition for this date so the reload is idempotent.
      spark.emptyDataFrame.dropPartition(database, table, date, clusterName)

      import spark.implicits._

      // Pull the day's tracking events from Hive, substituting the table and date
      // placeholders in the SQL template.
      val hiveDF = spark.sql(Constant.tracking_daily_sql.replace("@table", table).replace("@date", date))

      // Map rows to TrackingEntity and keep only rows with a valid device id.
      val df = hiveDF.rdd.map(r => {
        TrackingEntity(r.getAs("device_id"), r.getAs("device_model"), r.getAs("os_version"),
          r.getAs("country"), r.getAs("city"), r.getAs("offer_id"), r.getAs("event_name"),
          r.getAs("event_type"), r.getAs("log_type"))
      }).filter(r => {
        Constant.checkDeviceId(r.device_id)
      }).toDF

      // Ensure the target database/table exist on the cluster, then load the partition.
      df.createClickHouseDb(database, clusterName)
      df.createClickHouseTable(database, table, Seq("dt"), Constant.indexColumn, Seq(), clusterName)
      df.saveToClickHouse(database, table, Seq(updateDate), Seq("dt"), clusterName, batchSize = 1000000)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object TrackingDaily {
  def main(args: Array[String]): Unit = {
    new TrackingDaily().run(args)
  }
}