package mobvista.dmp.datasource.dm

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020-12-14 17:50:36
 * @time: 6:42 PM
 * @email: jinfeng.wang@mobvista.com
 */
class FixInstallListIdfv extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"FixInstallListIdfv.$date.$business")
    val sc = spark.sparkContext
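    //  schema is used only by the commented-out text-file mapping path below; the active path reads the mapping from ORC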
    val schema =
      StructType(Array(
        StructField("device_id", StringType),
        StructField("ruid", StringType)
      ))

    try {
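      //  getInstallList is only referenced by the legacy dm_install_list query kept commented out below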
      spark.udf.register("getInstallList", getInstallList _)

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      /*
      val mappingRdd = sc.textFile(input).map(_.split("\t")).filter(_.length == 2)
        .map(r => {
          Row(r(0), r(1))
        })

      val df = spark.createDataFrame(mappingRdd, schema).dropDuplicates
      */
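      //  active path: read the old_id -> new_id device mapping from ORC; the joins below reference old_id and new_id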
      val df = spark.read.orc(input)
        .dropDuplicates()

      df.createOrReplaceTempView("mapping")

      /*
      //  old dmp
      var sql =
        s"""
           |SELECT mp.new_id device_id, 'idfv' device_type, 'ios' platform, getInstallList(COLLECT_SET(CONCAT(device_type, '\001', install_list))) install_list
           |  FROM dwh.dm_install_list dmp INNER JOIN mapping mp
           |  ON UPPER(dmp.device_id) = UPPER(mp.old_id)
           |  WHERE CONCAT(year, month, day) = '$date' AND business = '$business' AND platform = 'ios'
           |  GROUP BY mp.new_id
           |""".stripMargin
      */

      //  new dmp: re-emit today's ios rows matched on old_id under new_id, tagged as idfv devices
      var sql =
        s"""
           |SELECT mp.new_id device_id, 'idfv' device_type, 'ios' platform, country, install_list, ext_data, update_date
           |  FROM dwh.dmp_install_list dmp INNER JOIN mapping mp
           |  ON UPPER(dmp.device_id) = UPPER(mp.old_id)
           |  WHERE dt = '$date' AND business = '$business' AND platform = 'ios'
           |""".stripMargin

      val ruid = spark.sql(sql)

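      //  pull the full partition for this date/business so the existing rows are kept in the rewritten output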
      sql =
        s"""
           |SELECT device_id, device_type, platform, country, install_list, ext_data, update_date
           |  FROM dwh.dmp_install_list
           |  WHERE dt = '$date' AND business = '$business'
           |""".stripMargin

      val other = spark.sql(sql)

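      //  append the re-keyed idfv rows to the original partition and write back as zlib-compressed ORC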
      ruid.union(other)
        .repartition(coalesce)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)


      /*
      val rdd = ruid.fullOuterJoin(other).map(tuple => {
        val deviceId = tuple._1
        val valTuple = tuple._2
        val ruidOpt = valTuple._1
        val otherOpt = valTuple._2
        if (ruidOpt.isEmpty && otherOpt.isDefined) {
          MRUtils.JOINER.join(deviceId, otherOpt.get)
        } else {
          MRUtils.JOINER.join(deviceId, ruidOpt.get)
        }
      })

      rdd.repartition(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])
      */

    } finally {
      if (spark != null) {
        sc.stop()
        spark.stop()
      }
    }
    0
  }

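  /**
   * Picks one install_list from a collected set of "device_type\001install_list" entries,
   * preferring the first idfa entry; if none is present, the last entry wins.
   */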
  def getInstallList(installs: mutable.WrappedArray[String]): String = {
    var install_list: String = ""
    var flag = true
    val iter = installs.iterator
    while (iter.hasNext && flag) {
      val arr = iter.next().split("\001", -1)
      val deviceType = arr(0)
      val installList = arr(1)
      install_list = installList
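      //  an idfa entry takes priority: keep its install_list and stop scanning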
      if (deviceType.equalsIgnoreCase("idfa")) {
        flag = false
      }
    }
    install_list
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("business", true, "[must] business")
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object FixInstallListIdfv {
  def main(args: Array[String]): Unit = {
    new FixInstallListIdfv().run(args)
  }
}