package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer


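/**
 * Daily ETL that pulls device ids (idfa/idfamd5 on iOS, gaid/imei/imeimd5 on Android)
 * from adn_dsp.log_adn_dsp_request_orc_hour for Taobao deal-id traffic on the OPPO and
 * InMobi exchanges, and writes them as gzip-compressed TSV lines to S3.
 *
 * Illustrative invocation (bucket paths are placeholders, not the real job config):
 *   ... EtlDealidDaily -oppooutput s3://&lt;bucket&gt;/oppo/ -inmobioutput s3://&lt;bucket&gt;/inmobi/ -dt_dash_today 2021-01-01-12
 */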
class EtlDealidDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("oppooutput", true, "[must] oppooutput")
    options.addOption("inmobioutput", true, "[must] inmobioutput")
    options.addOption("dt_dash_today", true, "[must] dt_dash_today")
    options
  }

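  /**
   * Builds one TSV line per non-empty device id parsed from the comma-separated ext5
   * field: deviceId <TAB> idType <TAB> os <TAB> packageName <TAB> date.
   */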
  def buildRes(row: Row, date: String, packageName: String): Array[String] = {
    val buffer = new ArrayBuffer[String]()
    val os = row.getAs[String]("os")
    val ext5 = row.getAs[String]("ext5")
    // Split once; limit -1 keeps trailing empty fields so positional indexing stays stable.
    val ids = if (ext5 != null) ext5.split(",", -1) else Array.empty[String]
    if (os == "ios" && ids.length > 2) {
      val idfa = ids(1)
      val idfamd5 = ids(2)
      if (!idfa.isEmpty) {
        buffer += idfa + "\t" + "idfa" + "\t" + "ios" + "\t" + packageName + "\t" + date
      }
      if (!idfamd5.isEmpty) {
        buffer += idfamd5 + "\t" + "idfamd5" + "\t" + "ios" + "\t" + packageName + "\t" + date
      }
    } else if (os == "android" && ids.length > 5) {
      val gaid = ids(0)
      val imei = ids(4)
      val imeimd5 = ids(5)
      if (!gaid.isEmpty) {
        buffer += gaid + "\t" + "gaid" + "\t" + "android" + "\t" + packageName + "\t" + date
      }
      if (!imei.isEmpty) {
        buffer += imei + "\t" + "imei" + "\t" + "android" + "\t" + packageName + "\t" + date
      }
      if (!imeimd5.isEmpty) {
        buffer += imeimd5 + "\t" + "imeimd5" + "\t" + "android" + "\t" + packageName + "\t" + date
      }
    }
    buffer.toArray
  }

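  /** Parses CLI options, runs the OPPO and InMobi deal-id queries, and writes each result set to its output path. */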
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val oppooutput = commandLine.getOptionValue("oppooutput")
    val inmobioutput = commandLine.getOptionValue("inmobioutput")
    val dt_dash_today = commandLine.getOptionValue("dt_dash_today")

    val spark = SparkSession.builder()
      .appName("EtlAliActivitionDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

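    // Clear any existing output so saveAsTextFile does not fail on pre-existing paths.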
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(oppooutput), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(inmobioutput), true)

    // dt_dash_today is expected in yyyy-MM-dd-HH format.
    val dtParts = dt_dash_today.split("-")
    val year: String = dtParts(0)
    val month: String = dtParts(1)
    val date: String = dtParts(2)
    val hour: String = dtParts(3)
    // update keeps the yyyy-MM-dd portion that is written into the output lines.
    val update: String = dt_dash_today.substring(0, dt_dash_today.lastIndexOf("-"))

    try {
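      // OPPO exchange: requests whose ext3.dealids contains deal id 2532 or 2533.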
      val sql01: String =
        s"""
           |select os,ext5
           |from adn_dsp.log_adn_dsp_request_orc_hour
           |where yr='${year}' and mt='${month}' and dt='${date}' and hh='${hour}'
           |and (get_json_object(ext3,"$$.dealids") like "%2532%" or get_json_object(ext3,"$$.dealids") like "%2533%") and exchanges='oppocn'
           |""".stripMargin

      spark.sql(sql01).rdd.flatMap(buildRes(_, update, "com.taobao.taobao_oppo"))
        .coalesce(300).saveAsTextFile(oppooutput, classOf[GzipCodec])

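      // InMobi exchange: requests whose ext3.dealids contains deal id 1594807676568.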
      val sql02: String =
        s"""
           |select os,ext5
           |from adn_dsp.log_adn_dsp_request_orc_hour
           |where yr='${year}' and mt='${month}' and dt='${date}' and hh='${hour}'
           |and get_json_object(ext3,"$$.dealids") like "%1594807676568%" and exchanges='inmobi'
           |""".stripMargin
      spark.sql(sql02).rdd.flatMap(buildRes(_, update, "com.taobao.taobao_inmobi"))
        .coalesce(300).saveAsTextFile(inmobioutput, classOf[GzipCodec])

    } finally {
      spark.stop()
    }
    0
  }
}


object EtlDealidDaily {
  def main(args: Array[String]): Unit = {
    new EtlDealidDaily().run(args)
  }
}