package mobvista.dmp.datasource.rtdmp.lazada

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode

/**
 * @package: mobvista.dmp.datasource.rtdmp.lazada
 * @author: wangjf
 * @date: 2021/8/5
 * @time: 7:23 PM
 * @email: jinfeng.wang@mobvista.com
 */
class ETLJob extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options accepted by this job.
   *
   * @return an [[Options]] set containing:
   *         - dt      : partition date substituted into the SQL template
   *         - tb_type : table type selector ("install" or event)
   *         - output  : HDFS/S3 path the ORC result is written to
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("dt", true, "dt")
    options.addOption("tb_type", true, "tb_type")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Runs the ETL: picks the install/event SQL template by `tb_type`,
   * substitutes the `@dt` placeholder with the requested date, and writes
   * the result as zlib-compressed ORC to `output`, overwriting any
   * previous content.
   *
   * @param args raw CLI arguments (parsed via [[commandOptions]])
   * @return 0 on success (exceptions propagate to the caller)
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val commandLine = parser.parse(commandOptions(), args)
    val dt = commandLine.getOptionValue("dt")
    val tb_type = commandLine.getOptionValue("tb_type")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"ETLJob.${tb_type}.${dt}")
    try {
      // Select the ETL statement by table type: "install" vs. everything else
      // (event). NOTE(review): any tb_type other than "install" falls through
      // to the event SQL — confirm that is intended for unexpected values.
      val sql: String = if (tb_type.equals("install")) {
        Constant.etl_install_sql
      } else {
        Constant.etl_event_sql
      }

      spark.sql(sql.replace("@dt", dt))
        .coalesce(20) // cap the number of output files to avoid a small-file explosion
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)

    } finally {
      // SparkSession.stop() also stops the underlying SparkContext, so the
      // former separate sc.stop() was redundant; `spark` is a freshly
      // constructed val and can never be null, so no guard is needed.
      spark.stop()
    }
    0
  }
}

/** CLI launcher: delegates to [[ETLJob.run]] with the raw arguments. */
object ETLJob {
  def main(args: Array[String]): Unit = {
    val job = new ETLJob()
    job.run(args)
  }
}