package mobvista.dmp.datasource.rtdmp

import java.net.URI

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

import scala.collection.mutable

/**
 * Spark job that reads one `dt` partition of `dwh.audience_merge`, converts each row's
 * `audience_id` array into a JSON string (audience id -> `update_time`), removes duplicate
 * rows and writes the result as zlib-compressed ORC to the `output` path.
 *
 * Command-line options: `-output`, `-partition` and `-old_time` are required;
 * `-date_time` is only used in the Spark application name; `-input` is accepted but not read.
 *
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/8/17
 * @time: 10:57 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpMergeFix extends CommonSparkJob with Serializable {

  // Not referenced anywhere in this file; kept so the class's public members stay unchanged.
  var expire_time = ""

  /**
   * Declares the command-line options understood by this job.
   *
   * @return an [[Options]] set in which every option takes one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    Seq("input", "output", "partition", "date_time", "old_time").foreach { name =>
      options.addOption(name, true, name)
    }
    options
  }

  /**
   * Parses the arguments, runs the query for the `old_time` partition and writes the result.
   *
   * @param args raw command-line arguments
   * @return 0 when the job completes
   * @throws IllegalArgumentException if `output`, `partition` or `old_time` is missing
   * @throws NumberFormatException    if `partition` is not an integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // getOptionValue returns null for an absent option; fail with a clear message before
    // a Spark session is started instead of hitting a NullPointerException later on.
    def required(name: String): String =
      Option(commandLine.getOptionValue(name)).getOrElse(
        throw new IllegalArgumentException(s"Missing required option: -$name"))

    val output = required("output")
    val partitionCount = required("partition").toInt
    val oldTime = required("old_time")
    // Only used to label the Spark application, so it stays optional as before.
    val dateTime = commandLine.getOptionValue("date_time")

    val spark = MobvistaConstant.createSparkSession(s"RTDmpMergeFix.$dateTime")
    try {
      spark.udf.register("process", process _)

      val df = spark.sql(sql.replace("@dt", oldTime)).dropDuplicates()

      // Resolve the file system from the output location itself rather than from a fixed
      // bucket, so the delete targets whatever bucket/scheme the caller passed in.
      val outputPath = new Path(output)
      val outputUri: URI = outputPath.toUri
      FileSystem.get(outputUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      df.coalesce(partitionCount)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      // Stopping the session also stops its underlying SparkContext.
      spark.stop()
    }
    0
  }

  /** Query template; the `@dt` placeholder is replaced with the `old_time` option in `run`. */
  val sql =
    """
      |SELECT devid, process(audience_id, update_time) audience_map,
      | update_time
      | FROM dwh.audience_merge
      | WHERE dt = '@dt'
      |""".stripMargin

  /**
   * Builds a JSON object whose keys are the given audience ids (as strings) and whose values
   * are all `updateTime`. Registered in `run` as the SQL function `process`.
   *
   * @param audienceIds audience ids to use as keys
   * @param updateTime  value stored under every key
   * @return the JSON object serialized to a string
   */
  def process(audienceIds: mutable.WrappedArray[Int], updateTime: String): String = {
    val json = new JSONObject()
    audienceIds.foreach(audienceId => json.put(String.valueOf(audienceId), updateTime))
    json.toJSONString
  }
}

/** Entry point: builds the job and runs it with the given command-line arguments. */
object RTDmpMergeFix {
  def main(args: Array[String]): Unit = {
    new RTDmpMergeFix().run(args)
  }
}