JoypcSdkDaily.scala 3.87 KB
package mobvista.dmp.datasource.joypac

import org.apache.spark.sql._
import org.apache.spark.sql.types._;

/**
 * Author: Liu Kai, 2019-02-18 15:20
 *
 * Loads joypc_sdk data collected via fluentd into the etl_joypc_sdk_daily table.
 * Each field is one String column of that output, declared in the same order
 * as the schema the job writes.
 */
case class JoypcSdkDaily(
    id: String,
    idfa: String,
    app_version: String,
    brand: String,
    network_type: String,
    package_name: String,
    platform: String,
    language: String,
    os_version: String,
    app_version_code: String,
    model: String,
    time_zone: String,
    apps_info: String,
    time: String)

object JoypcSdkDaily extends Serializable {

  /** Routing fields that precede the output columns in a parsed log row. */
  private val ControlKeys: Seq[String] = Seq("business_name", "business_pass")

  /** Output columns in ORC schema order; also the JSON keys read from each log record. */
  private val OutputColumns: Seq[String] = Seq(
    "id", "idfa", "app_version", "brand", "network_type", "package_name", "platform",
    "language", "os_version", "app_version_code", "model", "time_zone", "apps_info", "time")

  /**
   * Spark entry point. Reads raw joypc_sdk log lines and keeps only the records
   * whose business_name is "joypac_ios" and whose business_pass is "joypac_ios-sdk0121".
   * Writes them as ORC to `spark.app.output_path`, overwriting it. When no record
   * survives, an empty ORC dataset with the same columns is written instead.
   *
   * Configuration is read from the Spark conf:
   *  - `spark.app.input_path`: text input; every `*` is stripped before reading.
   *  - `spark.app.output_path`: ORC output location (overwritten).
   *
   * @param args unused; all configuration comes from the Spark conf
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .getOrCreate()
    try {
      // Conf reads live inside `try` so a missing key still reaches spark.stop().
      // NOTE(review): "*" is presumably stripped so a glob path reads the directory itself — confirm.
      val inputPath = spark.conf.get("spark.app.input_path").replace("*", "")
      val outputPath = spark.conf.get("spark.app.output_path")

      // NOTE(review): the length filter drops only lines of length exactly 1 (not
      // empty lines) — confirm this is the intended junk-line filter.
      val calRdd = spark.sparkContext
        .textFile(inputPath)
        .filter(_.length != 1)
        .map(parseLogLine)
        .filter(isJoypacIosRecord)
        .map(parseCalData)

      val joypcDf = spark.createDataFrame(calRdd, joypcSchema)
      // Any surviving record, even a single one, must be written. take(1) stops at
      // the first hit instead of counting the whole input before the write re-evaluates it.
      if (joypcDf.take(1).nonEmpty) {
        joypcDf.coalesce(100).write.format("orc").mode("overwrite").save(outputPath)
      } else {
        // Still produce an (empty) ORC output whose schema comes from the case class.
        import spark.implicits._
        Seq.empty[JoypcSdkDaily].toDF
          .coalesce(1).write.format("orc").mode("overwrite").save(outputPath)
      }
    } finally {
      spark.stop()
    }
  }

  /** ORC output schema: one nullable StringType column per OutputColumns entry. */
  private def joypcSchema: StructType =
    StructType(OutputColumns.map(name => StructField(name, StringType)))

  /**
   * Parses one raw log line with JoypcSdkTools.getEtlJSON. Returns a Row holding
   * business_name and business_pass, followed by the OutputColumns values in order.
   * Values are passed through exactly as `etl_json.get` returns them
   * (presumably String or null — confirm against JoypcSdkTools).
   */
  private def parseLogLine(line: String): Row = {
    val etlJson = JoypcSdkTools.getEtlJSON(line)
    Row.fromSeq((ControlKeys ++ OutputColumns).map(key => etlJson.get(key)))
  }

  /**
   * True when the row's business_name is "joypac_ios" and its business_pass is
   * "joypac_ios-sdk0121". Scala `==` is null-safe, so a record missing either field
   * is dropped instead of failing the job with a NullPointerException.
   */
  private def isJoypacIosRecord(row: Row): Boolean =
    row.get(0) == "joypac_ios" && row.get(1) == "joypac_ios-sdk0121"

  /**
   * Strips the two leading routing fields (business_name, business_pass) from a
   * parsed log row. Returns the 14 output-column values at indices 2..15.
   *
   * @param row row laid out as business_name, business_pass, then the output columns
   * @return a Row with one value per output column, in schema order; fails
   *         (via Row.get) if `row` has fewer than 16 fields
   */
  def parseCalData(row: Row): Row =
    Row.fromSeq(OutputColumns.indices.map(i => row.get(ControlKeys.length + i)))
}