package mobvista.dmp.datasource.adn

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

class AdnLogEtlJob extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("name", true, "name")
    options.addOption("datetime", true, "datetime")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val name = commandLine.getOptionValue("name")
    val datetime = commandLine.getOptionValue("datetime")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = MobvistaConstant.createSparkSession(name)
    try {
      // Fill the partition placeholder in the ETL SQL template.
      val sql = AdnConstant.adn_request_etl_sql.replace("@datetime", datetime)

      // Register the device-id UDFs referenced by the SQL.
      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("check_devId", AdnConstant.check_devId _)

      // Remove any previous output so the ORC write starts from a clean path.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      spark.sql(sql)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object AdnLogEtlJob {
  def main(args: Array[String]): Unit = {
    new AdnLogEtlJob().run(args)
  }
}
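
// Usage sketch (an assumption, not taken from this repository): arguments are
// parsed with commons-cli's BasicParser, so a launch would look roughly like
// the spark-submit call below. The jar name, datetime format, output path, and
// partition count are hypothetical placeholders for illustration only.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.adn.AdnLogEtlJob \
//     dmp.jar \
//     -name adn_log_etl \
//     -datetime 2024010100 \
//     -output s3://mob-emr-test/adn/adn_request_etl \
//     -coalesce 200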