Commit 33095b19 by WangJinfeng

Switch to Spark 3.1

parent ea0579b2
package mobvista.dmp.datasource.joypac
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, _};
import org.apache.spark.sql._
import org.apache.spark.sql.types._;
/**
* 刘凯 2019-02-18 15:20
......@@ -27,37 +27,22 @@ object JoypcSdkDaily extends Serializable {
.textFile(input_path)
val filter_rdd = log_rdd.filter(_.length != 1).map(p => {
val etl_json = JoypcSdkTools.getEtlJSON(p)
val id = etl_json.get("id");
val idfa = etl_json.get("idfa");
;
val app_version = etl_json.get("app_version");
;
val brand = etl_json.get("brand");
;
val network_type = etl_json.get("network_type");
;
val package_name = etl_json.get("package_name");
;
val platform = etl_json.get("platform");
;
val language = etl_json.get("language");
;
val business_name = etl_json.get("business_name");
;
val apps_info = etl_json.get("apps_info");
;
val business_pass = etl_json.get("business_pass");
;
val os_version = etl_json.get("os_version");
;
val app_version_code = etl_json.get("app_version_code");
;
val model = etl_json.get("model");
;
val time_zone = etl_json.get("time_zone");
;
val time = etl_json.get("time");
;
val id = etl_json.get("id")
val idfa = etl_json.get("idfa")
val app_version = etl_json.get("app_version")
val brand = etl_json.get("brand")
val network_type = etl_json.get("network_type")
val package_name = etl_json.get("package_name")
val platform = etl_json.get("platform")
val language = etl_json.get("language")
val business_name = etl_json.get("business_name")
val apps_info = etl_json.get("apps_info")
val business_pass = etl_json.get("business_pass")
val os_version = etl_json.get("os_version")
val app_version_code = etl_json.get("app_version_code")
val model = etl_json.get("model")
val time_zone = etl_json.get("time_zone")
val time = etl_json.get("time")
Row(
business_name,
business_pass,
......@@ -75,14 +60,12 @@ object JoypcSdkDaily extends Serializable {
time_zone,
apps_info,
time)
}).filter(x => {
val business_name = x.getString(0)
val business_pass = x.getString(1)
val result = business_name.equals("joypac_ios") && business_pass.equals("joypac_ios-sdk0121")
result
})
.filter { x => {
val business_name = x.getString(0)
val business_pass = x.getString(1)
val result = business_name.equals("joypac_ios") && business_pass.equals("joypac_ios-sdk0121")
result
}
}
val cal_rdd = filter_rdd.map { p => parseCalData(p) }
val joypc_schema = StructType(Array(
......@@ -101,24 +84,19 @@ object JoypcSdkDaily extends Serializable {
StructField("apps_info", StringType),
StructField("time", StringType)
))
var joypc_df = spark.createDataFrame(cal_rdd, joypc_schema)
val joypc_df = spark.createDataFrame(cal_rdd, joypc_schema)
if (joypc_df.count() > 1) {
joypc_df.coalesce(100).write.format("orc").mode("overwrite").save(output_path)
} else {
import spark.implicits._
Seq.empty[JoypcSdkDaily].toDF
.coalesce(1).write.format("orc").mode("overwrite").save(output_path)
}
} catch {
case e: Exception =>
e.printStackTrace()
} finally {
spark.stop()
}
spark.sparkContext.stop()
}
def parseCalData(row: Row) = {
Row(
row(2),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment