package mobvista.dmp.datasource.rtdmp.lazada

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.io.compress.GzipCodec

import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp.lazada
 * @author: wangjf
 * @date: 2021/8/5
 * @time: 7:23 PM
 * @email: jinfeng.wang@mobvista.com
 */
/**
 * Exports the Lazada RTDMP audience (gaids for `com.lazada.android`) to S3 as
 * gzip-compressed text, then registers/updates the audience on the RTDMP server.
 */
class ProcessRTJob extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options accepted by this job:
   * `dt`, `tb_type` and `output` (each takes a value).
   */
  def commandOptions(): Options = {
    val opts = new Options()
    Seq("dt", "tb_type", "output").foreach { name =>
      opts.addOption(name, true, name)
    }
    opts
  }

  /**
   * Job entry point.
   *
   * Steps:
   *  1. Run the audience SQL for the given `dt` and write the `gaid` column to
   *     `output/com.lazada.android/gaid` (gzip, 20 partitions).
   *  2. Refresh the locally cached "lazada" audience info.
   *  3. If the audience id is already known, send an update to the server;
   *     otherwise upload it as a new audience with full metadata.
   *
   * @param args raw command-line arguments (see [[commandOptions]])
   * @return 0 on completion
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val dt = commandLine.getOptionValue("dt")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"ProcessRTJob.${dt}")
    val sc = spark.sparkContext
    try {

      val package_name = "com.lazada.android"

      // Substitute the partition date into the audience SQL template.
      val sql: String = Constant.process_rtdmp_audience_sql
        .replace("@dt", dt)

      // Extract the gaid column and persist it gzip-compressed.
      spark.sql(sql)
        .rdd
        .map(_.getAs[String]("gaid"))
        .coalesce(20)
        .saveAsTextFile(s"$output/${package_name}/gaid", classOf[GzipCodec])

      // Snapshot the known audience ids before refreshing the cache.
      val audienceIds = Logic.getAudienceInfo("lazada")
      Logic.writeAudienceInfo("lazada", Logic.getAudienceMap(mutable.HashSet(package_name)))

      val s3Path = s"$output/${package_name}/gaid/"
      if (audienceIds.contains(package_name)) {
        // Existing audience: update its data path and status by id.
        val payload = new JSONObject()
        payload.put("id", audienceIds(package_name))
        payload.put("s3_path", s3Path)
        payload.put("status", 1)
        payload.put("audience_data_status", 1)
        val body = new JSONArray()
        body.add(payload)
        ServerUtil.update(body)
      } else {
        // New audience: upload with the full registration metadata.
        val payload = new JSONObject()
        payload.put("s3_path", s3Path)
        payload.put("status", 1)
        payload.put("audience_data_status", 1)
        payload.put("platform", 1)
        payload.put("match_device_type", 3)
        payload.put("audience_type", 2)
        payload.put("data_update_method", 1)
        payload.put("audience_name", package_name)
        payload.put("audience_gender", 3)
        payload.put("audience_count", 1)
        payload.put("is_sync_dmpserver", 1)
        val body = new JSONArray()
        body.add(payload)
        ServerUtil.upload(body)
      }
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object ProcessRTJob {
  /** Command-line entry point; delegates to the job's `run` method. */
  def main(args: Array[String]): Unit = {
    (new ProcessRTJob).run(args)
  }
}