package mobvista.dmp.datasource.age_gender

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/**
 * Spark job that merges gender labels from several sources with the install list.
 *
 * The four gender inputs (dsp, ga, fb, tp) are read as ORC with
 * `Constant.schema_age_gender`, unioned and exposed as temp view `t_gender`. The query
 * `Constant.dmp_install_list_sql_14days` (with `@date` substituted) is exposed as temp
 * view `t_install`. The result of `Constant.dmp_install_list_join_gender_sql` is then
 * written to `gender_output` as zlib-compressed ORC.
 *
 * @package: mobvista.dmp.datasource.age_gender
 * @author: wangjf
 * @create: 2018-09-10 16:46
 * */
class MergeInstallGender extends CommonSparkJob with Serializable {

  /**
   * Parses the command line, runs the merge and reports a status code.
   *
   * @param args command-line arguments; the accepted options are declared in `buildOptions`
   * @return `0` on success, `-1` when `checkMustOption` rejects the command line
   * @throws NumberFormatException    if `parallelism` is not an integer
   * @throws IllegalArgumentException if `parallelism` is not positive
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      // Listed in the order in which the datasets are unioned.
      val genderPaths = Seq(
        "dsp_gender_path",
        "ga_gender_path",
        "fb_gender_path",
        "tp_gender_path"
      ).map(name => commandLine.getOptionValue(name))
      val genderOutput = commandLine.getOptionValue("gender_output")
      val date = commandLine.getOptionValue("date")

      // Validate before anything destructive happens: a malformed value must not be
      // discovered only after the previous output has already been deleted.
      val numPartitions = commandLine.getOptionValue("parallelism").toInt
      require(numPartitions > 0, s"parallelism must be positive, got $numPartitions")

      val spark = SparkSession.builder()
        .appName("MergeInstallGender")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        mergeAndWrite(spark, genderPaths, date, genderOutput, numPartitions)
      } finally {
        // Stopping the session also stops its underlying SparkContext.
        spark.stop()
      }
      0
    }
  }

  /**
   * Clears the output location, registers the temp views and writes the joined result.
   *
   * @param spark         active session used for reading, SQL and writing
   * @param genderPaths   ORC input paths, unioned left to right
   * @param date          value substituted for `@date` in the install-list query
   * @param genderOutput  destination path of the ORC output
   * @param numPartitions number of partitions the result is repartitioned to before writing
   */
  private def mergeAndWrite(spark: SparkSession,
                            genderPaths: Seq[String],
                            date: String,
                            genderOutput: String,
                            numPartitions: Int): Unit = {
    // Resolve the filesystem from the output path itself so that any bucket or scheme
    // works, instead of assuming the output lives on one fixed bucket.
    val outputPath = new Path(genderOutput)
    outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

    genderPaths
      .map(path => spark.read.schema(Constant.schema_age_gender).orc(path))
      .reduce(_ union _)
      .createOrReplaceTempView("t_gender")

    spark.udf.register("pkg_keys", Logic.pkg_keys _)

    val installSql = Constant.dmp_install_list_sql_14days.replace("@date", date)
    spark.sql(installSql).createOrReplaceTempView("t_install")

    spark.sql(Constant.dmp_install_list_join_gender_sql)
      .repartition(numPartitions)
      .write
      .mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc(genderOutput)
  }

  /**
   * Turns age/gender rows into key/value string pairs. Not called by `run`.
   *
   * @param rows rows exposing `device_id`, `device_type`, `tag`, `label` and `business`
   * @return pairs whose key joins `device_id` and `device_type`, and whose value joins
   *         `tag`, `label` and `business`, both via `MRUtils.JOINER`
   */
  def buildAgeGender(rows: Iterator[Row]): Iterator[(String, String)] = {
    rows.map { row =>
      (
        MRUtils.JOINER.join(row.getAs("device_id"), row.getAs("device_type")),
        MRUtils.JOINER.join(row.getAs("tag"), row.getAs("label"), row.getAs("business"))
      )
    }
  }

  /**
   * Turns install rows into key/value string pairs. Not called by `run`.
   *
   * @param rows rows exposing `device_id`, `device_type`, `package_names` and `new_date`
   * @return pairs whose key joins `device_id` and `device_type`, and whose value joins
   *         `package_names` and `new_date`, both via `MRUtils.JOINER`
   */
  def buildInstall(rows: Iterator[Row]): Iterator[(String, String)] = {
    rows.map { row =>
      (
        MRUtils.JOINER.join(row.getAs("device_id"), row.getAs("device_type")),
        MRUtils.JOINER.join(row.getAs("package_names"), row.getAs("new_date"))
      )
    }
  }

  /**
   * Declares the command-line options of this job. Every option takes an argument and
   * is described as `[must] <name>`.
   *
   * @return the option set, in declaration order
   */
  override protected def buildOptions(): Options = {
    val opts = new Options
    Seq(
      "dsp_gender_path",
      "ga_gender_path",
      "fb_gender_path",
      "tp_gender_path",
      "gender_output",
      "date",
      "parallelism"
    ).foreach(name => opts.addOption(name, true, s"[must] $name"))
    opts
  }
}

/** Command-line entry point for the `MergeInstallGender` job. */
object MergeInstallGender {

  /**
   * Runs the job with the given arguments.
   *
   * A non-zero status returned by `run` is propagated as the process exit code, so a
   * rejected command line is not reported as a successful run.
   *
   * @param args command-line arguments forwarded to `run`
   */
  def main(args: Array[String]): Unit = {
    val status = new MergeInstallGender().run(args)
    if (status != 0) sys.exit(status)
  }
}