package mobvista.dmp.clickhouse.validate

import java.text.SimpleDateFormat

import mobvista.dmp.clickhouse.realtime.Constant
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession
import ru.yandex.clickhouse.ClickHouseDataSource

import scala.collection.mutable

/**
  * @package: mobvista.dmp.clickhouse.validate
  * @author: wangjf
  * @date: 2019-08-30
  * @time: 16:02
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class ValidateUserInfo extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options.addOption("input", true, "input")
    options
  }

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")
    val input = commandLine.getOptionValue("input")

    val spark = SparkSession
      .builder()
      .appName("ValidateUserInfo")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    try {
      val sc = spark.sparkContext

      //  Broadcast the old-tag-id -> new-tag-code mapping so every executor
      //  can translate interest ids locally, without a join.
      val map = sc.broadcast(spark.sql(Constant.id_old2new_sql).rdd.map(r => {
        (Integer.parseInt(r.getAs("tag_id").toString), r.getAs("tag_code").toString)
      }).collectAsMap())

      val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
      val sdf2 = new SimpleDateFormat("yyyyMMdd")

      //  ClickHouse params
      val clusterName: Option[String] = Some(cluster)
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      //  Normalize the partition date from yyyyMMdd to yyyy-MM-dd.
      val update_date = sdf1.format(sdf2.parse(date))

      import spark.implicits._
      //  Parse each input line as JSON, map interest tag ids to tag codes via
      //  the broadcast, and emit one ValidateEnity row per device.
      //  Val and ValidateEnity are case classes defined elsewhere in this package.
      val validateDF = sc.textFile(input).map(j => {
        val json = GsonUtil.String2JsonObject(j)
        val device_id = json.get("key").getAsJsonObject.get("idfaGaid").getAsString
        val value = json.get("val").getAsJsonObject
        val vals = GsonUtil.fromJson(value, classOf[Val])
        val install = vals.installApps
        val interest = vals.interest
        val interestSet = new mutable.HashSet[String]()
        interest.foreach(i => {
          val tagCode = map.value.getOrElse(i, "")
          if (StringUtils.isNotBlank(tagCode)) {
            interestSet.add(tagCode)
          }
        })
        ValidateEnity(device_id.toUpperCase, install, mutable.WrappedArray.make(interestSet.toArray))
      }).toDF

      //  Drop the target partition before reloading it; dropPartition only
      //  needs the table and partition identifiers, so an empty DataFrame suffices.
      val tdf = spark.emptyDataFrame
      tdf.dropPartition(database, table, update_date, clusterName)

      /**
        * user_info save
        */
      validateDF.createClickHouseDb(database, clusterName)
      validateDF.createClickHouseTable(database, table, Seq("dt"), Constant.indexColumn, Constant.orderColumn, clusterName)
      validateDF.saveToClickHouse(database, table, Seq(update_date), Seq("dt"), clusterName, batchSize = 2000000)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object ValidateUserInfo {
  def main(args: Array[String]): Unit = {
    new ValidateUserInfo().run(args)
  }
}
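
// Usage sketch, assuming the job is packaged into a jar and driven by
// spark-submit; the jar name, host, database/table, and S3 input path are
// illustrative placeholders, while the option names come from
// commandOptions() above:
//
//   spark-submit \
//     --class mobvista.dmp.clickhouse.validate.ValidateUserInfo \
//     dmp.jar \
//     -date 20190830 \
//     -host <clickhouse-host> \
//     -cluster <cluster-name> \
//     -database <database> \
//     -table <table> \
//     -input s3://<bucket>/path/to/validate/input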