package mobvista.dmp.datasource.setting

import java.net.URI
import java.util.Properties

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}


/**
 * Spark job that exports the daily "setting" app list joined against the
 * MySQL `publisher_channel` table, writing the result as comma-separated
 * text to the S3 path given by `-outputtotal`.
 *
 * Required options: outputtotal (destination path), coalesce (output
 * partition count), today (partition date, yyyyMMdd).
 */
class SettingTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options
  }

  /**
   * Parses arguments, reads the Hive daily partition and the MySQL mapping
   * table, joins them, and writes the joined rows to `outputtotal`.
   *
   * @param args raw command-line arguments
   * @return 0 on success, -1 when required options are missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val outputtotal = commandLine.getOptionValue("outputtotal")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")

    val spark = SparkSession.builder()
      // Fixed: appName previously said "AppsFlyerTotal" (copy-paste from a
      // sibling job), which made this job hard to find in the Spark UI.
      .appName("SettingTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so saveAsTextFile does not fail with
    // "output directory already exists". recursive = true.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputtotal), true)

    try {
      val sqlContext = spark.sqlContext
      val properties = new Properties()
      // SECURITY(review): plaintext DB credentials in source control.
      // Move to a secrets store / job configuration; left as-is to avoid
      // breaking the deployed job in this change.
      properties.put("user", "adnro")
      properties.put("password", "YcM123glh")
      val url = "jdbc:mysql://adn-mysql-external.mobvista.com:3306/mob_adn"
      // Expose the MySQL mapping table (id -> package_name, platform) as a temp view.
      sqlContext.read.jdbc(url, "publisher_channel", properties).select("id", "package_name", "platform").toDF("id", "package_name", "platform").createOrReplaceTempView("publisher_channel")

      // Prefer the MySQL package_name (iOS ids have the "id" prefix stripped)
      // over the one recorded in the Hive daily partition; fall back via coalesce.
      val sqlDaily =
        s"""
           |  select t1.app_id,coalesce(t2.package_name,t1.package_name) package_name,t1.platform
           |  from (select app_id,package_name,platform
           |   from dwh.ods_adn_ngix_setting_global where concat(yyyy,mm,dd) = '${today}') t1
           |   left join (select id appid,replace(package_name,"id", "") package_name,platform
           |  from publisher_channel where platform = '2') t2
           |   on(t1.app_id = t2.appid)
      """.stripMargin

      spark.sql(sqlDaily)
        .coalesce(coalesce.toInt).rdd
        .map(_.mkString(","))
        .saveAsTextFile(outputtotal)

    } finally {
      // Always release the Spark application, even if the job body throws.
      spark.stop()
    }
    0
  }
}

/** Launcher for [[SettingTotal]]. */
object SettingTotal {
  def main(args: Array[String]): Unit = {
    // Fixed: run()'s status code (-1 on missing options) was previously
    // discarded, so the JVM always exited 0 and schedulers could not detect
    // argument failures. Propagate a nonzero status to the OS.
    val status = new SettingTotal().run(args)
    if (status != 0) System.exit(status)
  }
}