From e43363776ab9df81df687b6c0a25a1d671094d90 Mon Sep 17 00:00:00 2001
From: fan.jiang <fan.jiang@mobvista.com>
Date: Wed, 13 Oct 2021 15:55:30 +0800
Subject: [PATCH] RtdmpNormal: skip nonexistent S3 input paths

Guard each wildcard input path with a FileSystem existence check before
reading it, so the job no longer fails on S3 prefixes that were never
written; a missing path now contributes an empty RDD instead.
---
 src/main/scala/mobvista/dmp/datasource/dm/RtdmpNormal.scala | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)
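
Patch note (text above the first "diff --git" line is ignored by git am):
the sketch below is a minimal, self-contained illustration of the
existence check this patch adds. It assumes java.net.URI and Hadoop's
org.apache.hadoop.fs.{FileSystem, Path} (which RtdmpNormal.scala must
already import for the hunk to compile) plus an S3 filesystem
implementation on the classpath; the bucket and package names are
hypothetical placeholders.

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object PathExistsSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical wildcard input path of the shape this job consumes.
        val inputPath = "s3://some-bucket/rtdmp_request/2021/07/10/dsp_req/some.package/*/"
        val pathUri = new URI(inputPath)
        // Resolve the filesystem for the scheme + bucket (the URI authority).
        val fs = FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), new Configuration())
        // exists() does not expand globs, so strip the "*" and probe the
        // remaining prefix, exactly as the hunk below does.
        val probe = new Path(pathUri.toString.replace("*", ""))
        if (fs.exists(probe)) println(s"$inputPath exists")
        else println(s"$inputPath does not exist, skipping")
      }
    }

Note that the union with an empty RDD in the else branch of the hunk is a
no-op, kept only so both branches assign inputDataRdd uniformly.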

diff --git a/src/main/scala/mobvista/dmp/datasource/dm/RtdmpNormal.scala b/src/main/scala/mobvista/dmp/datasource/dm/RtdmpNormal.scala
index a9d2fbf..323b8f8 100644
--- a/src/main/scala/mobvista/dmp/datasource/dm/RtdmpNormal.scala
+++ b/src/main/scala/mobvista/dmp/datasource/dm/RtdmpNormal.scala
@@ -121,15 +121,24 @@ class RtdmpNormal extends CommonSparkJob with Serializable {
         val package_name: String = array(index)._4
         val country_code: String = array(index)._5
         println(inputPath)
-        inputDataRdd = inputDataRdd.union(spark.sparkContext.textFile(inputPath).map(row => {
-          if (row.length == 32) {
-            DmpDailyDataInformation(row, device_type_md5, platform, package_name, country_code)
-          }
-          else {
-            DmpDailyDataInformation(row, device_type_not_md5, platform, package_name, country_code)
-          }
-        }
+        val pathUri = new URI(inputPath)
+        // Filter out nonexistent S3 paths such as s3://mob-emr-test/dataplatform/rtdmp_request/2021/07/10/dsp_req/com.taobao.idlefish_bes/*/
+        if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
+          .exists(new Path(pathUri.toString.replace("*", "")))) {
+            inputDataRdd = inputDataRdd.union(spark.sparkContext.textFile(inputPath).map(row => {
+              if (row.length == 32) {
+                DmpDailyDataInformation(row, device_type_md5, platform, package_name, country_code)
+              }
+              else {
+                DmpDailyDataInformation(row, device_type_not_md5, platform, package_name, country_code)
+              }
+            }
         ))
+        } else {
+            println(inputPath + " does not exist, skipping!")
+            inputDataRdd = inputDataRdd.union(spark.sparkContext.emptyRDD[DmpDailyDataInformation])
+        }
+
       }
 
       val df: DataFrame = inputDataRdd.toDF().persist(StorageLevel.MEMORY_AND_DISK_SER)
--
libgit2 0.27.1
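
Reviewer note (outside the patch): an alternative that avoids the string
surgery on "*" is FileSystem.globStatus, which expands the wildcard
itself and yields null or an empty array when nothing matches. A sketch,
with matchedPaths as a hypothetical helper name:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Expand a wildcard path into the concrete paths it matches.
    def matchedPaths(inputPath: String, conf: Configuration): Seq[Path] = {
      val uri = new URI(inputPath)
      val fs = FileSystem.get(new URI(s"${uri.getScheme}://${uri.getHost}"), conf)
      Option(fs.globStatus(new Path(inputPath))).toSeq.flatten.map(_.getPath)
    }

Reading only the matched paths would make both the existence check and
the empty-RDD union unnecessary.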