FixInstallListIdfv.scala 4.84 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
package mobvista.dmp.datasource.dm

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020-12-14 17:50:36
 * @time: 6:42 PM
 * @email: jinfeng.wang@mobvista.com
 */
class FixInstallListIdfv extends CommonSparkJob with Serializable {

  /**
   * Rewrites ios rows of dwh.dmp_install_list whose device_id appears in the
   * old-id/new-id mapping (read as ORC from `input`): matched rows are re-keyed
   * to the mapped new_id with device_type 'idfv', then unioned with the full
   * partition and written back as zlib-compressed ORC to `output`.
   *
   * @param args command line: -date -business -input -output -coalesce (all required)
   * @return 0 on success, 1 when required options are missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val business = commandLine.getOptionValue("business")
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"FixInstallListIdfv.$date.${business}")
    val sc = spark.sparkContext

    try {
      //  Only referenced by the legacy dm_install_list SQL (see note below);
      //  registration kept so that query can be restored without code changes.
      spark.udf.register("getInstallList", getInstallList _)

      //  old_id -> new_id mapping produced upstream, read as ORC.
      val df = spark.read.orc(input)
        .dropDuplicates()

      df.createOrReplaceTempView("mapping")

      //  NOTE: the old dmp flavor of this job read dwh.dm_install_list and
      //  collapsed rows per new_id via the getInstallList UDF; the new dmp
      //  table is already one row per (device_id, device_type), so a plain
      //  join + re-key is enough.
      val ruidSql =
        s"""
           |SELECT mp.new_id device_id, 'idfv' device_type, 'ios' platform, country, install_list, ext_data, update_date
           |  FROM dwh.dmp_install_list dmp INNER JOIN mapping mp
           |  ON UPPER(dmp.device_id) = UPPER(mp.old_id)
           |  WHERE dt = '$date' AND business = '$business' AND platform = 'ios'
           |""".stripMargin

      val ruid = spark.sql(ruidSql)

      val otherSql =
        s"""
           |SELECT device_id, device_type, platform, country, install_list, ext_data, update_date
           |  FROM dwh.dmp_install_list
           |  WHERE dt = '$date' AND business = '$business'
           |""".stripMargin

      val other = spark.sql(otherSql)

      //  Single pre-delete of the target path (the original code deleted it
      //  twice); SaveMode.Overwrite below would also clobber it, but an
      //  explicit delete keeps behavior identical for non-empty directories.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      ruid.union(other)
        .repartition(coalesce)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)

    } finally {
      if (spark != null) {
        sc.stop()
        spark.stop()
      }
    }
    0
  }

  /**
   * Picks one install_list out of a set of "device_type\u0001install_list"
   * entries: the idfa entry wins if present, otherwise the last entry seen.
   * Entries without the '\u0001' separator are skipped (the previous version
   * threw ArrayIndexOutOfBoundsException on them). Returns "" for an empty
   * or all-malformed input.
   */
  def getInstallList(installs: mutable.WrappedArray[String]): String = {
    var install_list: String = ""
    var flag = true
    val iter = installs.iterator
    while (iter.hasNext && flag) {
      val arr = iter.next().split("\001", -1)
      if (arr.length >= 2) {
        install_list = arr(1)
        //  idfa is the authoritative device type: stop scanning once found.
        if (arr(0).equalsIgnoreCase("idfa")) {
          flag = false
        }
      }
    }
    install_list
  }

  /** Declares the required command-line options parsed in run(). */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("business", true, "[must] business")
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object FixInstallListIdfv {
  /** Entry point: constructs the job and delegates to its run method. */
  def main(args: Array[String]): Unit = {
    val job = new FixInstallListIdfv
    job.run(args)
  }
}