package mobvista.dmp.datasource.retargeting

import java.net.URL

import com.datastax.spark.connector._
import mobvista.dmp.util.{MD5Util, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer

class RetargetingCassandra extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("region", true, "region")
    options
  }

  private def run(args: Array[String]) {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val region = commandLine.getOptionValue("region")

    val spark = SparkSession
      .builder()
      .appName(s"RetargetingCassandra.${region.toUpperCase}")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", region + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", "16")
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

    try {

      /*
      val sdf = new SimpleDateFormat("yyyyMMdd")

      val update_date = DateUtil.format(sdf.parse(date), "yyyy-MM-dd")
      var sql = Constant.user_feature_sql.replace("@date", date).replace("@update_date", update_date)

      if (region.toUpperCase.equals("CN")) {
        sql = sql + " AND UPPER(country) = 'CN'"
      }

      val rdd = spark.sql(sql)

      val keyspace = "dmp_realtime_service"
      val table = "dmp_user_features"
      val columns = SomeColumns("device_id", "age", "gender", "interest", "install_apps", "frequency" overwrite)
      rdd.rdd.saveToCassandra(keyspace, table, columns)
      */

      val mongoDB = spark.read.format("com.mongodb.spark.sql")
        .option("uri", "mongodb://internal-beijing-adServerMongo-virginia-1390640500.us-east-1.elb.amazonaws.com/new_adn.campaign_device_id")
        .option("collection", "campaign_device_id")
        .load()
        .rdd.filter(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        !deviceIdUrl.endsWith(".txt")
      })

      val map = mongoDB.map(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        val campaignId = Integer.parseInt(r.getAs("campaignId").toString)
        val url = new URL(deviceIdUrl)
        val s3_path = String.format("s3:/%s", new URL(url.openConnection.getHeaderField("location").split("\\?")(0)).getPath)
        //  val status = r.getAs("status")
        (campaignId, (s3_path, 1))
      }).collectAsMap()

      val sc = spark.sparkContext
      var rdd = sc.emptyRDD[(String, Int)]
      map.foreach(m => {
        rdd = if (m._2._2 == 1) {
          rdd.union(sc.textFile(m._2._1).map(r => {
            (r, m._1)
          }))
        } else {
          rdd.union(sc.textFile(m._2._1).map(r => {
            (r, 0)
          }))
        }

        /*
        rdd = if (m._2._2 == 1) {
          rdd.union(sc.textFile(m._2._1).map(r => {
            val array = new ArrayBuffer[(String, Int)]()
            if (StringUtils.isNotBlank(r) && (r.length == 32 || r.length == 31 || r.length == 30)) {
              array += ((r, m._1))
            } else {
              array += ((MD5Util.getMD5Str(r.toLowerCase()), m._1))
              array += ((MD5Util.getMD5Str(r.toUpperCase()), m._1))
            }
            array
          }).flatMap(l => l))
        } else {
          rdd.union(sc.textFile(m._2._1).map(r => {
            val array = new ArrayBuffer[(String, Int)]()
            if (StringUtils.isNotBlank(r) && (r.length == 32 || r.length == 31 || r.length == 30)) {
              array += ((r, 0))
            } else {
              array += ((MD5Util.getMD5Str(r.toLowerCase()), 0))
              array += ((MD5Util.getMD5Str(r.toUpperCase()), 0))
            }
            array
          }).flatMap(l => l))
        }
        */
      })

      //  import spark.implicits._
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        val array = new ArrayBuffer[DeviceTarget]()
        if (r._1.length == 32 || r._1.length == 31 || r._1.length == 30) {
          array += DeviceTarget(r._1, r._2.toSet[Int])
        } else {
          array += DeviceTarget(MD5Util.getMD5Str(r._1.toLowerCase()), r._2.toSet[Int])
          array += DeviceTarget(MD5Util.getMD5Str(r._1.toUpperCase()), r._2.toSet[Int])
        }
        array
        /*
        Row(r._1, r._2.toSet[Int])
        */
      }).flatMap(l => l)

      df.saveToCassandra("dmp_realtime_service", "dmp_user_retarget", SomeColumns("device_id", "target_campaign_list" overwrite))
      /*
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        DeviceTarget(r._1.toLowerCase, r._2.toSet[Int])
      }).toDF
      df.createOrReplaceTempView("device_campaign")
      val sql = "SELECT * FROM device_campaign"
      spark.sql(sql).rdd.saveToCassandra("dmp_realtime_service", "dmp_user_retarget", SomeColumns("device_id", "target_campaign_list" overwrite))
      */
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object RetargetingCassandra {
  def main(args: Array[String]): Unit = {
    new RetargetingCassandra().run(args)
  }
}