package mobvista.dmp.datasource.baichuan

import java.net.URI
import java.text.SimpleDateFormat

import mobvista.dmp.util.{DateUtil, PropertyUtil}
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.datasource.baichuan
  * @author: wangjf
  * @date: 2019-08-28
  * @time: 17:50
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class AliDaily extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("partNum", true, "partNum")
    options.addOption("output", true, "output")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val partNum = commandLine.getOptionValue("partNum")
    val output = commandLine.getOptionValue("output")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = SparkSession
      .builder()
      .appName("BaiChuanDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.clickhouse.driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .config("spark.clickhouse.url", PropertyUtil.getProperty("config.properties", "spark.clickhouse.url"))
      .config("spark.clickhouse.connection.per.executor.max", "5")
      .config("spark.clickhouse.metrics.enable", "true")
      .config("spark.clickhouse.socket.timeout.ms", "300000")
      .config("spark.clickhouse.cluster.auto-discovery", "true")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
      val sdf2 = new SimpleDateFormat("yyyyMMdd")
      val sc = spark.sparkContext
      val clusterName = "cluster_1st"

      import io.clickhouse.spark.connector._
      //  import spark.implicits._

      val dt = sdf1.format(sdf2.parse(date))
      val query = Constant.baichuan_sql.replace("@date", dt)

      // Read the day's Baichuan records from ClickHouse with custom partitioning and cache them,
      // since the RDD is reused below.
      val rdd = sc.clickhouseTable(query, clusterName)
        .withCustomPartitioning(Constant.buildPart(Integer.parseInt(partNum)))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      val df = rdd.map(r => {
        ((r.getAs("device_id").toString, r.getAs("device_type").toString, r.getAs("platform").toString),
          r.getAs("package_name").toString)
      })

      import spark.implicits._

      // Collect and dedupe package names per (device_id, device_type, platform).
      val dff = df.combineByKey(
        (v: String) => Iterable(v),
        (c: Iterable[String], v: String) => c ++ Seq(v),
        (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
      ).map(r => {
        BaiChuanEntity(r._1._1, r._1._2, r._1._3, r._2.toSet.mkString(","), "CN")
      }).toDF

      val beforeDate = DateUtil.getDay(sdf2.parse(date), "yyyyMMdd", -1)
      val install_sql = Constant.install_sql.replace("@dt", beforeDate)
      val install_df = spark.sql(install_sql)

      val new_rdd = sc.clickhouseTable(Constant.baichuan_new_sql.replace("@date", dt), clusterName)
        .withCustomPartitioning(Constant.buildPart(Integer.parseInt(partNum)))
        .map(r => {
          Row(r.getAs("device_id").toString, r.getAs("device_type").toString)
        })
      val new_df = spark.createDataFrame(new_rdd, Constant.schema)
      val old_df = spark.createDataFrame(rdd.map(r => {
        Row(r.getAs("device_id").toString, r.getAs("device_type").toString)
      }), Constant.schema)

      // Daily-new devices: rows from the "new" query that appear neither in the existing
      // install data nor in today's base set. Cached because it is written to both S3 and ClickHouse.
      val daily_new_df = new_df.except(install_df.unionAll(old_df)).rdd
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Map the new devices to placeholder package names by device type before writing to S3.
      val s3_df = daily_new_df.map(r => {
        if ("idfa".equals(r.getAs("device_type").toString)) {
          BaiChuanEntity(r.getAs("device_id").toString, "idfa", "ios", "0000000000", "CN")
        } else if ("imei".equals(r.getAs("device_type").toString)) {
          BaiChuanEntity(r.getAs("device_id").toString, "imei", "android", "com.nonetaobao.nonetaobao", "CN")
        } else {
          BaiChuanEntity(r.getAs("device_id").toString, "imeimd5", "android", "com.nonetaobao.nonetaobao", "CN")
        }
      }).toDF

      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      s3_df.unionAll(dff).coalesce(10)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      val update_date = DateUtil.getDay(sdf2.parse(date), "yyyy-MM-dd", 6)

      // Encode the new devices per device type for the ClickHouse daily table.
      val ck_df = daily_new_df.map(r => {
        if ("idfa".equals(r.getAs("device_type").toString)) {
          BaiChuanCK(r.getAs("device_id"), 2, 2, 0, update_date)
          //  BaiChuanInstallEntity(r.getAs("device_id").toString, "idfa", "ios", "0")
        } else if ("imei".equals(r.getAs("device_type").toString)) {
          BaiChuanCK(r.getAs("device_id"), 2, 1, 0, update_date)
          //  BaiChuanInstallEntity(r.getAs("device_id").toString, "imei", "android", "0")
        } else {
          BaiChuanCK(r.getAs("device_id"), 2, 3, 0, update_date)
        }
      }).toDF

      //  ck_df.createClickHouseDb(database, Some(clusterName))
      //  daily_new_df.createClickHouseTable(database, table, Seq("dt"), Constant.indexColumn, Constant.orderColumn, Some(clusterName))
      ck_df.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), Some(clusterName), batchSize = 1000000)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object AliDaily {
  def main(args: Array[String]): Unit = {
    new AliDaily().run(args)
  }
}
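/*
 * Usage sketch (kept as a comment so the file still compiles): the job is normally submitted
 * with spark-submit and the single-dash commons-cli options defined in commandOptions().
 * The jar name, output path, host, database, and table values below are illustrative
 * placeholders, not the production configuration.
 *
 *   spark-submit --class mobvista.dmp.datasource.baichuan.AliDaily \
 *     dmp.jar \
 *     -date 20190828 -partNum 100 \
 *     -output s3://mob-emr-test/dmp/baichuan/daily/20190828 \
 *     -host <clickhouse-host> -cluster cluster_1st \
 *     -database <database> -table <table>
 */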