package mobvista.dmp.datasource.age_gender

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
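
/**
  * Spark job that reads the dt = yesterday partition of
  * dwh.etl_gender_thirdparty_data_total, keeps a single gender value per
  * (device_id, device_type), and writes two ORC outputs: the deduplicated
  * table itself (outputtotal) and the same devices reshaped into the
  * age/gender schema with source tag "tp" (outputgender).
  */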
class ThirdPartySourceTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("outputgender",true, "[must] outputgender")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("yesterday", true, "[must] yesterday")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val outputtotal = commandLine.getOptionValue("outputtotal")
    val outputgender = commandLine.getOptionValue("outputgender")
    val coalesce = commandLine.getOptionValue("coalesce")
    val yesterday = commandLine.getOptionValue("yesterday")

    val spark = SparkSession.builder()
      .appName("ThirdPartySourceTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so re-runs for the same day are idempotent
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputtotal), true)
    fs.delete(new Path(outputgender), true)

    try {

      // Normalise gender labels ('male' -> 'm', 'female' -> 'f') and keep a single
      // row per (device_id, device_type); MIN() makes the pick deterministic.
      val sql1 =
        s"""
           |SELECT device_id, device_type,
           |       MIN(platform) AS platform,
           |       MIN(CASE WHEN gender = 'male' THEN 'm'
           |                WHEN gender = 'female' THEN 'f'
           |                ELSE gender END) AS gender
           |FROM dwh.etl_gender_thirdparty_data_total
           |WHERE dt = '${yesterday}'
           |GROUP BY device_id, device_type
           |""".stripMargin

      // Cache the deduplicated rows: written out as-is below, then reused to build the gender rows
      val etl_gender_tp_data = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK)
      etl_gender_tp_data
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputtotal)

      // Reshape each row into the layout of Constant.schema_age_gender; here "A" and
      // "tp" are the age-bucket and source tags this job assigns to third-party data
      // (meaning inferred from usage, not defined in this file).
      val gender_rdd = etl_gender_tp_data
        .rdd
        .map(line => {
          val device_id = line.getAs[String]("device_id")
          val device_type = line.getAs[String]("device_type")
          val gender = line.getAs[String]("gender")
          Row(device_id, "A", gender, "tp", device_type)
        })

      spark.createDataFrame(gender_rdd, Constant.schema_age_gender)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputgender)

      // Release the cached DataFrame once both outputs are written
      etl_gender_tp_data.unpersist()
    } finally {
      spark.stop()
    }
    0
  }
}

object ThirdPartySourceTotal {
  def main(args: Array[String]): Unit = {
    new ThirdPartySourceTotal().run(args)
  }
}
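
// A hypothetical launch (jar name, paths, and date format are illustrative only,
// not taken from this repository):
//
//   spark-submit --class mobvista.dmp.datasource.age_gender.ThirdPartySourceTotal dmp.jar \
//     -outputtotal s3://mob-emr-test/path/to/gender_total \
//     -outputgender s3://mob-emr-test/path/to/age_gender \
//     -coalesce 100 \
//     -yesterday <dt>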