// DspDeviceIdMapping.scala 5.06 KB
package mobvista.dmp.datasource.dsp

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}

import java.net.URI

/**
 * @package: mobvista.dmp.datasource.dsp
 * @author: wangjf
 * @date: 2021/5/13
 * @time: 1:53 PM
 * @email: jinfeng.wang@mobvista.com
 */

/**
 * Spark job that builds an idfa -> idfv mapping from one day of DSP request data.
 *
 * It reads `idfa`, `platform` and `exitid` from `dwh.etl_dsp_request_daily_hours` for the
 * given `dt`, keeps only iOS rows whose comma-separated `exitid` carries a valid idfv,
 * de-duplicates the pairs and writes them as zlib-compressed ORC with the columns
 * `old_id` (idfa) and `new_id` (idfv).
 */
class DspDeviceIdMapping extends CommonSparkJob with Serializable {

  /** Output schema: `old_id` holds the idfa, `new_id` holds the idfv. */
  val schema: StructType = StructType(Array(
    StructField("old_id", StringType),
    StructField("new_id", StringType)
  ))

  /**
   * Declares the command line options understood by this job.
   *
   * @return options `date`, `output` and `coalesce`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the job.
   *
   * @param args command line arguments: `-date` (value of the `dt` partition to read),
   *             `-output` (target path, deleted and rewritten) and `-coalesce`
   *             (number of output partitions, parsed as an integer)
   * @return 0 when the write completes; failures propagate as exceptions
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"DspDeviceIdMapping.${date}")
    try {
      val sc = spark.sparkContext
      val sql =
        s"""
           |SELECT idfa, gaid, exitid, platform FROM dwh.etl_dsp_request_daily_hours
           | WHERE dt = '${date}'
           |""".stripMargin

      // One (idfa, idfv) row per input record that yields a usable pair; all others are dropped.
      val rdd = spark.sql(sql).rdd
        .flatMap { r =>
          extractIdfaIdfv(
            r.getAs[String]("idfa"),
            r.getAs[String]("platform"),
            r.getAs[String]("exitid")
          ).iterator
        }
        .map { case (idfa, idfv) => Row(idfa, idfv) }

      // Remove any previous output first. The file system is resolved from the full output
      // URI, so an explicit port is honoured and a scheme-less path falls back to the
      // default file system instead of producing a "null://null" URI.
      FileSystem.get(new URI(output), sc.hadoopConfiguration).delete(new Path(output), true)

      spark.createDataFrame(rdd, schema)
        .dropDuplicates()
        .repartition(coalesce)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)
      0
    } finally {
      // Always release the session, including when the query or the write fails.
      spark.stop()
    }
  }

  /**
   * Extracts the (idfa, idfv) pair of a single request record.
   *
   * Only iOS records whose `exitid` splits into at least 17 comma-separated fields are
   * considered: field 1 is used as a fallback idfa when the column value is blank, and
   * field 16 is the idfv. Both fallback idfa and idfv must match `MobvistaConstant.didPtn`.
   *
   * NOTE(review): an earlier, never-enabled draft also sketched Android extraction from the
   * same `exitid` (gaid at 0, gaid md5 at 2, imei at 4, imei md5 at 5, android id at 7,
   * oaid at 12, oaid md5 at 13); confirm those positions before implementing it.
   *
   * @param rawIdfa  value of the `idfa` column, may be null or blank
   * @param platform value of the `platform` column, compared case-insensitively to "ios"
   * @param exitId   value of the `exitid` column, may be null or blank
   * @return the pair when both ids are non-blank, otherwise `None`
   */
  private def extractIdfaIdfv(rawIdfa: String, platform: String, exitId: String): Option[(String, String)] = {
    if (StringUtils.isBlank(exitId) || !"ios".equalsIgnoreCase(platform)) {
      None
    } else {
      val devIds = splitFun(exitId, ",")
      if (devIds.length < 17) {
        None
      } else {
        val idfa = if (StringUtils.isBlank(rawIdfa) && isDeviceId(devIds(1))) devIds(1) else rawIdfa
        val idfv = devIds(16)
        if (StringUtils.isNotBlank(idfa) && isDeviceId(idfv)) Some((idfa, idfv)) else None
      }
    }
  }

  /** True when `value` is non-blank and matches `MobvistaConstant.didPtn`. */
  private def isDeviceId(value: String): Boolean =
    StringUtils.isNotBlank(value) && value.matches(MobvistaConstant.didPtn)
}

object DspDeviceIdMapping {
  /**
   * Entry point: instantiates the job and runs it with the given arguments.
   * The integer returned by `run` is not propagated to the process exit code.
   *
   * @param args command line arguments forwarded unchanged to `run`
   */
  def main(args: Array[String]): Unit = {
    new DspDeviceIdMapping().run(args)
  }
}