package mobvista.dmp.test

import java.util.regex.Pattern

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.SparkSession
class ATest {

}

object ATest {
  val dataSplit = "\t"
  val part = "QUERY"
  val encode = "UTF-8"
  val httpPrefix = "http://test.com"
  val regex = "^id\\d+$"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .getOrCreate()
    import spark.implicits._

    val sc = spark.sparkContext
    val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://ip-172-31-33-205.ec2.internal:8020/user/root/ga_install_repair/*/*/*")
      .asInstanceOf[NewHadoopRDD[LongWritable, Text]]

    rdd.mapPartitionsWithInputSplit((split, itr) => {
      val path = split.asInstanceOf[FileSplit].getPath
      val pattern = Pattern.compile("\\d{4}/\\d{2}/\\d{2}")
      val math = pattern.matcher(path.toString)
      math.find()
      val date = math.group(0).replace("/", "")

      itr.map(_._2.toString)
        .map(_.split("\\|", -1))
        .filter(array => {
          array.length >= 4 && (array(1).equalsIgnoreCase("ios") || array(1).equalsIgnoreCase("android"))
        })
        .map(array => {
          val platform = array(1)
          var idType = ""
          var plt = ""

          if (platform.equalsIgnoreCase("ios")) {
            idType = "idfa"
            plt = "ios"
          } else if (platform.equalsIgnoreCase("android")) {
            idType = "gaid"
            plt = "adr"
          }
          val date1 = date.substring(0, 4) + "-" + date.substring(4, 6) + "-" + date.substring(6, 8)
          (array(0), idType, plt, array(2), array(3), date1)
        })
    })
      .toDF("device_id", "device_type", "platform", "package_name", "store_link", "date")
      .createOrReplaceTempView("tmp_ga_data")

    val sql =
      """
        |select t.device_id, t.device_type, t.platform, t.package_name, t.store_link, t.date
        |from (
        |  select t.device_id, t.device_type, t.platform, t.package_name, t.store_link, t.date,
        |   row_number() over(partition by t.device_id, t.device_type order by t.date desc) as rk
        |  from tmp_ga_data t
        |) t
        |where t.rk='1'
      """.stripMargin
    spark.sql(sql)
      .rdd
      .map(_.mkString("\t"))
      .saveAsTextFile("s3://mob-emr-test/feng.liang/ga_install_list_all/", classOf[GzipCodec])


  }

}