package mobvista.dmp.test

import java.util.regex.Pattern

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.SparkSession

class ATest {
}

/**
 * Spark job that reads pipe-delimited install records, tags every record with
 * the date found in the path of the file it came from, keeps one row per
 * `(device_id, device_type)` (the one with the greatest date) and writes the
 * result as gzip-compressed, tab-separated text.
 */
object ATest {

  /** Separator used between columns of the output text file. */
  val dataSplit: String = "\t"

  // NOTE(review): the four values below are not referenced anywhere in the
  // visible part of this file; they are kept because other code may use them.
  val part: String = "QUERY"
  val encode: String = "UTF-8"
  val httpPrefix: String = "http://test.com"
  val regex: String = "^id\\d+$"

  /** Glob of the input files; the date is taken from each file's path. */
  private val InputPath =
    "hdfs://ip-172-31-33-205.ec2.internal:8020/user/root/ga_install_repair/*/*/*"

  /** Destination directory of the de-duplicated result. */
  private val OutputPath = "s3://mob-emr-test/feng.liang/ga_install_list_all/"

  /** Matches a `yyyy/MM/dd` path fragment; compiled once instead of per partition. */
  private val DatePathPattern: Pattern = Pattern.compile("\\d{4}/\\d{2}/\\d{2}")

  /**
   * Extracts the first `yyyy/MM/dd` fragment of `path` and returns it as
   * `yyyy-MM-dd`.
   *
   * @param path full path of an input file
   * @return the date embedded in the path, dash-separated
   * @throws IllegalStateException if the path holds no such fragment; the
   *                               message names the offending path
   */
  private def dateFromPath(path: String): String = {
    val matcher = DatePathPattern.matcher(path)
    if (!matcher.find()) {
      throw new IllegalStateException(s"No yyyy/MM/dd date component found in input path: $path")
    }
    // The match is exactly dddd/dd/dd, so swapping the slashes yields yyyy-MM-dd.
    matcher.group(0).replace("/", "-")
  }

  /**
   * Turns one raw input line into an output row.
   *
   * The line is split on `|`; the fields used are, in order: device id,
   * platform, package name, store link.
   *
   * @param line raw input line
   * @param date date (`yyyy-MM-dd`) to attach to the row
   * @return `(device_id, device_type, platform, package_name, store_link, date)`,
   *         or `None` when the line has fewer than four fields or its platform
   *         is neither "ios" nor "android" (case-insensitive)
   */
  private def parseRecord(
      line: String,
      date: String): Option[(String, String, String, String, String, String)] = {
    // limit -1 keeps trailing empty fields so the length check stays meaningful
    val fields = line.split("\\|", -1)
    if (fields.length < 4) {
      None
    } else {
      val platform = fields(1)
      if (platform.equalsIgnoreCase("ios")) {
        Some((fields(0), "idfa", "ios", fields(2), fields(3), date))
      } else if (platform.equalsIgnoreCase("android")) {
        Some((fields(0), "gaid", "adr", fields(2), fields(3), date))
      } else {
        None
      }
    }
  }

  /**
   * Job entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .getOrCreate()

    try {
      import spark.implicits._

      val sc = spark.sparkContext

      // The cast exposes mapPartitionsWithInputSplit, which is needed to learn
      // which file each partition was read from.
      val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](InputPath)
        .asInstanceOf[NewHadoopRDD[LongWritable, Text]]

      rdd.mapPartitionsWithInputSplit((split, itr) => {
        // Resolved once per split rather than once per record.
        val date = dateFromPath(split.asInstanceOf[FileSplit].getPath.toString)
        itr.flatMap(kv => parseRecord(kv._2.toString, date))
      })
        .toDF("device_id", "device_type", "platform", "package_name", "store_link", "date")
        .createOrReplaceTempView("tmp_ga_data")

      // Rank rows per (device_id, device_type) by date, newest first, and keep rank 1.
      val sql =
        """
          |select t.device_id, t.device_type, t.platform, t.package_name, t.store_link, t.date
          |from (
          |  select t.device_id, t.device_type, t.platform, t.package_name, t.store_link, t.date,
          |  row_number() over(partition by t.device_id, t.device_type order by t.date desc) as rk
          |  from tmp_ga_data t
          |) t
          |where t.rk='1'
        """.stripMargin

      spark.sql(sql)
        .rdd
        .map(_.mkString(dataSplit))
        .saveAsTextFile(OutputPath, classOf[GzipCodec])
    } finally {
      // Release the session whether the job succeeded or failed.
      spark.stop()
    }
  }
}