package mobvista.dmp.clickhouse.realtime

import java.text.SimpleDateFormat

import mobvista.dmp.datasource.age_gender.Logic
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import ru.yandex.clickhouse.ClickHouseDataSource

import scala.collection.mutable

/**
  * @package: mobvista.dmp.clickhouse.realtime
  * @author: wangjf
  * @date: 2019-10-17
  * @time: 10:20
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class MergeEtlHourToCK extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options.addOption("input_dsp", true, "input_dsp")
    options.addOption("input_adn", true, "input_adn")
    options.addOption("region", true, "region")
    options.addOption("hour", true, "hour")
    options.addOption("app_tag_input", true, "app_tag_input")
    options
  }

  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")
  val sdf3 = new SimpleDateFormat("yyyy/MM/dd")

  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")
    val input_dsp = commandLine.getOptionValue("input_dsp")
    val input_adn = commandLine.getOptionValue("input_adn")
    val region = commandLine.getOptionValue("region")
    val hour = commandLine.getOptionValue("hour")
    val app_tag_input = commandLine.getOptionValue("app_tag_input")

    val spark = SparkSession
      .builder()
      .appName(s"MergeEtlHourToCK.$region")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      import spark.implicits._
      val sc = spark.sparkContext

      // Current year, used as the reference point when deriving age labels from birthdays.
      val now = date.substring(0, 4).toInt

      // Broadcast lookup: package_id -> interest tags for that package.
      val bMap = sc.broadcast(spark.read.orc(app_tag_input).rdd.map(r => {
        (Integer.parseInt(r.getAs("package_id").toString),
          r.getAs("interest").asInstanceOf[mutable.WrappedArray[String]].toArray[String])
      }).collectAsMap())

      // Pick the latest dt partition of dwh.package_mapping; partition values come
      // back as strings of the form "dt=<value>", hence the split on "=".
      var package_sql =
        """
          |SHOW PARTITIONS dwh.package_mapping
        """.stripMargin
      val partDF = spark.sql(package_sql)
      val package_dt = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)

      package_sql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '$package_dt'
         """.stripMargin

      // Broadcast lookup: lower-cased package_name -> numeric package id.
      val packageMap = sc.broadcast(spark.sql(package_sql).rdd.map(r => {
        (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
      }).collectAsMap())
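      // For orientation, the two broadcast tables built above have these shapes
      // (the concrete values here are illustrative placeholders, not real data):
      //   bMap:       1042              -> Array("game", "puzzle")
      //   packageMap: "com.example.app" -> 1042
      // i.e. package names are mapped to ids, and ids to interest tags.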
      val clusterName = Some(cluster): Option[String]
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      // Render the "yyyy/MM/dd" input date in the two partition formats used below.
      val partDate = sdf2.format(sdf3.parse(date))
      val dt = sdf1.format(sdf3.parse(date))

      def schema: StructType = {
        StructType(StructField("device_id", StringType) ::
          StructField("platform", StringType) ::
          StructField("pkg_name", IntegerType) :: Nil)
      }

      // ADN installs are only read in the virginia region; elsewhere adnDF stays empty.
      var adnDF = spark.emptyDataFrame
      if (region.equals("virginia")) {
        adnDF = spark.createDataFrame(spark.read.orc(input_adn).rdd
          .map(r => {
            Row(r.getAs("device_id").toString.toUpperCase(),
              r.getAs("platform"),
              packageMap.value.getOrElse(r.getAs("pkg_name").toString.toLowerCase, 0))
          }), schema)
          .groupBy("device_id").agg(
            max("platform"),
            collect_set("pkg_name")
          ).toDF("device_id", "platform", "pkg_name")
      }

      val dspDF = spark.read.orc(input_dsp)
        .rdd
        .map(r => {
          // Map the device's package names to numeric ids via the broadcast table.
          val set = new mutable.HashSet[Int]()
          val package_list = r.getAs("package_list").asInstanceOf[mutable.WrappedArray[String]]
          package_list.foreach(packageName => {
            if (packageMap.value.keySet.contains(packageName.toLowerCase)) {
              set.add(packageMap.value(packageName.toLowerCase))
            }
          })
          // Derive the age label from the birthday, if present and valid.
          val age = if (r.getAs("birthday") != null && StringUtils.isNotBlank(r.getAs("birthday").toString)
            && Logic.check_birthday(now, r.getAs("birthday").toString)) {
            Logic.calcLabel(now, r.getAs("birthday").toString)
          } else {
            0
          }
          // Encode gender: 1 = male, 2 = female, 0 = unknown.
          val gender = if (r.getAs("gender") != null && StringUtils.isNotBlank(r.getAs("gender").toString)) {
            r.getAs("gender").toString match {
              case "m" => 1
              case "f" => 2
              case _ => 0
            }
          } else {
            0
          }
          // Collect the interest tags of every installed package.
          var interest = Array.empty[String]
          set.foreach(pkg => {
            interest ++= bMap.value.getOrElse(pkg, Array.empty[String])
          })
          RealtimeServiceHour(r.getAs("device_id").toString.toUpperCase, r.getAs("platform"), age, gender,
            if (r.getAs("country_code") != null && r.getAs("country_code").toString.length == 2) {
              r.getAs("country_code")
            } else {
              ""
            }, mutable.WrappedArray.make(interest.distinct), mutable.WrappedArray.make(set.toArray))
        })
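      // RealtimeServiceHour is defined elsewhere in this package; judging from the
      // call sites above it is presumably a case class shaped roughly like the
      // following sketch (field names and types are inferred here, not confirmed):
      //
      //   case class RealtimeServiceHour(device_id: String, platform: String,
      //     age: Int, gender: Int, country: String,
      //     interest: Seq[String], install_apps: Seq[Int])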
      // user_info save
      // dspDF.createClickHouseDb(database, clusterName)
      // dspDF.createClickHouseTable(database, table, Seq("dt", "hour", "region"), Constant.indexColumn, Constant.orderColumn, clusterName)

      // Drop the target (dt, hour, region) partition before re-writing it.
      val emptyDF = spark.emptyDataFrame
      emptyDF.dropPartition(database, table, s"($partDate,'$hour','$region')", clusterName)

      var df = dspDF.toDF
      if (!adnDF.rdd.isEmpty()) {
        val adnRDD = adnDF.rdd.map(r => {
          (r.getAs("device_id").toString,
            (r.getAs("platform").toString, r.getAs("pkg_name").asInstanceOf[mutable.WrappedArray[Int]]))
        })
        val dspRDD = dspDF.map(r => {
          (r.device_id, (r.platform, r.age, r.gender, r.country, r.install_apps, r.interest))
        })
        // Full outer join on device_id: prefer DSP demographics when both sides
        // are present, and union the install lists from the two sources.
        df = adnRDD.fullOuterJoin(dspRDD).map(r => {
          val device_id = r._1
          val adnOpt = r._2._1
          val dspOpt = r._2._2
          var platform = ""
          var age = 0
          var gender = 0
          var country = ""
          var install_apps = Array.empty[Int]
          var interest = Array.empty[String]
          if (adnOpt.isDefined && dspOpt.isDefined) {
            platform = dspOpt.get._1
            age = dspOpt.get._2
            gender = dspOpt.get._3
            country = dspOpt.get._4
            val adn_install_set = adnOpt.get._2.toSet
            val dsp_install_set = dspOpt.get._5.toSet
            install_apps = (dsp_install_set ++ adn_install_set).toArray
          } else if (adnOpt.isDefined && dspOpt.isEmpty) {
            platform = adnOpt.get._1
            install_apps = adnOpt.get._2.toArray
          } else {
            platform = dspOpt.get._1
            age = dspOpt.get._2
            gender = dspOpt.get._3
            country = dspOpt.get._4
            install_apps = dspOpt.get._5.toArray
          }
          // Rebuild the interest tags from the merged install list.
          install_apps.foreach(pkg => {
            interest ++= bMap.value.getOrElse(pkg, Array.empty[String])
          })
          // matches() must cover the whole string, so this keeps exactly two letters.
          val countryPtn = """^([a-zA-Z]){2}"""
          val countryCode = if (StringUtils.isNotBlank(country) && country.matches(countryPtn)) {
            country.toUpperCase()
          } else {
            ""
          }
          RealtimeServiceHour(device_id, platform, age, gender, countryCode,
            mutable.WrappedArray.make(interest.distinct), install_apps)
        }).toDF
      }

      df.coalesce(200).saveToClickHouse(database, table, Seq(dt, hour, region),
        Seq("dt", "hour", "region"), clusterName, batchSize = 1000000)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object MergeEtlHourToCK {
  def main(args: Array[String]): Unit = {
    new MergeEtlHourToCK().run(args)
  }
}
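// A launch sketch for reference. The class name, option switches, and the
// "yyyy/MM/dd" date format come from the code above; every concrete value
// (jar, paths, host, cluster, database, table) is a made-up placeholder:
//
//   spark-submit --class mobvista.dmp.clickhouse.realtime.MergeEtlHourToCK \
//     dmp.jar \
//     -date 2019/10/17 -hour 12 -region virginia \
//     -host ck-host -cluster ck-cluster -database dwh -table user_info_hour \
//     -input_dsp s3://bucket/dsp/etl_hour -input_adn s3://bucket/adn/etl_hour \
//     -app_tag_input s3://bucket/app_tag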