1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package mobvista.dmp.demo
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
/**
 * Spark demo over the text export of dm_install_list for 2017/08/09.
 *
 * It extracts (device key, package name) pairs from the `mp` sub-directory and from the other
 * source sub-directories of the same day. It then reports how many distinct packages and device
 * keys appear under `mp`, and how many of those are absent from the other sources.
 *
 * The class extends Serializable because `run` hands the instance method `doMap` to
 * `RDD.flatMap`. That closure captures `this`, so the instance is shipped to executors.
 *
 * Created by fl on 2017/6/22.
 */
class ParquetDemo extends Serializable {
  /** Field separator of the input records; also used to join device id and device type. */
  val dataSplit = "\t"

  /**
   * Runs the comparison job and prints its counts to stdout.
   *
   * @param args currently unused
   * @return always 0 when the job completes; Spark failures propagate as exceptions
   */
  def run(args: Array[String]): Int = {
    val mpPath =
      "s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_install_list/2017/08/09/mp"
    val installPath =
      "s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/dm_install_list/2017/08/09/{3s,adn_install,adn_request_sdk,adserver,dsp_req,ga,other}"

    val conf = new SparkConf().setAppName("ParquetDemo")
      .set("spark.default.parallelism", "200")
    val sc = new SparkContext(conf)
    try {
      // Each pair RDD feeds two separate distinct() shuffles, so cache it to avoid
      // re-reading and re-parsing the input.
      val tuple = sc.textFile(mpPath).flatMap(doMap(_))
      tuple.cache()
      val installTuple = sc.textFile(installPath).flatMap(doMap(_))
      installTuple.cache()

      val pkgUnique = tuple.map(_._2).distinct()
      val didUnique = tuple.map(_._1).distinct()
      val installPkgUnique = installTuple.map(_._2).distinct()
      val installDidUnique = installTuple.map(_._1).distinct()

      // Previously these action results were computed and discarded; surface them.
      println(s"mp distinct packages: ${pkgUnique.count()}")
      println(s"mp distinct device keys: ${didUnique.count()}")
      println(s"mp packages absent from other sources: ${pkgUnique.subtract(installPkgUnique).count()}")
      println(s"mp device keys absent from other sources: ${didUnique.subtract(installDidUnique).count()}")
      0
    } finally {
      // Always release the cluster resources, even if an action fails.
      sc.stop()
    }
  }

  /**
   * Parses one tab-separated record into (device key, package name) pairs.
   *
   * The device key is field 0 and field 1 joined with [[dataSplit]]. Field 3 is parsed by
   * `GsonUtil.String2JsonArray`, and each object element contributes its `package_name`.
   * Field 2 is not read.
   *
   * @param record one raw input line
   * @return one pair per JSON object element that has a non-null `package_name`; an empty array
   *         when the record has fewer than four fields. Non-object elements and elements without
   *         `package_name` are skipped instead of failing the whole Spark task.
   */
  def doMap(record: String): Array[(String, String)] = {
    // A negative max means "no limit"; empty fields are preserved, so positions stay stable.
    val fields = StringUtils.splitPreserveAllTokens(record, dataSplit, -1)
    if (fields == null || fields.length < 4) {
      // Short rows exist in the data; indexing fields(3) would throw and kill the task.
      Array.empty[(String, String)]
    } else {
      val deviceKey = s"${fields(0)}$dataSplit${fields(1)}"
      val pairs = new ArrayBuffer[(String, String)]()
      GsonUtil.String2JsonArray(fields(3)).foreach { element =>
        if (element.isJsonObject) {
          val pkg = element.getAsJsonObject.get("package_name")
          // Gson yields null for a missing member and JsonNull for an explicit JSON null.
          if (pkg != null && !pkg.isJsonNull) pairs += ((deviceKey, pkg.getAsString))
        }
      }
      pairs.toArray
    }
  }
}
object ParquetDemo {
  /** Input file scanned when no path argument is supplied (the original hard-coded path). */
  private val DefaultInputPath = "/Users/fl/Downloads/ios.txt"

  /**
   * Scans a local tab-separated file and reports malformed rows.
   *
   * For every line with fewer than four tab-separated fields, it prints the line's 1-based line
   * number and then the line itself, each on its own stdout line.
   *
   * @param args optional; when non-empty, `args(0)` is the path of the file to scan
   */
  def main(args: Array[String]): Unit = {
    val path = if (args != null && args.nonEmpty) args(0) else DefaultInputPath
    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
    val reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"))
    try {
      Iterator.continually(reader.readLine())
        .takeWhile(_ != null)
        .zipWithIndex
        .foreach { case (line, index) =>
          // Limit -1 keeps trailing empty fields, so "a\t\t\t" still counts as four fields.
          if (line.split("\t", -1).length < 4) {
            println(index + 1)
            println(line)
          }
        }
    } finally {
      // Closing the BufferedReader closes the wrapped reader and file stream.
      reader.close()
    }
  }
}