// IDMappingGraphxResult.scala
// (committed by WangJinfeng)
package mobvista.dmp.datasource.id_mapping

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant.{sdf1, sdf2}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._

/**
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/12/7
 * @time: 2:39 PM
 * @email: jinfeng.wang@mobvista.com
 */
/**
 * Spark job that, for one (date, country, platform) partition of
 * `ads.ads_device_id_mapping` (type = 'mid'), picks the single best OneID per
 * device from the candidate map stored in the `one_id` JSON column, writes the
 * result as ORC to S3 and registers it back as a `type='result'` partition.
 */
class IDMappingGraphxResult extends CommonSparkJob with Serializable {

  /** CLI options accepted by this job (all take a value). */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("country", true, "country")
    options.addOption("platform", true, "platform")
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Entry point called by the framework: parses arguments, builds the Spark
   * session, runs the mapping and always stops the session.
   *
   * @return 0 on success (exceptions propagate to the caller)
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val country = commandLine.getOptionValue("country")
    val platform = commandLine.getOptionValue("platform")
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    // idiomatic Scala conversion instead of Integer.parseInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphxResult.$date.$country.$platform")

    try {
      oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, coalesce)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Scores every OneID candidate of every device and keeps the best one.
   *
   * Selection rule (unchanged from the original): a candidate wins when its
   * id type ranks earlier in `idSet`, when the current best type is unknown
   * (index -1), or when types tie and its score is >= the current best score.
   * Score = typeWeight * 30 / ageInDays + 0.1 * occurrenceCount.
   *
   * @param outPutPath S3 output path; deleted first, then written as ORC and
   *                   attached to the Hive table as a `type='result'` partition.
   */
  def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
                           coalesce: Int) = {
    val dailySQL =
      s"""
         |SELECT * FROM ads.ads_device_id_mapping WHERE dt = '$date' AND source = '${country.toLowerCase}' AND platform = '$platform' AND `type` = 'mid'
         |""".stripMargin

    // FIX: replaced two nullable vars with a single destructuring match.
    // NOTE: an unrecognized platform yields (null, null), preserving the
    // original behavior (NPE on first use) rather than silently changing it.
    val (idSet, scoreMap): (Array[String], Map[String, Double]) = platform match {
      case "ios" =>
        (iosIDSet, iosIDScoreMap)
      case "android" =>
        val set = country match {
          case "CN" => androidCNIDSet
          case _ => androidIDSet
        }
        (set, androidIDScoreMap)
      case _ =>
        (null, null)
    }

    val schedule_date = sdf1.format(sdf2.parse(date))
    // FIX: schedule_date is constant for the whole job — parse it once on the
    // driver instead of once per JSON key per row on the executors.
    val scheduleTime = new SimpleDateFormat("yyyy-MM-dd").parse(schedule_date).getTime

    val resultOneID = spark.sql(dailySQL).rdd.mapPartitions(rs => {
      // FIX: one SimpleDateFormat per partition instead of one per key.
      // SimpleDateFormat is not thread-safe, but a partition iterator is
      // consumed by a single thread, so sharing it here is safe.
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      rs.map(r => {
        val device_id = r.getAs[String]("device_id")
        val device_type = r.getAs[String]("device_type")
        val one_id = MobvistaConstant.String2JSONObject(r.getAs[String]("one_id"))
        val update_date = r.getAs[String]("update_date")
        val keys = one_id.keySet().asScala
        var oneIDScore: OneIDScore = OneIDScore("", "", 0, "")
        keys.foreach(key => {
          val json = one_id.getJSONObject(key)
          val id_type = json.getString("one_type")
          val id_type_score = scoreMap(id_type)
          val active_date = json.getString("one_date")
          val cnt = json.getLongValue("one_cnt")
          val days = (scheduleTime - sdf.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
          val score = id_type_score * 30 / days + 0.1 * cnt
          // FIX: indexOf is an O(n) array scan; compute each index once
          // instead of up to four times per key.
          val candidateRank = idSet.indexOf(id_type)
          val currentRank = idSet.indexOf(oneIDScore.one_type)
          if (candidateRank < currentRank || currentRank == -1
            || (candidateRank == currentRank && score >= oneIDScore.one_score)) {
            oneIDScore = OneIDScore(key, id_type, score, active_date)
          }
        })
        val json = new JSONObject()
        json.put("one_id", oneIDScore.one_id)
        json.put("type", oneIDScore.one_type)
        json.put("score", oneIDScore.one_score)
        json.put("version", oneIDScore.one_version)
        Result(device_id, device_type, json.toJSONString, update_date)
      })
    })

    // Clear any previous output so the ORC write starts from an empty path.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)

    import spark.implicits._
    resultOneID
      .toDF
      .repartition(coalesce)
      .write.mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc(outPutPath)

    // Register the freshly written output as the 'result' partition.
    spark.sql(
      s"""
         |ALTER TABLE ads.ads_device_id_mapping ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}',platform='$platform',`type`='result')
         | LOCATION '$outPutPath'
         |""".stripMargin)
  }
}

/** JVM entry point: builds the job instance and hands it the raw CLI args. */
object IDMappingGraphxResult {
  def main(args: Array[String]): Unit = {
    val job = new IDMappingGraphxResult()
    job.run(args)
  }
}