package mobvista.dmp.datasource.rtdmp.lazada

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode

/**
  * Lazada RTDMP ETL job.
  *
  * Runs one of two SQL templates (install vs. event, selected by `-tb_type`)
  * against the warehouse for partition `-dt`, and writes the result as
  * zlib-compressed ORC to `-output`.
  *
  * @package: mobvista.dmp.datasource.rtdmp.lazada
  * @author: wangjf
  * @date: 2021/8/5
  * @time: 7:23 PM
  * @email: jinfeng.wang@mobvista.com
  */
class ETLJob extends CommonSparkJob with Serializable {

  /**
    * Declares the command-line options this job accepts.
    *
    * @return an [[Options]] instance with `dt`, `tb_type` and `output`,
    *         each taking a single argument.
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("dt", true, "dt")
    options.addOption("tb_type", true, "tb_type")
    options.addOption("output", true, "output")
    options
  }

  /**
    * Parses arguments, runs the selected ETL SQL and writes the result as ORC.
    *
    * @param args raw command-line arguments (`-dt`, `-tb_type`, `-output`)
    * @return 0 on success; any failure propagates as an exception
    */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val dt = commandLine.getOptionValue("dt")
    val tb_type = commandLine.getOptionValue("tb_type")
    val output = commandLine.getOptionValue("output")

    // getOptionValue returns null for a missing option; fail fast with a
    // clear message instead of an NPE deep inside the job.
    require(dt != null, "missing required option: -dt")
    require(tb_type != null, "missing required option: -tb_type")
    require(output != null, "missing required option: -output")

    val spark = MobvistaConstant.createSparkSession(s"ETLJob.${tb_type}.${dt}")

    try {
      // Select the SQL template by table type; anything other than
      // "install" is treated as the event table.
      val sql: String = if ("install".equals(tb_type)) {
        Constant.etl_install_sql
      } else {
        Constant.etl_event_sql
      }
      spark.sql(sql.replace("@dt", dt))
        .coalesce(20)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext, so a
      // separate sc.stop() is redundant.
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object ETLJob {
  /** CLI entry point: delegates to [[ETLJob.run]]. */
  def main(args: Array[String]): Unit = {
    new ETLJob().run(args)
  }
}