package mobvista.dmp.datasource.rtdmp.lazada

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode

/**
 * Spark job that renders `Constant.merge_event_sql` for a given day, runs it and
 * writes the result to the `output` path as zlib-compressed ORC, overwriting any
 * existing data at that path.
 *
 * @package: mobvista.dmp.datasource.rtdmp.lazada
 * @author: wangjf
 * @date: 2021/8/5
 * @time: 7:23 PM
 * @email: jinfeng.wang@mobvista.com
 */
class MergeEventJob extends CommonSparkJob with Serializable {

  /**
   * Builds the command-line options understood by this job.
   *
   * `dt` and `output` are consumed by [[run]]. `tb_type` is declared but not read
   * by this job; it stays registered so invocations that still pass it keep parsing.
   *
   * @return the option set: `dt`, `tb_type` and `output`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("dt", true, "dt")
    options.addOption("tb_type", true, "tb_type")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Parses the arguments, executes the merge-event SQL and writes the result as ORC.
   *
   * @param args command-line arguments; `-dt` (parsed below with the pattern
   *             `yyyyMMdd`) and `-output` (destination path) are required
   * @return 0 once the write has completed
   * @throws IllegalArgumentException if `-dt` or `-output` is missing or empty
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val commandLine = parser.parse(commandOptions(), args)

    // getOptionValue returns null for an absent option. Fail fast with a clear message
    // instead of starting a Spark session and hitting a NullPointerException later.
    def required(name: String): String =
      Option(commandLine.getOptionValue(name))
        .filter(_.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"Missing required option: -$name"))

    val dt = required("dt")
    val output = required("output")

    val spark = MobvistaConstant.createSparkSession(s"MergeEventJob.${dt}")

    try {
      // NOTE(review): sdf1/sdf2 are formatters defined in MobvistaConstant; their exact
      // patterns are not visible here. sdf2 must accept the same `dt` string as the
      // "yyyyMMdd" parses below. Confirm against MobvistaConstant.
      val new_date = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt))
      // Presumably `dt` shifted by -7 and -1 days respectively, rendered in the given
      // pattern; verify against DateUtil.getDay.
      val update_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyy-MM-dd", -7)
      val before_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyyMMdd", -1)

      // Substitute the date placeholders in the SQL template; the call order is kept
      // exactly as before because the replacements are plain substring substitutions.
      val sql: String = Constant.merge_event_sql
        .replace("@dt", dt)
        .replace("@new_date", new_date)
        .replace("@update_date", update_date)
        .replace("@before_date", before_date)

      spark.sql(sql)
        .coalesce(100)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext, so a single call
      // suffices; `spark` cannot be null at this point.
      spark.stop()
    }
    0
  }
}

object MergeEventJob {

  /** Command-line entry point: runs a new [[MergeEventJob]] with the given arguments. */
  def main(args: Array[String]): Unit = {
    new MergeEventJob().run(args)
  }
}