package mobvista.dmp.demo

import java.net.URI

import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

/**
  * Created by fl on 2017/7/13.
  *
  * Spark job that scans tab-separated text records and writes the distinct set of
  * non-numeric `package_name` values found on records whose platform field is `ios`.
  *
  * Record layout, as read by this class (zero-based, tab-separated):
  *  - field 2: platform, compared against the literal `"ios"`
  *  - field 3: a JSON array of objects, each carrying a `package_name` member
  *
  * NOTE(review): the all-digit names that get dropped are presumably numeric store ids
  * and the remaining ones bundle ids -- confirm against the upstream data definition.
  *
  * The class is `Serializable` because the closures shipped to executors call instance
  * members (`regex`, `extractBundleIds`) and therefore capture `this`.
  */
class FindBundleId extends Serializable {

  /** Pattern of a purely numeric package name; such names are excluded from the output. */
  val regex: String = "^[0-9]+$"

  /**
    * Runs the job.
    *
    * @param args `args(0)` is the input path, `args(1)` the output path. The output path
    *             is deleted recursively before the result is written.
    * @return always `0`; failures surface as exceptions
    * @throws IllegalArgumentException if fewer than two arguments are supplied
    */
  def run(args: Array[String]): Int = {
    // Fail before a SparkContext is created, with a message that says what is missing.
    require(args != null && args.length >= 2, "usage: FindBundleId <input path> <output path>")
    val inputPath = args(0)
    val outputPath = new Path(args(1))

    val conf = new SparkConf().setAppName("Find")
    val sc = new SparkContext(conf)
    try {
      // Resolve the file system from the output path itself so that the directory being
      // cleared is always the one saveAsTextFile is about to write to, whatever bucket or
      // scheme the caller passes (previously this was pinned to one hard-coded bucket).
      val fs: FileSystem = outputPath.getFileSystem(sc.hadoopConfiguration)
      fs.delete(outputPath, true)

      sc.textFile(inputPath)
        .flatMap(line => extractBundleIds(line)) // one split + JSON parse per record
        .distinct(50)                            // de-duplicate into 50 partitions
        .saveAsTextFile(args(1))
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
    0
  }

  /**
    * Tells whether a record contributes at least one value to the output.
    *
    * @param line one raw tab-separated input record
    * @return `true` when the platform field is `ios` and at least one `package_name` in
    *         the tags array is not purely numeric; `false` otherwise, including for
    *         records that have fewer than four fields
    */
  def doFilter(line: String): Boolean =
    // The iterator is lazy, so this stops at the first non-numeric package name.
    extractBundleIds(line).nonEmpty

  /**
    * Lazily yields the non-numeric `package_name` values of an `ios` record.
    * Yields nothing for other platforms and for records too short to hold the tags field.
    */
  private def extractBundleIds(line: String): Iterator[String] = {
    val fields = StringUtils.splitPreserveAllTokens(line, "\t", -1)
    val usable =
      fields != null &&
        fields.length > FindBundleId.TagsIndex &&
        "ios".equals(fields(FindBundleId.PlatformIndex))

    if (!usable) {
      Iterator.empty
    } else {
      // The JSON array is traversed through the JavaConversions implicit view,
      // exactly as the original for-loop did.
      GsonUtil.String2JsonArray(fields(FindBundleId.TagsIndex))
        .toIterator
        .map(_.getAsJsonObject.get("package_name").getAsString)
        .filterNot(_.matches(regex))
    }
  }
}

object FindBundleId {

  /** Zero-based index of the platform field within a record. */
  private val PlatformIndex = 2

  /** Zero-based index of the JSON tags field within a record. */
  private val TagsIndex = 3

  /** Entry point: forwards the command-line arguments to [[FindBundleId.run]]. */
  def main(args: Array[String]): Unit = {
    new FindBundleId().run(args)
  }
}