package mobvista.dmp.datasource.id_mapping

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant.sdf1
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import mobvista.dmp.util.MD5Util
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.id_mapping
 * @author: wangjf
 * @date: 2021/12/7
 * @time: 2:39 下午
 * @email: jinfeng.wang@mobvista.com
 */
class IDMappingGraphx extends CommonSparkJob with Serializable {

  /**
   * Command-line options understood by [[run]].
   *
   * Every option takes a value:
   * - `country`
   * - `platform` (`ios` or `android`)
   * - `date`
   * - `output` (intermediate device -> candidate OneIDs table)
   * - `result_output` (device -> chosen OneID table)
   * - `coalesce` (partition count used when writing both outputs)
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("country", true, "country")
    options.addOption("platform", true, "platform")
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("result_output", true, "result_output")
    options
  }

  /**
   * Parses the command line, creates a SparkSession and runs [[oldAndTodayIdMapping]].
   *
   * The session is stopped even when the job fails.
   *
   * @return always 0; failures surface as exceptions rather than as a non-zero code
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val country = commandLine.getOptionValue("country")
    val platform = commandLine.getOptionValue("platform")
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val result_output = commandLine.getOptionValue("result_output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")
    try {
      oldAndTodayIdMapping(country, platform, date, spark, output, result_output, coalesce)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Builds today's ID-mapping groups for one platform/country slice and writes two ORC outputs.
   *
   * Despite the name, only today's data (the platform SQL for `date`) is read here.
   *
   *  1. Today's rows are loaded and normalised with [[processData]] into the platform vertex schema.
   *  2. [[processVertex]] turns each row into edges, which are grouped by key with `combineByKey`.
   *  3. Groups with several members get OneID JSON from [[updateOneID]]. A singleton group's lone member
   *     becomes its own OneID.
   *  4. Fragments for the same (id, id type) are merged, and all ids are MD5-normalised.
   *     The result is written to `outPutPath`.
   *  5. One OneID per device is picked and written to `resultOutPutPath`. Id-type priority from the
   *     platform id list decides first; within the same type, the higher score wins.
   *
   * Both outputs are repartitioned to `coalesce` partitions and overwritten.
   *
   * @throws IllegalArgumentException if `platform` is neither `ios` nor `android`
   */
  def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
                           resultOutPutPath: String, coalesce: Int): Unit = {
    val (dailySQL, schema, idSet, idMainSet, scoreMap) = platformConfig(country, platform, date)

    // 1. Today's data. The DataFrame round trip attaches the schema to every Row, which processVertex
    //    relies on for its by-name getAs lookups.
    val todayDF = spark.createDataFrame(spark.sql(dailySQL).rdd.map(row => processData(row, platform)), schema)

    val vertex = todayDF.rdd.flatMap(row => processVertex(date, row, idSet, idMainSet))

    val maxGraph = vertex.combineByKey(
      (v: (String, String, String)) => Set(v),
      (c: Set[(String, String, String)], v: (String, String, String)) => c + v,
      (c1: Set[(String, String, String)], c2: Set[(String, String, String)]) => c1 ++ c2
    )

    // Groups with several members: build OneIDs for their non-main ids.
    val multiOneIDRDD = maxGraph
      .filter { case (_, members) => members.size > 1 }
      .flatMap(group => updateOneID(group, idMainSet))

    // Singleton groups: the lone member is its own OneID.
    val singleOneIDRDD = maxGraph
      .filter { case (_, members) => members.size == 1 }
      .map { case (srcID, members) =>
        val (memberID, memberAttrs, memberType) = members.head
        val oneID = new JSONObject()
        oneID.put(memberID, MobvistaConstant.String2JSONObject(memberAttrs))
        ((srcID, memberType), oneID.toJSONString)
      }

    val midMergeOneIDRDD = multiOneIDRDD.union(singleOneIDRDD)
      .combineByKey(
        (v: String) => Set(v),
        (c: Set[String], v: String) => c + v,
        (c1: Set[String], c2: Set[String]) => c1 ++ c2
      )
      .map { case ((srcID, srcType), fragments) =>
        val oneIDJSON = new JSONObject()
        fragments.foreach { fragment =>
          val json = MobvistaConstant.String2JSONObject(fragment)
          json.keySet().asScala.foreach(key => oneIDJSON.put(toMD5(key), json.getJSONObject(key)))
        }
        Result(toMD5(srcID), srcType, oneIDJSON.toJSONString)
      }
      // Persisted because both outputs below are computed from it.
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    import spark.implicits._
    try {
      overwriteOrc(midMergeOneIDRDD.toDF, outPutPath, coalesce)

      // Parsed once on the driver instead of once per key on the executors.
      val todayMillis = parseDayMillis(date)
      val resultOneID = midMergeOneIDRDD.map(r => pickBestOneID(r, idSet, scoreMap, date, todayMillis))
      overwriteOrc(resultOneID.toDF, resultOutPutPath, coalesce)
    } finally {
      midMergeOneIDRDD.unpersist()
    }
  }

  /**
   * Platform-specific inputs for [[oldAndTodayIdMapping]].
   *
   * Returns a tuple of: daily SQL, vertex schema, id columns in priority order, main-id set, and
   * id-type score map.
   */
  private def platformConfig(country: String, platform: String, date: String):
      (String, StructType, Array[String], Set[String], Map[String, Double]) = platform match {
    case "ios" =>
      (Constant.ios_id_mapping_sql.replace("@date", date), iosVertSchema, iosIDSet, iosMainIDSet, iosIDScoreMap)
    case "android" =>
      // NOTE(review): the non-CN branch only excludes the value passed as `country`. CN rows are therefore
      // included unless the caller passes a value that makes this filter mean "not CN" — confirm intent.
      val (countryFilter, ids) = country match {
        case "CN" => (s"AND country = '${country}'", androidCNIDSet)
        case _ => (s"AND country != '${country}'", androidIDSet)
      }
      val sql = Constant.android_id_mapping_sql.replace("@date", date).replace("@filter_country", countryFilter)
      (sql, adrVertSchema, ids, androidMainIDSet, androidIDScoreMap)
    case other =>
      throw new IllegalArgumentException(s"Unsupported platform: $other (expected 'ios' or 'android')")
  }

  /**
   * Picks the single best OneID among the candidates stored in `r.one_id`.
   *
   * A candidate replaces the current best when either:
   * - its type comes earlier in `idSet`, or
   * - there is no valid current best yet, or
   * - its type is equally ranked and its score is greater or equal.
   *
   * The score is `scoreMap(type) * 30 / days + 0.1 * cnt`, where `days` counts `date` itself as day 1.
   */
  private def pickBestOneID(r: Result, idSet: Array[String], scoreMap: Map[String, Double],
                            date: String, todayMillis: Long): Result = {
    val candidates = MobvistaConstant.String2JSONObject(r.one_id)
    val best = candidates.keySet().asScala.foldLeft(OneIDScore("", "", 0)) { (current, key) =>
      val attrs = candidates.getJSONObject(key)
      val idType = attrs.getString("id_type")
      val activeDate = attrs.getString("active_date")
      val activeMillis = if (date == activeDate) todayMillis else parseDayMillis(activeDate)
      val days = (todayMillis - activeMillis) / 1000 / 3600 / 24 + 1
      val score = scoreMap(idType) * 30 / days + 0.1 * attrs.getIntValue("cnt")
      val rank = idSet.indexOf(idType)
      val currentRank = idSet.indexOf(current.one_type)
      if (rank < currentRank || currentRank == -1 || (rank == currentRank && score >= current.one_score)) {
        OneIDScore(key, idType, score)
      } else {
        current
      }
    }
    val json = new JSONObject()
    json.put("one_id", best.one_id)
    json.put("one_type", best.one_type)
    json.put("one_score", best.one_score)
    Result(device_id = r.device_id, device_type = r.device_type, one_id = json.toJSONString)
  }

  /**
   * Parses a day string with the shared `sdf1` formatter and returns its epoch milliseconds.
   *
   * NOTE(review): `sdf1` is presumably a `java.text.SimpleDateFormat`, which is not thread-safe. Spark runs
   * several tasks per executor JVM at once, so access to the shared instance is serialized here.
   */
  private def parseDayMillis(day: String): Long = sdf1.synchronized {
    sdf1.parse(day).getTime
  }

  /** Returns `id` unchanged when it already matches `md5Ptn`, otherwise its MD5 via `MD5Util`. */
  private def toMD5(id: String): String =
    if (id.matches(MobvistaConstant.md5Ptn)) id else MD5Util.getMD5Str(id)

  /**
   * Deletes `path` (recursively), then writes `df` there as zlib-compressed ORC with `partitions` partitions.
   *
   * The filesystem is resolved from the path itself, so any bucket or scheme works. Previously it was
   * resolved from a hard-coded `s3://mob-emr-test` URI.
   */
  private def overwriteOrc(df: org.apache.spark.sql.DataFrame, path: String, partitions: Int): Unit = {
    val target = new Path(path)
    target.getFileSystem(df.sparkSession.sparkContext.hadoopConfiguration).delete(target, true)
    df.repartition(partitions)
      .write.mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc(path)
  }

  /**
   * Normalises one raw row from the daily SQL into the platform's vertex row.
   *
   * The returned Row is positional. It is consumed with `iosVertSchema` / `adrVertSchema` in
   * [[oldAndTodayIdMapping]]. Derived ids are MD5 hashes of concatenated attributes and are "" when their
   * guarding attribute is blank. Any other platform yields `Row("")`.
   */
  def processData(row: Row, platform: String): Row = platform match {
    case "ios" =>
      val idfa = validDeviceId(row.getAs[String]("idfa"))
      val idfv = validDeviceId(row.getAs[String]("idfv"))
      val pkg_name = row.getAs[String]("pkg_name")
      val sysid = row.getAs[String]("sysid")
      val bkupid = row.getAs[String]("bkupid")
      val xwho = row.getAs[String]("xwho")
      val user_id = row.getAs[String]("user_id")
      val ip = row.getAs[String]("ip")
      val ua = row.getAs[String]("ua")
      val brand = row.getAs[String]("brand")
      val model = row.getAs[String]("model")
      val os_version = row.getAs[String]("os_version")
      val osv_upt = row.getAs[String]("osv_upt")
      val upt = row.getAs[String]("upt")
      val cnt = row.getAs[Long]("cnt")
      val idfv_bundle = md5IfPresent(idfv, idfv + pkg_name)
      val bmosv_osv_upt = md5IfPresent(osv_upt, brand + model + os_version + osv_upt)
      val bmosv_upt = md5IfPresent(upt, brand + model + os_version + upt)
      val bmosv_ipua_bundle = md5IfPresent(ip, brand + model + os_version + ip + ua + pkg_name)
      //  IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
      Row(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
    case "android" =>
      val imei = row.getAs[String]("imei")
      val android_id = row.getAs[String]("android_id")
      val pkg_name = row.getAs[String]("pkg_name")
      val oaid = row.getAs[String]("oaid")
      val gaid = row.getAs[String]("gaid")
      val sysid = row.getAs[String]("sysid")
      val bkupid = row.getAs[String]("bkupid")
      val xwho = row.getAs[String]("xwho")
      val user_id = row.getAs[String]("user_id")
      val country = row.getAs[String]("country")
      val ip = row.getAs[String]("ip")
      val ua = row.getAs[String]("ua")
      val brand = row.getAs[String]("brand")
      val model = row.getAs[String]("model")
      val os_version = row.getAs[String]("os_version")
      val upt = row.getAs[String]("upt")
      val cnt = row.getAs[Long]("cnt")
      val android_pkg = md5IfPresent(android_id, android_id + pkg_name)
      val bmosv_upt = md5IfPresent(upt, brand + model + os_version + upt)
      val bmosv_ipua_pkg = md5IfPresent(ip, brand + model + os_version + ip + ua + pkg_name)
      //  AdrVert(imei, gaid, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
      // NOTE(review): both branches are read with the same adrVertSchema. However, CN rows lead with
      // (imei, oaid, gaid) and other rows with (gaid, imei, oaid). Unless the schema's leading columns are
      // position-agnostic, one branch mislabels its ids — confirm against Constant.adrVertSchema.
      if ("CN".equalsIgnoreCase(country)) {
        Row(imei, oaid, gaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
      } else {
        Row(gaid, imei, oaid, sysid, android_pkg, bmosv_upt, bmosv_ipua_pkg, xwho, user_id, bkupid, cnt)
      }
    case _ =>
      Row("")
  }

  /**
   * Returns `id` when it is non-blank, matches `didPtn` and does not match `allZero`; otherwise "".
   */
  private def validDeviceId(id: String): String =
    if (StringUtils.isNotBlank(id) && id.matches(MobvistaConstant.didPtn) && !id.matches(MobvistaConstant.allZero)) id
    else ""

  /** MD5 of `material` when `guard` is non-blank, otherwise "". `material` is only evaluated when needed. */
  private def md5IfPresent(guard: String, material: => String): String =
    if (StringUtils.isNotBlank(guard)) MD5Util.getMD5Str(material) else ""

  /**
   * Turns one normalised row into graph edges.
   *
   * The anchor is the first non-blank id among `ids(0)` .. `ids(ids.length - 2)`; the last id type never
   * anchors. The method emits:
   * - a self edge `anchor -> (anchor, attrs, anchorType)`;
   * - for every later non-blank id `src`, either `src -> (anchor, attrs, srcType)` when the anchor type is
   *   in `mainIDSet`, or `anchor -> (src, attrs, srcType)` otherwise.
   *
   * `attrs` is the JSON string `{"id_type": anchorType, "active_date": date, "cnt": cnt}`. It is shared by
   * every edge of the row.
   *
   * @param date      run date, stored under `active_date`. This is the key the scoring step reads; it was
   *                  previously written as `active_type`, which made scoring fail.
   * @param row       a Row with a schema (by-name lookups) holding the columns named in `ids` plus `cnt`
   * @param ids       id column names in priority order
   * @param mainIDSet id types treated as main ids
   */
  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[(String, (String, String, String))] = {
    val array = new ArrayBuffer[(String, (String, String, String))]()
    // Event frequency.
    val cnt = row.getAs[Long]("cnt")
    val anchorIndex = (0 until ids.length - 1).find(i => StringUtils.isNotBlank(row.getAs[String](ids(i))))
    anchorIndex.foreach { i =>
      val oneIDType = ids(i)
      val oneID = row.getAs[String](oneIDType)
      val jsonObject = new JSONObject()
      jsonObject.put("id_type", oneIDType)
      // Active date, used by the scoring step to weight the id.
      jsonObject.put("active_date", date)
      jsonObject.put("cnt", cnt)
      val attrs = jsonObject.toJSONString
      array += ((oneID, (oneID, attrs, oneIDType)))
      for (j <- i + 1 until ids.length) {
        val srcType = ids(j)
        val srcOrg = row.getAs[String](srcType)
        if (StringUtils.isNotBlank(srcOrg)) {
          if (mainIDSet.contains(oneIDType)) {
            array += ((srcOrg, (oneID, attrs, srcType)))
          } else {
            array += ((oneID, (srcOrg, attrs, srcType)))
          }
        }
      }
    }
    array
  }

  /**
   * Builds the OneID JSON for a multi-member group and assigns it to the group's non-main members.
   *
   * The JSON contains:
   * - the member whose id equals the group key, and
   * - every member whose edge `id_type` is in `mainIDSet`.
   *
   * Each member whose edge `id_type` is NOT in `mainIDSet` is emitted as `((id, srcType), json)`.
   */
  def updateOneID(kv: (String, Set[(String, String, String)]), mainIDSet: Set[String]): ArrayBuffer[((String, String), String)] = {
    val (tmpOneId, members) = kv
    // Parse each member's attrs once to read the edge's id_type.
    val typed = members.toList.map { case (id, attrs, srcType) =>
      (id, attrs, srcType, MobvistaConstant.String2JSONObject(attrs).getString("id_type"))
    }
    val oneID = new JSONObject()
    typed.foreach { case (id, attrs, _, idType) =>
      if (tmpOneId.equals(id) || mainIDSet.contains(idType)) {
        oneID.put(id, MobvistaConstant.String2JSONObject(attrs))
      }
    }
    val merged = oneID.toJSONString
    val array = new ArrayBuffer[((String, String), String)]()
    typed.foreach { case (id, _, srcType, idType) =>
      if (!mainIDSet.contains(idType)) {
        array += (((id, srcType), merged))
      }
    }
    array
  }
}

object IDMappingGraphx {
  /** JVM entry point. Runs the job with the raw arguments and discards the returned exit code. */
  def main(args: Array[String]): Unit = {
    val job = new IDMappingGraphx()
    job.run(args)
  }
}