package mobvista.dmp.datasource.tracking_3s

import java.net.URI

import mobvista.dmp.common.{CommonMapReduce, CommonSparkJob}
import mobvista.prd.datasource.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

import scala.collection.mutable.ArrayBuffer

object TrackingInstallDaily {

  // Upper-case hexadecimal identifier in hyphenated 8-4-4-4-12 form.
  // getPlatform treats a device id matching this pattern as belonging to iOS.
  val idfaRegex = "^[0-9A-F]{8}(-[0-9A-F]{4}){3}-[0-9A-F]{12}$"
  /**
    * Entry point of the daily 3s tracking install job.
    *
    * Steps:
    *  1. Load the campaign file (three tab-separated columns, third one non-empty) and
    *     broadcast it as `column0 -> (column1, column2)`.
    *  2. Expand every 3s log line into zero or more device records with [[parseflatMapData]].
    *  3. Format every device record with [[parseMapData]] and write the result to `output`
    *     as gzip-compressed text, after reducing the partition count to `coalesce`.
    *
    * Required options: `campaign_input_path`, `input_path_3s`, `output`, `date`, `coalesce`.
    *
    * @param args command line arguments, parsed with the options from [[buildOptions]]
    * @throws IllegalArgumentException if a required option is missing
    */
  def main(args: Array[String]): Unit = {
    val options = buildOptions()
    val parser = new BasicParser
    val commandLine = parser.parse(options, args)

    // Fail fast with a readable message instead of a NullPointerException further down.
    val requiredOptions = Seq("campaign_input_path", "input_path_3s", "output", "date", "coalesce")
    val missingOptions = requiredOptions.filterNot(name => commandLine.hasOption(name))
    val missingList = missingOptions.mkString(", ")
    require(missingOptions.isEmpty, s"Missing required option(s): $missingList")

    val campaign_input_path = commandLine.getOptionValue("campaign_input_path")
    val input_path_3s = commandLine.getOptionValue("input_path_3s")
    val output = commandLine.getOptionValue("output")
    val datetime = commandLine.getOptionValue("date")
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("TrackingInstallDaily")

    val sc = spark.sparkContext

    try {
      // Resolve the file system from the output path itself so the cleanup always targets the
      // same location saveAsTextFile writes to, whatever bucket or scheme the path uses.
      // Done inside the try block so the Spark context is stopped even if the delete fails.
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      val campaign_input = sc.textFile(campaign_input_path)
      val input_3s = sc.textFile(input_path_3s)

      // Keep only well-formed campaign rows: exactly three columns with a non-empty third column.
      val campaign_input_filter: RDD[String] = campaign_input.filter(line => {
        if (line.contains("\t")) {
          val strings: Array[String] = MRUtils.SPLITTER.split(line, -1)
          strings.length == 3 && !StringUtils.isEmpty(strings(2))
        } else {
          false
        }
      })

      // The campaign table is collected to the driver and broadcast to the executors.
      val campaign_input_filterMap = campaign_input_filter.map(record => {
        val splits = StringUtils.splitPreserveAllTokens(record, "\t", -1)
        (splits(0), (splits(1), splits(2)))
      }).collectAsMap()
      val campaign_input_filterMapBC: Broadcast[collection.Map[String, (String, String)]] = sc.broadcast(campaign_input_filterMap)

      // 3s lines containing a tab are deliberately NOT filtered out here, to stay the same as
      // mobvista.dmp.datasource.tracking.mapreduce.TrackingInstallDailyMR.
      val input_3s_faltmap: RDD[String] = input_3s.flatMap(line => parseflatMapData(line, campaign_input_filterMapBC, datetime))
      input_3s_faltmap
        .map(line => parseMapData(line, campaign_input_filterMapBC))
        .coalesce(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])

    } finally {
      sc.stop()
    }

  }


  /**
    * Expands one comma-separated 3s log line into the device records it contains.
    *
    * A line yields records only when it has at least 29 columns and its column 1 is a key of
    * the broadcast campaign map. Shorter lines (including empty ones) yield nothing instead of
    * failing with an ArrayIndexOutOfBoundsException.
    *
    * Emitted records are built with `MRUtils.JOINER`:
    *  - `(column1, deviceId, upper-cased column3, date)` for a device id taken from column 10,
    *    or from column 28 when column 10 does not qualify (must match `CommonMapReduce.didPtn`
    *    and differ from `CommonMapReduce.allZero`);
    *  - the same four-field shape again when column 10 matches `CommonMapReduce.andriodIdPtn`;
    *  - `(column1, "imei", column11, upper-cased column3, date)` when column 11 matches
    *    `CommonMapReduce.imeiPtn`.
    *
    * @param line one raw line of the 3s input
    * @param bmap broadcast campaign map; only its key set is consulted here
    * @param date date string appended to every emitted record
    * @return the emitted records, possibly empty
    */
  def parseflatMapData(line: String,bmap: Broadcast[collection.Map[String, (String,String)]],date:String): Array[String] = {
    val fields: Array[String] = line.split(",", -1)
    // Check the column count BEFORE touching any index: malformed lines must be skipped, not crash the task.
    if (fields.length < 29 || !bmap.value.contains(fields(1))) {
      Array.empty[String]
    } else {
      val records = new ArrayBuffer[String]()
      val key = fields(1)
      val upperCol3 = fields(3).toUpperCase

      if (fields(10).matches(CommonMapReduce.didPtn) && !fields(10).equals(CommonMapReduce.allZero)) {
        records += MRUtils.JOINER.join(key, fields(10), upperCol3, date)
      } else if (fields(28).matches(CommonMapReduce.didPtn) && !fields(28).equals(CommonMapReduce.allZero)) {
        // Column 28 is only a fallback for column 10.
        records += MRUtils.JOINER.join(key, fields(28), upperCol3, date)
      }
      if (!fields(10).isEmpty && fields(10).matches(CommonMapReduce.andriodIdPtn)) {
        records += MRUtils.JOINER.join(key, fields(10), upperCol3, date)
      }
      if (!fields(11).isEmpty && fields(11).matches(CommonMapReduce.imeiPtn)) {
        // The extra "imei" marker makes this a five-field record, which parseMapData keys on.
        records += MRUtils.JOINER.join(key, "imei", fields(11), upperCol3, date)
      }
      records.toArray
    }
  }

  /**
    * Turns one intermediate record produced by parseflatMapData into the final output line.
    *
    * The record is split with `MRUtils.SPLITTER` (presumably the same separator that
    * `MRUtils.JOINER` writes — confirm against MRUtils).
    *  - Five fields: an imei record; output is
    *    `(field2, "imei", "android", campaign package, field4, field3)`.
    *  - Otherwise: a device id record; quotes and `[` are stripped from field 1, platform and
    *    device type are derived from the campaign entry and the id, and the output is
    *    `(deviceId, deviceType, platform, package, field3, field2)`.
    *
    * @param line intermediate record whose first field is a key of the campaign map
    * @param bmap broadcast campaign map: key -> (platform, package name)
    * @return the joined output line
    * @throws NoSuchElementException if the first field is not a key of the campaign map
    */
  def parseMapData(line: String,bmap: Broadcast[collection.Map[String, (String,String)]]):String ={
    val cols: Array[String] = MRUtils.SPLITTER.split(line, -1)
    // Evaluated lazily inside each branch; parseflatMapData only emits records for known keys.
    def campaign: (String, String) = bmap.value.get(cols(0)).get

    if (cols.length != 5) {
      val deviceId = cols(1).replaceAll("\"", "").replaceAll("\\[", "")
      val (declaredPlatform, rawPackage) = campaign
      val platform = getPlatform(declaredPlatform, deviceId)
      val packageName = if ("ios" == platform) rawPackage.replace("id", "") else rawPackage
      // An id shaped like an Android id overrides the platform-derived device type.
      val deviceType =
        if (deviceId.matches(CommonMapReduce.andriodIdPtn)) "androidid"
        else getDeviceType(platform)
      MRUtils.JOINER.join(deviceId, deviceType, platform, packageName, cols(3), cols(2))
    } else {
      val imei = cols(2)
      MRUtils.JOINER.join(imei, "imei", "android", campaign._2, cols(4), cols(3))
    }
  }


  /**
    * Resolves the platform of a device record.
    *
    * A declared platform of "ios" or "android" (in any letter case) wins and is returned in its
    * canonical lower-case form, so the case-sensitive comparisons done by callers
    * (`"ios" == platform`, getDeviceType) behave the same for "IOS" as for "ios".
    * Any other declared value, including null, is ignored and the platform is inferred from the
    * device id: ids matching `idfaRegex` are "ios", everything else is "android".
    *
    * @param platform platform declared for the campaign; may be null or arbitrary
    * @param deviceId device id used for inference when the declared platform is unusable
    * @return "ios" or "android"
    */
  private def getPlatform(platform: String, deviceId: String): String = {
    if ("ios".equalsIgnoreCase(platform)) "ios"
    else if ("android".equalsIgnoreCase(platform)) "android"
    else if (deviceId.matches(idfaRegex)) "ios"
    else "android"
  }

  /**
    * Maps a platform name to the device id type used in the output.
    *
    * The comparison is case-sensitive: "ios" gives "idfa", "android" and "adr" give "gaid",
    * and any other value gives "unknown".
    *
    * @param platform platform name
    * @return "idfa", "gaid" or "unknown"
    */
  private def getDeviceType(platform: String): String =
    platform match {
      case "ios"             => "idfa"
      case "android" | "adr" => "gaid"
      case _                 => "unknown"
    }

  /**
    * Declares the command line options understood by this job.
    *
    * Every option takes one argument; all five are described as mandatory ("[must]").
    *
    * @return the option set handed to the command line parser
    */
  def buildOptions(): Options = {
    // name -> description, registered in this order
    val specs = Seq(
      "campaign_input_path" -> "[must] campaign input path",
      "input_path_3s" -> "[must] 3s input path",
      "output" -> "[must] output path",
      "date" -> "[must] date",
      "coalesce" -> "[must] coalesce"
    )
    specs.foldLeft(new Options) { case (opts, (name, description)) =>
      opts.addOption(name, true, description)
    }
  }
}