package mobvista.dmp.datasource.baichuan

import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import ru.yandex.clickhouse.ClickHouseDataSource

/**
 * @package: mobvista.dmp.datasource.baichuan
 * @author: wangjf
 * @date: 2019-08-29
 * @time: 20:52
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class BaiChuanJob extends java.io.Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    // options.addOption("output", true, "output")
    options
  }

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")
    // val output = commandLine.getOptionValue("output")

    val spark = SparkSession
      .builder()
      .appName("BaiChuanJob")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val clusterName: Option[String] = Some(cluster)
      spark.udf.register("hasNo", Constant.hasNo _)

      val df = spark.sql(Constant.filter_sql.replace("@date", date))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("ods_user_info_daily")

      val set = Set("387682726", "518966501", "com.taobao.taobao", "com.tmall.wireless")
      val itr = set.iterator

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)
      val update_date = Constant.sdf1.format(Constant.sdf2.parse(date))
      val tdf = spark.emptyDataFrame
      // drop the existing partition for this date before reloading
      tdf.dropPartition(database, table, date, clusterName)

      import spark.implicits._

      /**
       * Process the Tmall and Taobao devices of each OS separately.
       */
      while (itr.hasNext) {
        var sql = Constant.etl_sql
        var sql_imeimd5 = Constant.etl_sql
        var imeiFlag = false
        // var output_dir = output
        // var output_dir_imei = output

        /**
         * osId: 2 -> ios, 1 -> android, 3 -> android_md5
         */
        val ir = itr.next
        ir match {
          case "387682726" =>
            sql = sql.replace("@package", "387682726").replace("@appId", "2")
              .replace("@appOs", "ios").replace("@devType", "idfa").replace("@osId", "2")
          // output_dir = output_dir + "/" + "2_2"
          case "518966501" =>
            sql = sql.replace("@package", "518966501").replace("@appId", "1")
              .replace("@appOs", "ios").replace("@devType", "idfa").replace("@osId", "2")
          // output_dir = output_dir + "/" + "1_2"
          case "com.taobao.taobao" =>
            sql = sql.replace("@package", "com.taobao.taobao").replace("@appId", "2")
              .replace("@appOs", "android").replace("@devType", "imei").replace("@osId", "1")
            // output_dir = output_dir + "/" + "2_1"
            sql_imeimd5 = sql_imeimd5.replace("@package", "com.taobao.taobao").replace("@appId", "2")
              .replace("@appOs", "android").replace("@devType", "imeimd5").replace("@osId", "3")
            imeiFlag = true
          // output_dir_imei = output_dir_imei + "/" + "2_1_md5"
          case "com.tmall.wireless" =>
            sql = sql.replace("@package", "com.tmall.wireless").replace("@appId", "1")
              .replace("@appOs", "android").replace("@devType", "imei").replace("@osId", "1")
            // output_dir = output_dir + "/" + "1_1"
            sql_imeimd5 = sql_imeimd5.replace("@package", "com.tmall.wireless").replace("@appId", "1")
              .replace("@appOs", "android").replace("@devType", "imeimd5").replace("@osId", "3")
            imeiFlag = true
          // output_dir_imei = output_dir_imei + "/" + "1_1_md5"
        }

        // Device info obtained from the install source
        val dff = spark.sql(sql).rdd.map(r => {
          BaiChuanCK(r.getAs("dev_id"), Integer.parseInt(r.getAs("app_id").toString),
            Integer.parseInt(r.getAs("app_os").toString), 0, update_date)
        }).toDF.dropDuplicates
        /*
        dff.write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output_dir)
        */

        if (imeiFlag) {
          // Android devices are loaded a second time keyed by MD5-hashed IMEI (osId = 3)
          val imeiDff = spark.sql(sql_imeimd5).rdd.map(r => {
            BaiChuanCK(r.getAs("dev_id"), Integer.parseInt(r.getAs("app_id").toString),
              Integer.parseInt(r.getAs("app_os").toString), 0, update_date)
          }).toDF.dropDuplicates
          /*
          imeiDff.write
            .mode(SaveMode.Overwrite)
            .option("orc.compress", "zlib")
            .orc(output_dir_imei)
          */
          imeiDff.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 100000)
        }

        // dff.createClickHouseDb(database, clusterName)
        // dff.detachPartition(database, table, "", clusterName)
        // dff.createClickHouseTable(database, table, Seq("dt"), Constant.indexColumn, Constant.orderColumn, clusterName)
        dff.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 100000)
      }
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object BaiChuanJob {
  def main(args: Array[String]): Unit = {
    new BaiChuanJob().run(args)
  }
}