package mobvista.dmp.datasource.rtdmp

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class DeviceInfoCalc extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options this job accepts.
   *
   * @return an [[Options]] instance with the required `date` and `output` arguments,
   *         both taking a value.
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Job entry point: runs the device-info SQL for the given date and writes the
   * result to the given S3 output path as zlib-compressed ORC.
   *
   * @param args command-line arguments; must contain `-date` and `-output`.
   * @return 0 on success (exceptions propagate to the caller).
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"DeviceInfoCalc.$date")
    val sc = spark.sparkContext
    try {
      // Substitute the partition date into the templated SQL.
      val sql = Constant.device_info_sql.replace("@dt", date)

      spark.udf.register("processToRegion", Logic.processToRegion _)

      // Explicitly delete the output path up front. SaveMode.Overwrite would also
      // replace it, but the pre-delete preserves the original job's behavior of
      // clearing the path even if the write below fails partway.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      spark.sql(sql)
        .coalesce(4000)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext, so a separate
      // sc.stop() is redundant. `spark` is assigned before the try block, so it
      // cannot be null here.
      spark.stop()
    }
    0
  }
}

/**
 * Companion object providing the JVM entry point for the job.
 */
object DeviceInfoCalc {
  def main(args: Array[String]): Unit = {
    new DeviceInfoCalc().run(args)
  }
}