// FixInstallListRuid.scala
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020-12-14 17:50:36
 * @time: 6:42 PM
 * @email: jinfeng.wang@mobvista.com
 */
class FixInstallListRuid extends CommonSparkJob with Serializable {

  /**
   * Re-keys the ios rows of dwh.dm_install_list (for the given date/business) whose
   * device_id appears in the input mapping file onto their ruid, unions the result
   * with the unmodified partition rows, and writes the joined records gzip-compressed
   * to `output`.
   *
   * Required options: date, business, input (tab-separated device_id\truid mapping),
   * output (S3 path, deleted first so reruns are idempotent), coalesce (partitions).
   *
   * @return 0 on success, 1 when a mandatory option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"FixInstallListRuid.$date.$business")
    val sc = spark.sparkContext
    // Schema of the tab-separated mapping file: original device_id -> ruid.
    val schema =
      StructType(Array(
        StructField("device_id", StringType),
        StructField("ruid", StringType)
      ))

    try {
      spark.udf.register("getInstallList", getInstallList _)

      // Make reruns idempotent: remove any previous output before writing.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      // Drop malformed mapping lines (anything that does not split into exactly two columns).
      val mappingRdd = sc.textFile(input)
        .map(_.split("\t"))
        .filter(_.length == 2)
        .map(r => Row(r(0), r(1)))

      val df = spark.createDataFrame(mappingRdd, schema).dropDuplicates

      df.createOrReplaceTempView("mapping")

      // Re-key mapped ios devices onto their ruid. Each group collects
      // "<device_type>\u0001<install_list>" strings; getInstallList picks the
      // idfa-backed install list when one exists.
      var sql =
        s"""
           |SELECT mp.ruid device_id, 'ruid' device_type, 'ios' platform, getInstallList(COLLECT_SET(CONCAT(device_type, '\001', install_list))) install_list
           |  FROM dwh.dm_install_list dmp INNER JOIN mapping mp
           |  ON UPPER(dmp.device_id) = UPPER(mp.device_id)
           |  WHERE CONCAT(year, month, day) = '$date' AND business = '$business' AND platform = 'ios'
           |  GROUP BY mp.ruid
           |""".stripMargin

      val ruid = spark.sql(sql).rdd.map(row => {
        (row.getAs[String]("device_id"), MRUtils.JOINER.join(row.getAs[String]("device_type"), row.getAs[String]("platform"),
          row.getAs[String]("install_list")))
      })

      // The full, unmodified partition keyed by its original device_id.
      sql =
        s"""
           |SELECT device_id, device_type, platform, install_list
           |  FROM dwh.dm_install_list
           |  WHERE CONCAT(year, month, day) = '$date' AND business = '$business'
           |""".stripMargin

      val other = spark.sql(sql).rdd.map(row => {
        (row.getAs[String]("device_id"), MRUtils.JOINER.join(row.getAs[String]("device_type"), row.getAs[String]("platform"),
          row.getAs[String]("install_list")))
      })

      // ruid-fixed rows take precedence over the originals when both exist for a key.
      // fullOuterJoin guarantees at least one side is defined, so the getOrElse
      // branch is unreachable; it replaces a bare Option.get with an explicit error.
      val rdd = ruid.fullOuterJoin(other).map { case (deviceId, (ruidOpt, otherOpt)) =>
        val record = ruidOpt.orElse(otherOpt).getOrElse(
          throw new IllegalStateException(s"fullOuterJoin yielded no value for key $deviceId"))
        MRUtils.JOINER.join(deviceId, record)
      }.distinct

      rdd.repartition(coalesce)
        .saveAsTextFile(output, classOf[GzipCodec])

    } finally {
      if (spark != null) {
        // Stopping the session also stops its SparkContext.
        spark.stop()
      }
    }
    0
  }

  /**
   * Chooses one install list from the collected "<device_type>\u0001<install_list>"
   * strings of a group: the first entry whose device_type is "idfa" wins
   * (case-insensitively); otherwise the last entry is used. Returns "" for an
   * empty input or when the chosen entry has no install-list field, instead of
   * failing the whole job on a single malformed record.
   *
   * @param installs values produced by COLLECT_SET in the ruid SQL above
   * @return the selected install list, or "" when none is available
   */
  def getInstallList(installs: mutable.WrappedArray[String]): String = {
    val parsed = installs.map(_.split("\001", -1))
    // Prefer the idfa-backed entry; fall back to the last one, matching the
    // original last-wins scan order.
    val chosen = parsed.find(arr => arr(0).equalsIgnoreCase("idfa")).orElse(parsed.lastOption)
    // lift(1) guards entries that lack the \u0001 separator.
    chosen.flatMap(_.lift(1)).getOrElse("")
  }

  /**
   * Declares the command-line options; all five are mandatory
   * (enforced by checkMustOption in run).
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("business", true, "[must] business")
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

/** Entry point: builds the job instance and hands it the raw CLI arguments. */
object FixInstallListRuid {
  def main(args: Array[String]): Unit = {
    val job = new FixInstallListRuid
    job.run(args)
  }
}