package mobvista.dmp.datasource.rtdmp.lazada

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode

/**
 * @package: mobvista.dmp.datasource.rtdmp.lazada
 * @author: wangjf
 * @date: 2021/8/5
 * @time: 7:23 PM
 * @email: jinfeng.wang@mobvista.com
 */
/**
 * Spark job that renders `Constant.merge_event_sql` for a given day, runs it,
 * and writes the result as ORC files to the requested output path.
 */
class MergeEventJob extends CommonSparkJob with Serializable {

  /**
   * Builds the command-line options accepted by this job.
   *
   * Every option takes a value. `tb_type` is declared here but is not read
   * by `run`.
   *
   * @return option definitions for `dt`, `tb_type` and `output`
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("dt", true, "dt")
    options.addOption("tb_type", true, "tb_type")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Parses the arguments, substitutes the date placeholders into the merge
   * SQL, executes it and overwrites `output` with the result in ORC format.
   *
   * @param args command-line arguments; `-dt` (parsed with the `yyyyMMdd`
   *             pattern) and `-output` are mandatory
   * @return `0` once the write has completed
   * @throws IllegalArgumentException if `-dt` or `-output` is missing or blank
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // getOptionValue yields null for an option that was not supplied. Fail fast
    // with a clear message instead of a late NullPointerException raised after
    // a Spark session has already been started.
    def requiredOption(name: String): String =
      Option(commandLine.getOptionValue(name))
        .filter(_.trim.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"Missing required option: -$name"))

    val dt = requiredOption("dt")
    val output = requiredOption("output")

    val spark = MobvistaConstant.createSparkSession(s"MergeEventJob.${dt}")

    try {
      // dt re-rendered through sdf1 after parsing with sdf2.
      // NOTE(review): the concrete patterns live in MobvistaConstant - confirm.
      val new_date = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt))

      // Presumably dt shifted by -7 days, rendered as yyyy-MM-dd - verify against DateUtil.getDay.
      val update_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyy-MM-dd", -7)

      // Presumably dt shifted by -1 day, rendered as yyyyMMdd - verify against DateUtil.getDay.
      val before_date = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyyMMdd", -1)

      val sql: String = Constant.merge_event_sql
        .replace("@dt", dt)
        .replace("@new_date", new_date)
        .replace("@update_date", update_date)
        .replace("@before_date", before_date)

      spark.sql(sql)
        .coalesce(100)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)

    } finally {
      // Stopping the session also stops its underlying SparkContext.
      spark.stop()
    }
    0
  }
}

/** Command-line entry point for [[MergeEventJob]]. */
object MergeEventJob {

  /**
   * Instantiates the job and runs it with the given arguments.
   * The integer status returned by the job is discarded.
   *
   * @param args raw command-line arguments forwarded to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new MergeEventJob()
    job.run(args)
  }
}