package mobvista.dmp.test

import mobvista.dmp.common.CommonSparkJob
import org.apache.spark.sql.SparkSession

/**
  * Spark job that executes a SQL statement and stores its result as ORC.
  *
  * Options read from the parsed command line:
  *  - `input`: the SQL text handed to `spark.sql`
  *  - `output`: the destination handed to the ORC writer
  *  - `parallelism`: value for `spark.default.parallelism` (defaults to "500")
  *  - `coalesce`: parsed as an integer (defaults to "10") but not applied to the result
  */
class RunSql extends CommonSparkJob {

  /**
    * Parses the arguments, validates them and, when valid, runs the query.
    *
    * When `checkMustOption` rejects the parsed command line, the usage text is
    * printed and no Spark session is created.
    *
    * @param args raw command-line arguments
    * @return 0 both after a completed run and after printing the usage text
    */
  override protected def run(args: Array[String]): Int = {
    val command = commParser.parse(options, args)

    if (!checkMustOption(command)) {
      printUsage(options)
      0
    } else {
      printOptions(command)

      val input = command.getOptionValue("input")
      val output = command.getOptionValue("output")
      val parallelism = command.getOptionValue("parallelism", "500").toInt
      // NOTE(review): this value is parsed (so a malformed number still fails
      // before the session is created) but is never applied to the written
      // result — confirm whether the output was meant to be coalesced.
      val coalesce = command.getOptionValue("coalesce", "10").toInt

      writeQueryAsOrc(input, output, parallelism)
      0
    }
  }

  /**
    * Builds the Hive-enabled session, runs `sql` and writes the result as
    * zlib-compressed ORC to `outputPath`.
    *
    * The session is stopped in a `finally` block, so it is released whether or
    * not the query or the write throws.
    *
    * @param sql         SQL text passed to `spark.sql`
    * @param outputPath  destination passed to the ORC writer
    * @param parallelism value set for `spark.default.parallelism`
    */
  private def writeQueryAsOrc(sql: String, outputPath: String, parallelism: Int): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RunSql")
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", parallelism)
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      spark.sql(sql)
        .write
        .option("orc.compress", "zlib")
        .orc(outputPath)
    } finally {
      spark.stop()
    }
  }
}


/** Command-line launcher for the [[RunSql]] job. */
object RunSql {

  /**
    * Instantiates the job and runs it with the given arguments.
    *
    * The integer status returned by `run` is discarded.
    *
    * @param args raw command-line arguments forwarded to the job
    */
  def main(args: Array[String]): Unit = {
    val job = new RunSql()
    job.run(args)
  }
}