EtlRuidMapping.scala 2.14 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI

/**
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2021/5/6
 * @time: 10:40 上午
 * @email: jinfeng.wang@mobvista.com
 */
class EtlRuidMapping extends CommonSparkJob with Serializable {
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"EtlRuidMapping.$date")
    val sc = spark.sparkContext

    try {

      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  select idfa dev_id,ruid from default.mapping_server_request_log_h where concat(year,month,day) = '${date}' and idfa != 'illegal' and length(idfa) = 36
      val sql =
        s"""
           |select idfv dev_id,ruid from default.mapping_server_request_log_h where concat(year,month,day) = '${date}' and idfv != 'illegal' and length(idfv) = 36
           |""".stripMargin

      val mapping_rdd = spark.sql(sql).dropDuplicates()
        .rdd
        .map(row => {
          MRUtils.JOINER.join(row.getAs[String]("dev_id"), row.getAs[String]("ruid"))
        })

      mapping_rdd.repartition(100)
        .saveAsTextFile(output, classOf[GzipCodec])

    } finally {
      if (spark != null) {
        sc.stop()
        spark.stop()
      }
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options
  }
}

object EtlRuidMapping {
  def main(args: Array[String]): Unit = {
    new EtlRuidMapping().run(args)
  }
}