FixDmpInstallList.scala 2.24 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
package mobvista.dmp.datasource.dm

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

/**
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020-12-14 17:50:36
 * @time: 下午6:42
 * @email: jinfeng.wang@mobvista.com
 */
class FixDmpInstallList extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"FixDmpInstallList.$date")
    val sc = spark.sparkContext

    try {

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      val sql =
        s"""
           |SELECT device_id, device_type, platform, MAX(country) country, MAX(install_list) install_list,
           |  MAX(ext_data) ext_data, MAX(update_date) update_date
           |  FROM dwh.dmp_install_list WHERE dt = '$date' AND business = '$business'
           |  GROUP BY device_id, device_type, platform
           |""".stripMargin

      spark.sql(sql)
        .repartition(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

    } finally {
      if (spark != null) {
        sc.stop()
        spark.stop()
      }
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("business", true, "[must] business")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object FixDmpInstallList {
  def main(args: Array[String]): Unit = {
    new FixDmpInstallList().run(args)
  }
}