package mobvista.dmp.datasource.rtdmp

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class DeviceRegionMerge extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = MobvistaConstant.createSparkSession("DeviceRegionMerge")

    // Date anchors substituted into the merge SQL: the previous day's
    // partition (@dt), the run date in yyyy-MM-dd form (@before_date), and
    // the cut-off seven days back for expiring stale records (@update_date).
    val dt = DateUtil.getDayByString(date, "yyyyMMdd", -1)
    val before_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
    val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -7), "yyyy-MM-dd")

    val sc = spark.sparkContext
    try {
      // Register the day's device-region data as a temp view for the merge SQL.
      val dailyDF = spark.read.schema(Logic.schema).orc(input)
      dailyDF.createOrReplaceTempView("device_region_daily")

      val sql = Constant.merge_sql
        .replace("@dt", dt)
        .replace("@before_date", before_date)
        .replace("@update_date", update_date)

      // UDFs referenced by Constant.merge_sql.
      spark.udf.register("parseRegion", Logic.parseRegion _)
      spark.udf.register("check_region", Logic.check_region _)
      spark.udf.register("check_tag", Logic.check_tag _)
      spark.udf.register("check_publish", Logic.check_publish _)

      // Clear the target path so the job starts from a clean output directory.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      spark.sql(sql)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        // Stopping the session also stops the underlying SparkContext.
        spark.stop()
      }
    }
    0
  }
}

object DeviceRegionMerge {
  def main(args: Array[String]): Unit = {
    new DeviceRegionMerge().run(args)
  }
}
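
// Usage sketch (hypothetical jar name and S3 paths; the real launcher script is
// not part of this file): the job reads a daily device-region ORC partition,
// merges it against the previous day's snapshot via Constant.merge_sql, and
// overwrites the output path. Options use commons-cli short-option syntax.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.rtdmp.DeviceRegionMerge \
//     dmp.jar \
//     -date 20200713 \
//     -input s3://mob-emr-test/.../device_region_daily/20200713 \
//     -output s3://mob-emr-test/.../device_region_merge/20200713 \
//     -coalesce 100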