Commit 9bc44af8 by fan.jiang

fix bug cn_good_channel

parent ca633a12
......@@ -36,6 +36,16 @@ class CnGoodChannel extends CommonSparkJob with Serializable {
options
}
def buildRes(row:String): Array[Row]={
var data = new ArrayBuffer[Row]()
val length = row.split("\t",-1).length
if (length == 4) {
val package_name=row.split("\t", -1)(3)
data += Row(row.split("\t", -1)(0), row.split("\t", -1)(1), package_name.substring(2,package_name.lastIndexOf("\"")))
}
data.toArray
}
override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args)
if (!checkMustOption(commandLine)) {
......@@ -81,10 +91,9 @@ class CnGoodChannel extends CommonSparkJob with Serializable {
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output5), true)
try {
val old_data: RDD[Row] = sc.textFile(old_data_path).map(row => {
val package_name=row.split("\t", -1)(3)
Row(row.split("\t", -1)(0), row.split("\t", -1)(1), package_name.substring(2,package_name.lastIndexOf("\"")))
})
// 存在这种不符合的oiad类型设备,特殊字符^M,导致\t分隔数据异常,所以添加过滤buildRes函数中的条件 fddeee78^Mb6ff-f118-9ddf-ef9ed3ffcac0
val old_data: RDD[Row] = sc.textFile(old_data_path).flatMap(buildRes(_))
val schema: StructType = StructType(Array(
StructField("device_id", StringType),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment