package mobvista.dmp.datasource.dsp

import java.net.URI
import java.util

import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_set, first, max, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

import scala.collection.JavaConversions._
import scala.collection.mutable


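/**
  * Daily ETL over the hourly dsp request logs. It
  *   1. reads the hourly etl_dsp_request data, merges the package lists, android ids and
  *      segment ids per (device_id, device_type) and writes the result as gzipped TSV, and
  *   2. de-duplicates the hourly mds_dsp_request data and writes it as gzipped TSV.
  */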
class DspOrgLogEtlDailys extends CommonSparkJob with Serializable {


  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val input_etl_dsp_request_hour = commandLine.getOptionValue("input")
    val output_etl_dsp_request_hour = commandLine.getOptionValue("output")
    val input_mds_dsp_request_hour = commandLine.getOptionValue("input_mds_dsp_request_hour")
    val output_mds_dsp_request_hour = commandLine.getOptionValue("output_mds_dsp_request_hour")
    val parallelism = commandLine.getOptionValue("parallelism").toInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", s"${parallelism}")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    // Remove any existing output so the job can be re-run for the same period.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output_etl_dsp_request_hour), true)
    import spark.implicits._
    try {
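      // Merges the segment_ids JSON-array strings collected for one device into a single
      // de-duplicated JSON array, e.g. [{"name":"carrier","value":"verified"}].
      // Returns an empty string when there is nothing to merge.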
      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val segmentSet: util.Set[String] = new util.HashSet[String]()
        if (segmentArrays != null && segmentArrays.nonEmpty) {
          for (segmentArray <- segmentArrays) {
            val jsonArray = GsonUtil.String2JsonArray(segmentArray.toString)
            for (json <- jsonArray) {
              segmentSet.add(json.toString)
            }
          }
        }

        if (!segmentSet.isEmpty) {
          res = "[" + segmentSet.mkString(",") + "]"
        }
        res
      })

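      // Merges the '#'-separated package-list strings collected for one device into a single
      // de-duplicated JSON array of strings, e.g.
      // "com.playgendary.tanks#com.nflystudio.InfiniteStaircase" -> ["com.playgendary.tanks","com.nflystudio.InfiniteStaircase"].
      // Returns "[]" when there is nothing to merge.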
      val mergePkgUdf = udf((pkgsArrays: mutable.WrappedArray[String]) => {
        var res = "[]"
        val pkgSet: util.Set[String] = new util.HashSet[String]()
        if (pkgsArrays != null && pkgsArrays.nonEmpty) {
          for (pkgs <- pkgsArrays) { // each entry is a '#'-separated string of package names
            if (StringUtils.isNotBlank(pkgs)) {
              val pkgNames = pkgs.split("#", -1)
              for (pkgName <- pkgNames) {
                pkgSet.add(pkgName)
              }
            }
          }
        }

        if (!pkgSet.isEmpty) {
          res = "[\"" + pkgSet.mkString("\",\"") + "\"]"
        }
        res
      })

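      // Merges the '#'-separated androidids strings collected for one device into a single
      // de-duplicated, comma-separated string (same splitting logic as the package names above).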
      val mergeAdridIdUdf = udf((adrsArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val adrSet: util.Set[String] = new util.HashSet[String]()
        if (adrsArrays != null && adrsArrays.nonEmpty) {
          for (adrs <- adrsArrays) { // each entry is a '#'-separated string of android ids
            if (StringUtils.isNotBlank(adrs)) {
              val adrIds = adrs.split("#", -1)
              for (adr <- adrIds) {
                adrSet.add(adr)
              }
            }
          }
        }
        if (!adrSet.isEmpty) {
          res = adrSet.mkString(",")
        }
        res
      })

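      // Normalize each etl_dsp_request row (de-duplicate the package and android-id arrays and
      // re-encode them as '#'-separated strings), then aggregate per (device_id, device_type):
      // first() for the profile fields, max() for datetime and the merge UDFs for the collected
      // sets. The result is written as gzipped, tab-separated text.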
      spark.read.schema(dspEtlSchema).orc(input_etl_dsp_request_hour).map(row => {
        val device_id = row.getAs[String]("device_id")
        val device_type = row.getAs[String]("device_type")
        val platform = row.getAs[String]("platform")
        val country_code = row.getAs[String]("country_code")
        val ip = row.getAs[String]("ip")
        val gender = row.getAs[String]("gender")
        val birthday = row.getAs[String]("birthday")
        val maker = row.getAs[String]("maker")
        val model = row.getAs[String]("model")
        val os_version = row.getAs[String]("os_version")
        val package_list = row.getSeq[String](10) // column 10 in dspEtlSchema: package_list
        val androidids = row.getSeq[String](11)   // column 11 in dspEtlSchema: androidids
        val datetime = row.getAs[String]("datetime")
        // segment_ids is a JSON array string, e.g. [{"id":"1462","value":"0.3"},{"id":"381","value":"0.2"}]
        val segment_ids = row.getAs[String]("segment_ids")

        val setPkg = new util.HashSet[String]()
        if (package_list != null && package_list.nonEmpty) {
          package_list.foreach(line => {
            if (StringUtils.isNotBlank(line)) {
              setPkg.add(line)
            }
          })
        }

        val setAdrIds = new util.HashSet[String]()
        if (androidids != null && androidids.nonEmpty) {
          androidids.foreach(line => {
            if (StringUtils.isNotBlank(line)) {
              setAdrIds.add(line)
            }
          })
        }

        (device_id, device_type, platform, country_code, ip, gender, birthday, maker, model, os_version,
          setPkg.mkString("#"), setAdrIds.mkString("#"), datetime, segment_ids)
      }).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
        "model", "os_version", "package_list", "androidids", "datetime", "segment_ids")
        .groupBy("device_id", "device_type")
        .agg(first("platform"),
          first("country_code"),
          first("ip"),
          first("gender"),
          first("birthday"),
          first("maker"),
          first("model"),
          first("os_version"),
          mergePkgUdf(collect_set("package_list")),
          mergeAdridIdUdf(collect_set("androidids")),
          max("datetime"),
          mergeUdf(collect_set("segment_ids"))
        ).coalesce(coalesce)
        .rdd.map(_.mkString("\t")).saveAsTextFile(output_etl_dsp_request_hour, classOf[GzipCodec])

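      // De-duplicate the mds_dsp_request rows via a GROUP BY over all selected columns and
      // write them out as gzipped, tab-separated text.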
      spark.read.schema(dspMdsSchema).orc(input_mds_dsp_request_hour).createOrReplaceTempView("mds_dsp_request_hour")
      val sql = "select device_id, device_type, platform" +
        ", req_time, ip, geo, longitude, latitude" +
        " from mds_dsp_request_hour" +
        " group by device_id, device_type, platform, req_time, ip, geo, longitude, latitude"
      spark.sql(sql).coalesce(coalesce)
        .rdd.map(_.mkString("\t")).saveAsTextFile(output_mds_dsp_request_hour, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  // Schema of the hourly mds_dsp_request input:
  // struct<device_id:string,device_type:string,platform:string,req_time:string,ip:string,geo:string,longitude:string,latitude:string>

  def dspMdsSchema: StructType = {
    StructType(StructField("device_id", StringType) ::
      StructField("device_type", StringType) ::
      StructField("platform", StringType) ::
      StructField("req_time", StringType) ::
      StructField("ip", StringType) ::
      StructField("geo", StringType) ::
      StructField("longitude", StringType) ::
      StructField("latitude", StringType)
      :: Nil)
  }


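  // Schema of the hourly etl_dsp_request input; package_list and androidids are string arrays.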
  def dspEtlSchema: StructType = {
    StructType(StructField("device_id", StringType) ::
      StructField("device_type", StringType) ::
      StructField("platform", StringType) ::
      StructField("country_code", StringType) ::
      StructField("ip", StringType) ::
      StructField("gender", StringType) ::
      StructField("birthday", StringType) ::
      StructField("maker", StringType) ::
      StructField("model", StringType) ::
      StructField("os_version", StringType) ::
      StructField("package_list", ArrayType(StringType)) ::
      StructField("androidids", ArrayType(StringType)) ::
      StructField("datetime", StringType) ::
      StructField("segment_ids", StringType)
      :: Nil)
  }


  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("input_mds_dsp_request_hour", true, "input_mds_dsp_request_hour")
    options.addOption("output_mds_dsp_request_hour", true, "output_mds_dsp_request_hour")
    options.addOption("parallelism", true, "parallelism")
    options.addOption("coalesce", true, "coalesce")
    options
  }
}

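// Illustrative launch command, assuming the job is submitted with the standard spark-submit
// launcher (class name is real, all paths and sizes are placeholders):
//   spark-submit --class mobvista.dmp.datasource.dsp.DspOrgLogEtlDailys <job jar> \
//     -input <etl_dsp_request_hour path> -output <etl output path> \
//     -input_mds_dsp_request_hour <mds_dsp_request_hour path> -output_mds_dsp_request_hour <mds output path> \
//     -parallelism <N> -coalesce <M>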
object DspOrgLogEtlDailys {
  def main(args: Array[String]): Unit = {
    new DspOrgLogEtlDailys().run(args)
  }
}