Commit e4336377 by fan.jiang

fix bug rtdmp_normal

parent 68629eef
@@ -121,15 +121,24 @@ class RtdmpNormal extends CommonSparkJob with Serializable {
       val package_name: String = array(index)._4
       val country_code: String = array(index)._5
       println(inputPath)
-      inputDataRdd = inputDataRdd.union(spark.sparkContext.textFile(inputPath).map(row => {
-        if (row.length == 32) {
-          DmpDailyDataInformation(row, device_type_md5, platform, package_name, country_code)
-        }
-        else {
-          DmpDailyDataInformation(row, device_type_not_md5, platform, package_name, country_code)
-        }
-      }
-      ))
+      val pathUri = new URI(inputPath)
+      // Filter out non-existent s3 paths such as s3://mob-emr-test/dataplatform/rtdmp_request/2021/07/10/dsp_req/com.taobao.idlefish_bes/*/
+      if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
+        .exists(new Path(pathUri.toString.replace("*", "")))) {
+        inputDataRdd = inputDataRdd.union(spark.sparkContext.textFile(inputPath).map(row => {
+          if (row.length == 32) {
+            DmpDailyDataInformation(row, device_type_md5, platform, package_name, country_code)
+          }
+          else {
+            DmpDailyDataInformation(row, device_type_not_md5, platform, package_name, country_code)
+          }
+        }
+        ))
+      } else {
+        println(inputPath + " not existed!")
+        inputDataRdd = inputDataRdd.union(spark.sparkContext.emptyRDD[DmpDailyDataInformation])
+      }
     }
     val df: DataFrame = inputDataRdd.toDF().persist(StorageLevel.MEMORY_AND_DISK_SER)
......
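The fix guards each spark.sparkContext.textFile call with a Hadoop FileSystem.exists check (with wildcards stripped first, since exists does not expand globs) and unions an empty RDD when the path is missing, instead of letting the read fail on a non-existent S3 prefix. Below is a minimal standalone sketch of that pattern, under stated assumptions: DeviceRow and readIfExists are hypothetical names standing in for the job's DmpDailyDataInformation record and the surrounding loop, and the path is assumed to be a scheme://bucket style URI as in the commit.

import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the job's DmpDailyDataInformation case class.
case class DeviceRow(deviceId: String, deviceType: String)

object GuardedRead {

  // Read inputPath only if the filesystem reports it as existing; otherwise
  // log and return an empty RDD so the caller's running union stays valid.
  // Wildcards are stripped before the check because FileSystem.exists does
  // not expand globs (the same trick the commit applies via replace("*", "")).
  def readIfExists(spark: SparkSession, inputPath: String): RDD[DeviceRow] = {
    val sc = spark.sparkContext
    val pathUri = new URI(inputPath)
    // Resolve the filesystem for the bucket (e.g. s3://mob-emr-test), as in the commit.
    val fs = FileSystem.get(
      new URI(s"${pathUri.getScheme}://${pathUri.getHost}"),
      sc.hadoopConfiguration)

    if (fs.exists(new Path(pathUri.toString.replace("*", "")))) {
      sc.textFile(inputPath).map { row =>
        // A 32-character row is treated as an already-hashed (MD5) device id.
        if (row.length == 32) DeviceRow(row, "md5") else DeviceRow(row, "raw")
      }
    } else {
      println(s"$inputPath does not exist, skipping")
      sc.emptyRDD[DeviceRow]
    }
  }
}

In the job itself this guard sits inside the loop that accumulates inputDataRdd, and the empty-RDD branch keeps the subsequent toDF() and persist calls well-formed even when some input paths are absent.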