Commit 1076083f by WangJinfeng

Fix missing-file errors when processing joypac logs (write an empty ORC dataset instead of failing when a partition has no input)

parent c08cf707
......@@ -25,6 +25,7 @@ spark-submit --class mobvista.dmp.datasource.joypac.JoypacResultEtl \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.network.timeout=720s \
--conf spark.default.parallelism=20 \
--conf spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true \
--master yarn --deploy-mode cluster --name JoypacResutlEtl \
--executor-memory 4g --driver-memory 4g --executor-cores 2 --num-executors 5 \
--files ${HIVE_SITE_PATH} \
......
package mobvista.dmp.datasource.joypac
import java.net.URI
import com.google.gson.JsonObject
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.dm.Constant.{allZero, andriodIdPtn, didPtn, imeiPtn}
......@@ -14,6 +12,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import ru.yandex.clickhouse.ClickHouseDataSource
import java.net.URI
/**
* @package: mobvista.dmp.datasource.joypac
* @author: wangjf
......@@ -123,12 +123,20 @@ class JoypacResultEtl extends CommonSparkJob {
j != null
})
df.toDF
.dropDuplicates()
.coalesce(Integer.parseInt(coalesce)).write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
if (df.count() > 1) {
df.toDF
.dropDuplicates()
.coalesce(Integer.parseInt(coalesce)).write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
} else {
Seq.empty[JoypacEntity].toDF
.coalesce(1).write
.mode(SaveMode.Overwrite)
.option("orc.compress", "zlib")
.orc(output)
}
import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._
val clusterName = Some(cluster): Option[String]
......
......@@ -7,6 +7,10 @@ import org.apache.spark.sql.{SparkSession, _};
* 刘凯 2019-02-18 15:20
* joypc_sdk fluentd数据接入至etl_joypc_sdk_daily表
*/
/**
 * One parsed joypc_sdk log record, as ingested from fluentd and written to the
 * `etl_joypc_sdk_daily` table (see the schema built with `StructField`s below —
 * every column is `StringType`, so all fields here are kept as raw strings).
 *
 * NOTE(review): `apps_info` looks like a serialized blob of installed-app data
 * and `time`/`time_zone` appear to be raw log values — formats are not visible
 * from this diff; confirm against the parsing code in `main`.
 */
case class JoypcSdkDaily(id: String, idfa: String, app_version: String, brand: String, network_type: String, package_name: String, platform: String,
                         language: String, os_version: String, app_version_code: String, model: String, time_zone: String, apps_info: String, time: String)
object JoypcSdkDaily extends Serializable {
def main(args: Array[String]) {
val spark = SparkSession.builder()
......@@ -16,7 +20,7 @@ object JoypcSdkDaily extends Serializable {
var year = loadTime.substring(0, 4)
var month = loadTime.substring(4, 6)
val day = loadTime.substring(6, 8)
val input_path = spark.conf.get("spark.app.input_path")
val input_path = spark.conf.get("spark.app.input_path").replace("*", "")
val output_path = spark.conf.get("spark.app.output_path")
try {
val log_rdd = spark.sparkContext
......@@ -98,7 +102,14 @@ object JoypcSdkDaily extends Serializable {
StructField("time", StringType)
))
var joypc_df = spark.createDataFrame(cal_rdd, joypc_schema)
joypc_df.coalesce(100).write.format("orc").mode("overwrite").save(output_path)
if (joypc_df.count() > 1) {
joypc_df.coalesce(100).write.format("orc").mode("overwrite").save(output_path)
} else {
import spark.implicits._
Seq.empty[JoypcSdkDaily].toDF
.coalesce(1).write.format("orc").mode("overwrite").save(output_path)
}
} catch {
case e: Exception =>
e.printStackTrace()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment