Commit df772ee5 by WangJinfeng

set spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true

parent 3397f569
......@@ -17,6 +17,7 @@ spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMainPre \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728 \
--conf spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true \
--master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 8g --executor-cores 5 --num-executors 20 \
../${JAR} -time "${date_time}" -data_utime "${date_time}" -output ${OUTPUT} -coalesce 100
......
......@@ -99,7 +99,7 @@ class RTDmpMainPre extends CommonSparkJob with Serializable {
val pathUri = new URI(list.get(0)._1)
val newAudience = if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
.exists(new Path(pathUri.toString.replace("*", "")))) {
val rdd = sc.newAPIHadoopFile(list.get(0)._1, fc, kc, vc, sc.hadoopConfiguration)
val rdd = sc.newAPIHadoopFile(list.get(0)._1.replace("*", ""), fc, kc, vc, sc.hadoopConfiguration)
val linesWithFileNames = rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit, iterator) => {
val file = inputSplit.asInstanceOf[FileSplit]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment