package mobvista.dmp.datasource.joypac

import java.net.URI
import java.util.Properties

import mobvista.dmp.datasource.retargeting.Constant
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable

/**
  * Builds the Joypac user-feature dataset: loads the Joypac package list and its
  * numeric ids (mob_adn.dmp_app_map) from MySQL, dumps a package-name -> id
  * dictionary to S3, and writes the matching device profiles from dwh.dm_user_info as ORC.
  */
class JoypacUserFeatureJob extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("dict_output", true, "dict_output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  // Joypac package ids (dmp_app_map ids) loaded from MySQL in run(); read by the `has` UDF.
  var packageIds: Set[Int] = null

  // Device-profile query: one row per device for the snapshot date, keeping rows
  // with at least one non-empty feature and at least one Joypac package installed
  // (checked by the `has` UDF registered in run()).
  val user_feature_sql: String =
    s"""SELECT LOWER(device_id) device_id, age, gender, interest, concat_ws(',',install) install_apps, frequency
       |  FROM dwh.dm_user_info
       |  WHERE dt = '@date' AND update_date = '@update_date' AND ((country != '' AND country IS NOT NULL) OR (age != '' AND age IS NOT NULL) OR
       |  (gender != '' AND gender IS NOT NULL) OR (size(interest) != 0 AND interest IS NOT NULL) OR (size(install) != 0
       |  AND install IS NOT NULL)) AND has(install)
    """.stripMargin

  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val dict_output = commandLine.getOptionValue("dict_output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .appName(s"JoypacUserFeatureJob.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sc = spark.sparkContext
      /*
      val bMap = spark.sparkContext.broadcast(spark.sql(Constant.id_old2new_sql).rdd.map(r => {
        (r.getAs("tag_code").toString, r.getAs("tag_id").toString)
      }).collectAsMap())
      */

      // Load the Joypac package list from MySQL; iOS packages stored as "id<digits>"
      // are normalised to the bare numeric string.
      val packageNames = jdbcConnection(spark, "dmp", "package_list").rdd.map(r => {
        var packageName = r.getAs("package_name").toString.toLowerCase()
        if (packageName.matches("^id[0-9]+$")) {
          packageName = packageName.stripPrefix("id")
        }
        packageName
      }).collect().toSet
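      // Map those package names to their numeric ids in mob_adn.dmp_app_map;
      // the RDD is cached because it is reused for the dictionary dump and the id set.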
      val rdd = Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd
        .filter(r => {
          packageNames.contains(r.getAs("app_package_name").toString.toLowerCase())
        }).cache()
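      // Refresh the package-name -> id dictionary on S3 as a single text part file.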
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(dict_output), true)
      rdd.map(r => {
        (r.getAs("app_package_name").toString, Integer.parseInt(r.getAs("id").toString))
      }).coalesce(1).saveAsTextFile(dict_output)

      // Collect the numeric ids locally; they back the `has` UDF registered below.
      packageIds = rdd.map(r => {
        Integer.parseInt(r.getAs("id").toString)
      }).collect().toSet

      println("packageIds.size ==>> " + packageIds.size)
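      // Register the `has` UDF used by user_feature_sql, then substitute the
      // dt / update_date partitions into the query.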
      spark.udf.register("has", has _)
      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
      val sql = user_feature_sql.replace("@date", date)
        .replace("@update_date", update_date)
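      // Clear any previous output and write the device features as snappy-compressed ORC.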
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      spark.sql(sql)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)

    } finally {
      if (spark != null) {
        spark.stop()
      }
    }

    // Reads a MySQL table into a DataFrame over JDBC (apptag RDS instance).
    def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
      val properties = new Properties()
      properties.put("driver", "com.mysql.jdbc.Driver")
      properties.put("user", "apptag_rw")
      properties.put("password", "7gyLEVtkER3u8c9")
      properties.put("characterEncoding", "utf8")
      val url = s"jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/${database}"
      spark.read.jdbc(url = url, table = table, properties = properties)
    }

    // Returns true when the device has at least one Joypac package installed.
    // Guards against a null install array: predicate evaluation order in the SQL
    // is not guaranteed, so `has` may be called on rows where install is NULL.
    def has(install: mutable.WrappedArray[Int]): Boolean = {
      install != null && install.exists(packageIds.contains)
    }

    /*
    def mapFun(row: Row): DeviceTag = {
      val interest = row.getAs("interest").asInstanceOf[mutable.WrappedArray[String]]
      val interest_set = new mutable.HashSet[Int]()
      interest.foreach(r => {
        if (bMap.value.keySet.contains(r) && StringUtils.isNotBlank(bMap.value(r))) {
          interest_set.add(bMap.value(r).toInt)
        }
      })

      val frequencySet = new mutable.HashSet[struct]()
      val frequency = row.getAs("frequency").toString
      import scala.collection.JavaConversions._
      val json = GsonUtil.String2JsonObject(frequency)
      json.entrySet().foreach(j => {
        if (StringUtils.isNotBlank(j.getKey) && bMap.value.keySet.contains(j.getKey)) {
          frequencySet.add(struct(bMap.value(j.getKey), j.getValue.getAsInt))

          interest_set.add(bMap.value(j.getKey).toInt)
        }
      })

      DeviceTag(row.getAs("device_id"), row.getAs("age"), row.getAs("gender"), row.getAs("install_apps"),
        interest_set.mkString(","), mutable.WrappedArray.make(frequencySet.toArray))
    }
    */
  }
}

object JoypacUserFeatureJob {
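  // Hypothetical spark-submit sketch (jar name and S3 paths are placeholders, not from this repo):
  //   spark-submit --class mobvista.dmp.datasource.joypac.JoypacUserFeatureJob dmp.jar \
  //     -date 20240101 -output <s3-output-path> -dict_output <s3-dict-path> -coalesce 100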
  def main(args: Array[String]): Unit = {
    new JoypacUserFeatureJob().run(args)
  }
}