package mobvista.dmp.datasource.datatory

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019/4/3
  * @time: 2:03 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DatatoryJob extends CommonSparkJob with java.io.Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("coalesce", true, "coalesce")
    options.addOption("json", true, "json")
    options.addOption("tag", true, "tag")

    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val coalesce = commandLine.getOptionValue("coalesce")
    val json = commandLine.getOptionValue("json")
    val tag = commandLine.getOptionValue("tag")

    val spark = SparkSession
      .builder()
      .appName("DatatoryJob")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {

      val code_sql = Constant.ods2new_sql
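      // Broadcast a lookup map from each tag_id to its (new_first_id, new_second_id) pair,
      // built from the Constant.ods2new_sql mapping query.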
      val bMap = sc.broadcast(spark.sql(code_sql).rdd.map(r => {
        // Explicit getAs[Any] keeps the compiler from inferring Nothing for the untyped calls.
        (r.getAs[Any]("tag_id").toString,
          (r.getAs[Any]("new_first_id").toString, r.getAs[Any]("new_second_id").toString))
      }).collectAsMap())

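      // Pick the most recent partition of dwh.ods_dmp_user_info_all and keep only the
      // value after '=' (SHOW PARTITIONS returns strings like "<partition_col>=<value>").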
      val sql =
        """
          |SHOW PARTITIONS dwh.ods_dmp_user_info_all
        """.stripMargin
      val partDF = spark.sql(sql)
      val date = partDF
        .orderBy(partDF("partition").desc)
        .first()
        .getString(0)
        .split("=")(1)


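      // Earlier inline filtering implementation, kept commented out; the active code now
      // delegates to Constant.processQuery below.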
      /*
      spark.udf.register("filterByPackage", Constant.filterByPackage _)
      spark.udf.register("filterByCountry", Constant.filterByCountry _)

      val packageFilterEntity: PackageFilterEntity = Constant.parseJsonString(json)

      val jobId = packageFilterEntity.jobId

      val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
      val sdf2 = new SimpleDateFormat("yyyyMMdd")
      val updateDate = sdf1.format(sdf2.parse(packageFilterEntity.start))

      var sql = Constant.filter_sql.replace("@date", packageFilterEntity.end).replace("@update_date", updateDate)

      if (StringUtils.isNotBlank(packageFilterEntity.countryCode)) {
        sql = sql + s" AND filterByCountry(country,'${packageFilterEntity.countryCode}')"
      }

      if (StringUtils.isNotBlank(packageFilterEntity.packageList)) {
        sql = sql + s" AND filterByPackage(install,'${packageFilterEntity.packageList}')"
      }

      val baseDF = spark.sql(sql).rdd
        .map(r => {
          PackageUserInfoEntity(r.getAs("dev_id"), r.getAs("install"), r.getAs("interest"), r.getAs("country"), r.getAs("age"), r.getAs("gender"))
        }).persist(StorageLevel.MEMORY_AND_DISK_SER)
        */

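      // Build the base data for the latest partition; '&@' placeholders in the json
      // argument are decoded back to spaces before it is handed to Constant.processQuery.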
      val base = Constant.processQuery(date, tag, json.replaceAll("&@", " "), spark)


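      // Earlier inline aggregation (all/package/interest/country/age/gender counts),
      // kept commented out; the active code now delegates to Constant.processBase below.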
      /*
      val top: Integer = if (packageFilterEntity.top == null) {
        20
      } else {
        packageFilterEntity.top
      }

      val all = baseDF.count()
      val allDF = sc.parallelize(Seq(Result(jobId, "all", "all", all.toInt)))

      val packageDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "package"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortBy(_._2, false)
        .map(r => {
          Result(jobId, "package", r._1, r._2)
        })

      val packageRDD = sc.parallelize(packageDF.take(top))

      val interestDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "interest"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortByKey()
        .map(r => {
          Result(jobId, "interest", r._1, r._2)
        })

      val countryDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "country"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortBy(_._2, false)
        .map(r => {
          Result(jobId, "country", r._1, r._2)
        })

      val countryRDD = sc.parallelize(countryDF.take(top))

      val ageDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "age"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortByKey()
        .map(r => {
          Result(jobId, "age", r._1, r._2)
        })

      val genderDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "gender"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortByKey()
        .map(r => {
          Result(jobId, "gender", r._1, r._2)
        })

      import spark.implicits._
      val df = allDF.union(packageRDD).union(interestDF).union(countryRDD).union(ageDF).union(genderDF)
        .toDF
        .repartition(1)
        */
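      // Aggregate the base data into the final result rows, resolving ids through the
      // broadcast mapping.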
      val df: Dataset[Row] = Constant.processBase(base._1, base._2, base._3, base._4, spark, bMap)

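      // Earlier direct JDBC write, kept commented out; replaced by Constant.writeMySQL below.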
      /*
      val prop = new java.util.Properties
      prop.setProperty("driver", "com.mysql.jdbc.Driver")
      prop.setProperty("user", "apptag_rw")
      prop.setProperty("password", "7gyLEVtkER3u8c9")
      prop.setProperty("characterEncoding", "utf8")

      df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/datatory", "result", prop)
      */
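      // Append the aggregated results to the MySQL "result" table.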
      Constant.writeMySQL(df, "result", SaveMode.Append)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object DatatoryJob {
  def main(args: Array[String]): Unit = {
    new DatatoryJob().run(args)
  }
}