package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.{DateUtil, MySQLUtil}
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
import org.apache.commons.cli.{BasicParser, Options}
import ru.yandex.clickhouse.ClickHouseDataSource

import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.JavaConversions._
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/17
 * @time: 10:57 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMergeCK extends CommonSparkJob with Serializable {

  // Audience entries with a timestamp (yyyyMMddHH) at or before this cutoff are dropped in process().
  var expire_time = ""

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date_time", true, "date_time")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date_time = commandLine.getOptionValue("date_time")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = MobvistaConstant.createSparkSession(s"RTDmpMergeCK.$date_time")
    try {
      expire_time = DateUtil.getDayByString(date_time, "yyyyMMddHH", -1)

      // Empty DataFrame used only as a handle for the dropPartition extension method.
      val tdf = spark.emptyDataFrame
      val sdf = new SimpleDateFormat("yyyyMMddHH")

      // Drop the expired partition: the one 6 hours before date_time.
      val calendar = Calendar.getInstance()
      calendar.setTime(sdf.parse(date_time))
      calendar.add(Calendar.HOUR_OF_DAY, -6)
      val expire_part = sdf.format(calendar.getTime)
      var dt_part = expire_part.substring(0, 8)
      var hour_part = expire_part.substring(8, 10)

      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      val clusterName: Option[String] = Some(cluster)
      tdf.dropPartition(database, table, s"($dt_part,'$hour_part')", clusterName)

      spark.udf.register("process", process _)

      val df = spark.sql(sql.replace("@dt", date_time))
        .filter("size(audience_id) > 0")

      // Drop the partition about to be (re)written so the insert below is idempotent.
      // expire_time shifts date_time by one day, so its hour field matches date_time's.
      dt_part = date_time.substring(0, 8)
      hour_part = expire_time.substring(8, 10)
      val dt = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt_part))
      tdf.dropPartition(database, table, s"($dt_part,'$hour_part')", clusterName)

      // Give the distributed DROP PARTITION two minutes to propagate across the cluster.
      Thread.sleep(120000)

      df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000)

      // Record the processed hour in MySQL for downstream bookkeeping.
      MySQLUtil.update(database, table, date_time)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  val sql: String =
    """
      |SELECT devid, process(audience_map) audience_id,
      |  CASE WHEN device_type IS NOT NULL AND device_type != '' THEN device_type ELSE 'unknown' END AS device_type
      |  FROM dwh.audience_merge
      |  WHERE dt = '@dt'
      |""".stripMargin

  // UDF: parse the JSON audience map, keep only ids whose timestamp is newer than
  // expire_time, and return the distinct ids as an array.
  def process(audience_map: String): mutable.WrappedArray[Int] = {
    val set = new mutable.HashSet[Int]()
    val audienceMap = JSON.parseObject(audience_map).asInstanceOf[java.util.Map[String, String]]
    audienceMap.retain((_, v) => v.compareTo(expire_time) > 0).keys.foreach(k => {
      set.add(Integer.parseInt(k))
    })
    mutable.WrappedArray.make(set.toArray)
  }
}

object RTDmpMergeCK {
  def main(args: Array[String]): Unit = {
    new RTDmpMergeCK().run(args)
  }
}
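
// Example invocation — a sketch only: the jar name and option values below are
// hypothetical; the option names come from commandOptions() above.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.rtdmp.RTDmpMergeCK \
//     dmp-jobs.jar \
//     -date_time 2020081710 \
//     -host ck-node-01 \
//     -cluster cluster_1st \
//     -database dwh \
//     -table audience_merge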