// AdnSdkEtlHour.scala
package mobvista.dmp.datasource.adn_sdk

import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
  * @package: mobvista.dmp.datasource.adn_sdk
  * @author: wangjf
  * @date: 2020/4/2
  * @time: 8:34 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
/**
  * Hourly ETL job for ADN SDK logs.
  *
  * Reads raw log lines from S3, fans each line out into up to three device
  * rows (imei / android_id / device_id), extracts the "cal=" install list
  * embedded in the data blob, and writes the result as snappy-compressed ORC.
  */
class AdnSdkEtlHour extends CommonSparkJob with java.io.Serializable {

  /**
    * Declares the CLI options accepted by this job.
    *
    * @return options: date (yyyyMMdd), hour, input path, output path, and
    *         coalesce (target partition count for the read side).
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("hour", true, "hour")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
    * Job entry point: parses CLI args, builds the Spark session, runs the
    * extraction pipeline and writes the ORC output.
    *
    * @param args CLI arguments, see [[commandOptions]].
    * @return 0 on success; exceptions propagate after the session is stopped.
    */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val hour = commandLine.getOptionValue("hour")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = SparkSession
      .builder()
      .appName(s"AdnSdkEtlHour.$date.$hour")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      // Make the write idempotent: drop any previous output for this hour.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      // filter(_.length != 1) drops degenerate one-character lines.
      val filter_rdd = sc.textFile(input).coalesce(coalesce).filter(_.length != 1).mapPartitions(
        elements => {
          val linesArr = new ArrayBuffer[Row]()
          if (elements.nonEmpty) {
            elements.foreach(p => {
              val etl_json = AdnSdkTools.getEtlJSON(p)
              // Intermediate row layout: device_id,device_type,platform,data,model,brand
              val device_id = etl_json.getString("device_id")
              val device_type = etl_json.get("device_type")
              val platform = etl_json.get("platform")
              val data = etl_json.get("data")
              val model = etl_json.get("model")
              val brand = etl_json.get("brand")
              val imei = etl_json.getString("imei")
              val android_id = etl_json.getString("android_id")
              // One log line can contribute up to three rows, one per valid identifier.
              if (StringUtils.isNotBlank(imei) && imei.matches(imeiPtn)) {
                linesArr += Row(imei, "imei", "android", data, model, brand)
              }

              if (StringUtils.isNotBlank(android_id) && android_id.matches(andriodIdPtn)) {
                linesArr += Row(android_id, "androidid", "android", data, model, brand)
              }

              // NOTE(review): && binds tighter than ||, so this reads as
              // (didPtn && !allZero) || !md5Ptn || imeiMd5Ptn — the middle clause
              // admits any string that is NOT an MD5; confirm that is intended.
              if (StringUtils.isNotBlank(device_id) && (device_id.matches(didPtn) && !allZero.equals(device_id) || !device_id.matches(md5Ptn) || device_id.matches(imeiMd5Ptn))) {
                linesArr += Row(device_id, device_type, platform, data, model, brand)
              }
            })
          }
          linesArr.iterator
        })

      val filter_df = filter_rdd.map(contents => {
        parseCalData(contents, date)
      })

      spark.createDataFrame(filter_df, install_schema)
        // Guard against 0 partitions when coalesce < 50 (integer division):
        // repartition requires a strictly positive partition count.
        .repartition(math.max(coalesce / 50, 1))
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  // Output schema: the raw "data" blob is replaced by the derived install_list JSON.
  val install_schema: StructType = StructType(Array(
    StructField("device_id", StringType),
    StructField("device_type", StringType),
    StructField("platform", StringType),
    StructField("model", StringType),
    StructField("brand", StringType),
    StructField("install_list", StringType)))

  /**
    * Converts an intermediate row (device_id, device_type, platform, data, model, brand)
    * into the output row, replacing `data` with a JSON array of installed packages
    * extracted from its "cal=" query parameter.
    *
    * Only entries with iex == 1 are kept. Each kept entry carries package_name,
    * label, campaignid and an install date; timestamps whose year is more than
    * two years before log_day's year fall back to the receive day.
    *
    * Parse failures are swallowed and yield an empty install_list.
    *
    * @param row     intermediate row produced in [[run]]
    * @param log_day processing day, yyyyMMdd
    * @return Row(device_id, device_type, platform, model, brand, install_list)
    */
  def parseCalData(row: Row, log_day: String): Row = {
    val data = row.getString(3)
    val model = row(4)
    val package_array = new JSONArray()
    val device_type = row(1)
    try {
      // data may legitimately be absent (stored via etl_json.get, which can
      // return null) — guard before substring parsing to avoid noisy NPEs.
      if (data != null && data.indexOf("cal=") > -1) {
        val a = data.substring(data.indexOf("cal="))
        val cal = a.substring(4, a.length()).split("&")(0)
        val receive_day = log_day.substring(0, 4) + "-" + log_day.substring(4, 6) + "-" + log_day.substring(6, 8)
        val install_array = JSON.parseArray(AdnSdkTools.evaluate(cal))
        if (install_array != null) {
          // Hoisted out of the loop: one formatter per call is enough.
          val time_format = new SimpleDateFormat("yyyy-MM-dd")
          for (i <- 0 until install_array.size()) {
            val obj = install_array.getJSONObject(i)
            val package_name = obj.get("packageName").toString
            var label = ""
            if (obj.get("label") != null) {
              label = obj.get("label").toString
            }
            var campaignid = ""
            var iex = 0
            if (obj.get("iex") != null) {
              iex = obj.getInteger("iex")
            }
            if (obj.get("campaignid") != null) {
              campaignid = obj.get("campaignid").toString
            }
            // iex == 1 marks a confirmed install; everything else is dropped.
            if (iex == 1) {
              val install_json = new JSONObject()
              // A usable timestamp must be non-empty and longer than 4 digits.
              if (obj.get("ts") != null && obj.getString("ts").nonEmpty && obj.getString("ts").length() > 4) {
                val ts = obj.getLong("ts")
                val date = new Date()
                date.setTime(ts)
                val ins_day = time_format.format(date)
                val n = Integer.parseInt(ins_day.substring(0, 4))
                val f = Integer.parseInt(log_day.substring(0, 4))
                // Years more than two apart are treated as a bogus timestamp.
                if (f - n > 2) {
                  install_json.put("date", receive_day)
                } else {
                  install_json.put("date", ins_day)
                }

              } else {
                install_json.put("date", receive_day)
              }
              install_json.put("package_name", package_name)
              install_json.put("label", label)
              install_json.put("campaignid", campaignid)
              package_array.add(install_json)
            }
          }
        }
      }
    } catch {
      // Malformed JSON is expected in raw logs; skip this row silently.
      case _: com.alibaba.fastjson.JSONException => {}
      case e: Exception => {
        e.printStackTrace()
      }
    }
    Row(row(0), device_type, row(2), model, row(5), package_array.toString)
  }
}

object AdnSdkEtlHour {
  /** CLI entry point: delegates to [[AdnSdkEtlHour.run]]; the exit code is discarded. */
  def main(args: Array[String]): Unit =
    new AdnSdkEtlHour().run(args)
}