Commit 59a91aaf by WangJinfeng

init id-mapping v1.0

parent 4fff73b8
package mobvista.dmp.datasource.id_mapping

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant._
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

import java.net.URLDecoder
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.id_mapping
@@ -243,6 +245,18 @@ object Constant {
        | GROUP BY imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt
        |""".stripMargin
  val ios_id_mapping_sql_v2: String =
    """
      |SELECT idfa, idfv, pkg_name, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt, cnt
      | FROM dws.dws_device_id_ios_frequency WHERE dt = '@date' @filter_country
      |""".stripMargin

  val android_id_mapping_sql_v2: String =
    """
      |SELECT imei, android_id, pkg_name, oaid, gaid, sysid, bkupid, xwho, user_id, country, ip, ua, brand, model, os_version, osv_upt, upt, cnt
      | FROM dws.dws_device_id_android_frequency WHERE dt = '@date' @filter_country
      |""".stripMargin
  val old_id_mapping_sql: String =
    """
      |
@@ -282,42 +296,42 @@ object Constant {
  def process(idfa: String, idfv: String, pkg_name: String, imei: String, androidId: String, oaid: String, gaid: String, sysId: String,
              bkupId: String, country: String, ip: String, ua: String, brand: String, model: String, os_version: String, osv_upt: String,
              upt: String, network_type: String, platform: String, cnt: Long): (String, Row) = {
    val f_idfa = if (StringUtils.isNotBlank(idfa) && (idfa.matches(didPtn) && !idfa.matches(allZero) || idfa.matches(md5Ptn))) {
      idfa
    } else {
      ""
    }
    val f_idfv = if (StringUtils.isNotBlank(idfv) && (idfv.matches(didPtn) && !idfv.matches(allZero) || idfv.matches(md5Ptn))) {
      idfv
    } else {
      ""
    }
    val f_imei = if (StringUtils.isNotBlank(imei) && (imei.matches(imeiPtn) && !imei.matches(imeiPtnAll) || imei.matches(md5Ptn))) {
      imei
    } else {
      ""
    }
    val f_androidId = if (StringUtils.isNotBlank(androidId) && (androidId.matches(andriodIdPtn) && !androidId.matches(andriodIdAll) || androidId.matches(md5Ptn))) {
      androidId
    } else {
      ""
    }
    val f_oaid = if (StringUtils.isNotBlank(oaid) && (oaid.length >= 16 && oaid.length <= 64 && !oaid.matches(allZero) || oaid.matches(md5Ptn))) {
      oaid
    } else {
      ""
    }
    val f_gaid = if (StringUtils.isNotBlank(gaid) && (gaid.matches(didPtn) && !gaid.matches(allZero) || gaid.matches(md5Ptn))) {
      gaid
    } else {
      ""
    }
    val f_sysId = if (StringUtils.isNotBlank(sysId) && (sysId.matches(didPtn) && !sysId.matches(allZero) || sysId.matches(md5Ptn))) {
      sysId
    } else {
      ""
    }
    val f_bkupId = if (StringUtils.isNotBlank(bkupId) && (bkupId.matches(didPtn) && !bkupId.matches(allZero) || bkupId.matches(md5Ptn))) {
      bkupId
    } else {
      ""
@@ -378,7 +392,7 @@ object Constant {
      || f_ua.toLowerCase.contains("iphone") || f_ua.toLowerCase.contains("ipad")) {
      "ios"
    } else if (f_platform.contains("android") || f_osv.toLowerCase.contains("android") || f_ua.toLowerCase.contains("android")
      || f_imei.length >= 14 || (f_oaid.length >= 16 && f_oaid.length <= 64) || f_androidId.length >= 15 || f_gaid.length == 36 || f_gaid.length == 32) {
      "android"
    } else {
      "other"
@@ -395,5 +409,49 @@ object Constant {

  case class Result(device_id: String, device_type: String, one_id: String) extends Serializable

  case class OneIDScore(one_id: String, one_type: String, one_score: Double, one_version: String) extends Serializable
  class CustomInterator(active_date: String, iter: Iterator[((String, String), Set[(String, String, Long)])],
                        idArray: Array[String], mainIDSet: Set[String]) extends Iterator[ArrayBuffer[((String, String), String)]] {

    def hasNext: Boolean = {
      iter.hasNext
    }

    // Expands one ((oneID, oneIDType) -> candidate set) entry into the
    // ((srcID, srcType), oneID-JSON) pairs emitted for that group.
    def next: ArrayBuffer[((String, String), String)] = {
      val kv = iter.next
      val array = new ArrayBuffer[((String, String), String)]()
      val tmpOneId = kv._1._1
      val tmpOneIdType = kv._1._2
      val iters = kv._2
      val oneID = new JSONObject()
      var minTypeIndex = idArray.indexOf(tmpOneIdType)
      iters.foreach(t => {
        if (idArray.indexOf(t._2) < minTypeIndex) {
          minTypeIndex = idArray.indexOf(t._2)
        }
        if (tmpOneId.equals(t._1) || mainIDSet.contains(t._2)) {
          val json = new JSONObject()
          json.put("one_type", t._2)
          json.put("one_date", active_date)
          json.put("one_cnt", t._3)
          oneID.put(t._1, json)
        }
      })
      array += (((tmpOneId, tmpOneIdType), oneID.toJSONString))
      // if (idArray.indexOf(tmpOneIdType) > minTypeIndex) {
      iters.foreach(itr => {
        var oneJSON = new JSONObject()
        if (oneID.containsKey(itr._1)) {
          oneJSON.put(itr._1, oneID.getJSONObject(itr._1))
        } else {
          oneJSON = oneID
        }
        array += (((itr._1, itr._2), oneJSON.toJSONString))
      })
      // }
      array
    }
  }
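  // Editor's note: CustomInterator is consumed via RDD.mapPartitions (see
  // IDMappingGraphx below), expanding each grouped key lazily instead of
  // materializing a whole partition. A minimal, self-contained sketch of the
  // same wrap-an-iterator pattern; names and data are illustrative only.
  object MapPartitionsSketch {
    import org.apache.spark.sql.SparkSession

    class ExpandIterator(in: Iterator[(String, Set[String])]) extends Iterator[Seq[(String, String)]] {
      override def hasNext: Boolean = in.hasNext
      // Emit one (member -> groupKey) pair per group member, one group at a time.
      override def next(): Seq[(String, String)] = {
        val (key, members) = in.next()
        members.toSeq.map(m => (m, key))
      }
    }

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
      val grouped = spark.sparkContext.parallelize(Seq(("g1", Set("a", "b")), ("g2", Set("c"))))
      grouped.mapPartitions(it => new ExpandIterator(it)).flatMap(identity).collect().foreach(println)
      spark.stop()
    }
  }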
}
\ No newline at end of file
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.MobvistaConstant.{sdf1, sdf2}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.id_mapping.Constant._
import mobvista.dmp.util.DateUtil
import mobvista.dmp.utils.common.MD5Util.hashMD5
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
@@ -46,13 +47,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val result_output = commandLine.getOptionValue("result_output")
    // val fre_output = commandLine.getOptionValue("fre_output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"IDMappingGraphx.$date.$country.$platform")

    try {
      oldAndTodayIdMapping(country.toUpperCase, platform, date, spark, output, result_output, coalesce)
    } finally {
      if (spark != null) {
        spark.stop()
@@ -63,9 +64,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {

  def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
                           resultOutPutPath: String, coalesce: Int) = {
    var dailySQL = ""
    var schame: StructType = null
@@ -81,21 +80,22 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
        scoreMap = iosIDScoreMap
        country match {
          case "CN" =>
            dailySQL = Constant.ios_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"AND country = '${country}'")
          case _ =>
            dailySQL = Constant.ios_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"")
        }
      case "android" => {
        idMainSet = androidMainIDSet
        scoreMap = androidIDScoreMap
        country match {
          case "CN" =>
            schame = adrCNVertSchema
            idSet = androidCNIDSet
            dailySQL = Constant.android_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"AND country = '${country}'")
          case _ =>
            schame = adrVertSchema
            idSet = androidIDSet
            dailySQL = Constant.android_id_mapping_sql_v2.replace("@date", date).replace("@filter_country", s"AND country != '${country}'")
        }
      }
      case _ =>
@@ -103,6 +103,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
    }

    val df = spark.sql(dailySQL)
    /*
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)
@@ -123,6 +124,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
        |ALTER TABLE dws.$fre_table ADD IF NOT EXISTS PARTITION (dt='$date',source='${country.toLowerCase}')
        | LOCATION '$frequencyOutPutPath'
        |""".stripMargin)
    */
    val todayDF = spark.createDataFrame(df.rdd.map(row => {
      processData(row, platform)
@@ -134,66 +136,73 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
    }).flatMap(l => l)

    val maxGraph = vertex.combineByKey(
      (v: (String, String, Long)) => Set(v),
      (c: Set[(String, String, Long)], v: (String, String, Long)) => c ++ Seq(v),
      (c1: Set[(String, String, Long)], c2: Set[(String, String, Long)]) => c1 ++ c2
    )
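    // Editor's note: the combineByKey above is effectively a groupByKey that
    // deduplicates while merging: every (id, idType, cnt) triple sharing a key
    // lands in a single Set. An equivalent minimal sketch (illustrative only):
    //
    //   val pairs = spark.sparkContext.parallelize(Seq(
    //     (("k1", "idfa"), ("v1", "idfv", 3L)),
    //     (("k1", "idfa"), ("v1", "idfv", 3L)), // duplicate collapses into the Set
    //     (("k1", "idfa"), ("v2", "sysid", 1L))
    //   ))
    //   val grouped = pairs.combineByKey(
    //     (v: (String, String, Long)) => Set(v),
    //     (c: Set[(String, String, Long)], v: (String, String, Long)) => c + v,
    //     (c1: Set[(String, String, Long)], c2: Set[(String, String, Long)]) => c1 ++ c2
    //   )
    //   grouped.collect().foreach(println) // one key, two distinct triples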
    maxGraph.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Generate OneID for non-main IDs
    val multiOneIDRDD = maxGraph.filter(kv => {
      kv._2.size > 1
    }).mapPartitions(rs => new CustomInterator(schedule_date, rs, idSet, idMainSet))
      .flatMap(l => l)
    // .map(rs => updateOneID(schedule_date, rs, idSet, idMainSet)).flatMap(l => l)
    // Generate OneID for main IDs
    val singleOneIDRDD = maxGraph.filter(kv => {
      kv._2.size == 1
    }).map(kv => {
      val oneID = new JSONObject()
      kv._2.foreach(t => {
        val json = new JSONObject()
        json.put("one_type", t._2)
        json.put("one_date", schedule_date)
        json.put("one_cnt", t._3)
        oneID.put(t._1, json)
      })
      (kv._1, oneID.toJSONString)
    })
    val yesDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
    // Merge in yesterday's mid-layer mapping so OneIDs stay stable across days
    val oldMidMergeOneIDRDD = spark.sql(
      s"""
         |SELECT device_id, device_type, one_id
         | FROM ads.ads_device_id_mapping WHERE dt = '$yesDate' AND source = '${country.toLowerCase}' AND platform = '$platform' AND `type` = 'mid'
         |""".stripMargin)
      .rdd.map(row => {
        ((row.getAs[String]("device_id"), row.getAs[String]("device_type")), row.getAs[String]("one_id"))
      })
    val midMergeOneIDRDD = spark.sparkContext.union(Seq(singleOneIDRDD, multiOneIDRDD, oldMidMergeOneIDRDD))
      .combineByKey(
        (v: String) => Set(v),
        (c: Set[String], v: String) => c ++ Seq(v),
        (c1: Set[String], c2: Set[String]) => c1 ++ c2
      ).map(kv => {
        val srcId = kv._1._1
        val srcType = kv._1._2
        val oneIDJSON = new JSONObject()
        kv._2.foreach(js => {
          val json = MobvistaConstant.String2JSONObject(js)
          val keys = json.keySet().asScala
          keys.foreach(key => {
            // Keep the candidate with the most recent one_date for each key
            if (!oneIDJSON.containsKey(key) || oneIDJSON.getJSONObject(key).getString("one_date")
              .compareTo(json.getJSONObject(key).getString("one_date")) < 0) {
              oneIDJSON.put(key, json.getJSONObject(key))
            }
          })
        })
        Result(srcId, srcType, oneIDJSON.toJSONString)
      }).persist(StorageLevel.MEMORY_AND_DISK_SER)
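    // Editor's note: the compareTo branch above keeps, per id key, the candidate
    // carrying the most recent one_date (lexicographic compare is safe for
    // yyyy-MM-dd strings). The rule in isolation (illustrative sketch):
    //
    //   def keepNewest(acc: JSONObject, incoming: JSONObject, key: String): Unit = {
    //     val newer = !acc.containsKey(key) ||
    //       acc.getJSONObject(key).getString("one_date")
    //         .compareTo(incoming.getJSONObject(key).getString("one_date")) < 0
    //     if (newer) acc.put(key, incoming.getJSONObject(key))
    //   }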
    import spark.implicits._

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)

    midMergeOneIDRDD.toDF
      .repartition(coalesce)
      .write.mode(SaveMode.Overwrite)
@@ -212,26 +221,26 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
        val device_type = r.device_type
        val one_id = MobvistaConstant.String2JSONObject(r.one_id)
        val keys = one_id.keySet().asScala
        var oneIDScore: OneIDScore = OneIDScore("", "", 0, "")
        keys.foreach(key => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd")
          val json = one_id.getJSONObject(key)
          val id_type = json.getString("one_type")
          val id_type_score = scoreMap(id_type)
          val active_date = json.getString("one_date")
          val cnt = json.getLongValue("one_cnt")
          val days = (sdf.parse(schedule_date).getTime - sdf.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
          val score = id_type_score * 30 / days + 0.1 * cnt
          if (idSet.indexOf(id_type) < idSet.indexOf(oneIDScore.one_type) || idSet.indexOf(oneIDScore.one_type) == -1
            || (idSet.indexOf(id_type) == idSet.indexOf(oneIDScore.one_type) && score >= oneIDScore.one_score)) {
            oneIDScore = OneIDScore(key, id_type, score, active_date)
          }
        })
        val json = new JSONObject()
        json.put("one_id", oneIDScore.one_id)
        json.put("one_type", oneIDScore.one_type)
        json.put("one_score", oneIDScore.one_score)
        json.put("one_version", oneIDScore.one_version)
        Result(device_id, device_type, json.toJSONString)
      })
    })
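        // Editor's note: worked example of the score formula above, assuming an
        // id_type_score of 10, last activity 3 days before schedule_date, and
        // cnt = 5: score = 10 * 30 / 3 + 0.1 * 5 = 100.5. Recency dominates
        // (hyperbolic decay over days) while cnt adds a small frequency bonus;
        // candidates are ranked first by idSet priority order, then by score.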
@@ -258,23 +267,33 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
    platform match {
      case "ios" =>
        var idfa = row.getAs[String]("idfa")
        idfa = if (StringUtils.isNotBlank(idfa) && (idfa.matches(didPtn) && !idfa.matches(allZero) || idfa.matches(md5Ptn))) {
          idfa
        } else {
          ""
        }
        var idfv = row.getAs[String]("idfv")
        idfv = if (StringUtils.isNotBlank(idfv) && (idfv.matches(didPtn) && !idfv.matches(allZero) || idfv.matches(md5Ptn))) {
          idfv
        } else {
          ""
        }
        val pkg_name = row.getAs[String]("pkg_name")
        var sysid = row.getAs[String]("sysid")
        sysid = if (StringUtils.isNotBlank(sysid) && (sysid.matches(didPtn) && !sysid.matches(allZero) || sysid.matches(md5Ptn))) {
          sysid
        } else {
          ""
        }
        var bkupid = row.getAs[String]("bkupid")
        bkupid = if (StringUtils.isNotBlank(bkupid) && (bkupid.matches(didPtn) && !bkupid.matches(allZero) || bkupid.matches(md5Ptn))) {
          bkupid
        } else {
          ""
        }
        val xwho = row.getAs[String]("xwho")
        val user_id = row.getAs[String]("user_id")
        // val country = row.getAs[String]("country")
        val ip = row.getAs[String]("ip")
        val ua = row.getAs[String]("ua")
        val brand = row.getAs[String]("brand")
@@ -306,13 +325,44 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
        // IosVert(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
        Row(idfa, sysid, idfv_bundle, bmosv_osv_upt, bmosv_upt, bmosv_ipua_bundle, xwho, user_id, bkupid, cnt)
      case "android" =>
        var imei = row.getAs[String]("imei")
        imei = if (StringUtils.isNotBlank(imei) && (imei.matches(imeiPtn) && !imei.matches(imeiPtnAll) || imei.matches(md5Ptn))) {
          imei
        } else {
          ""
        }
        var android_id = row.getAs[String]("android_id")
        android_id = if (StringUtils.isNotBlank(android_id) && (android_id.matches(andriodIdPtn) && !android_id.matches(andriodIdAll)
          || android_id.matches(md5Ptn))) {
          android_id
        } else {
          ""
        }
        val pkg_name = row.getAs[String]("pkg_name")
        var oaid = row.getAs[String]("oaid")
        oaid = if (StringUtils.isNotBlank(oaid) && (oaid.length >= 16 && oaid.length <= 64 && !oaid.matches(allZero) || oaid.matches(md5Ptn))) {
          oaid
        } else {
          ""
        }
        var gaid = row.getAs[String]("gaid")
        gaid = if (StringUtils.isNotBlank(gaid) && (gaid.matches(didPtn) && !gaid.matches(allZero) || gaid.matches(md5Ptn))) {
          gaid
        } else {
          ""
        }
        var sysid = row.getAs[String]("sysid")
        sysid = if (StringUtils.isNotBlank(sysid) && (sysid.matches(didPtn) && !sysid.matches(allZero) || sysid.matches(md5Ptn))) {
          sysid
        } else {
          ""
        }
        var bkupid = row.getAs[String]("bkupid")
        bkupid = if (StringUtils.isNotBlank(bkupid) && (bkupid.matches(didPtn) && !bkupid.matches(allZero) || bkupid.matches(md5Ptn))) {
          bkupid
        } else {
          ""
        }
        val xwho = row.getAs[String]("xwho")
        val user_id = row.getAs[String]("user_id")
        val country = row.getAs[String]("country")
@@ -349,31 +399,44 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
    }
  }
  /**
   *
   * @param date
   * @param row
   * @param ids
   * @param mainIDSet
   * @return
   * ((srcID, srcType), (oneID, oneIDType, cnt))
   * ((oneID, oneIDType), (srcID, srcType, cnt))
   */
  def processVertex(date: String, row: Row, ids: Array[String], mainIDSet: Set[String]): ArrayBuffer[((String, String), (String, String, Long))] = {
    val array = new ArrayBuffer[((String, String), (String, String, Long))]()
    // val json = JSON.parseObject(Serialization.write(row))
    // Event frequency
    val cnt = row.getAs[Long]("cnt")
    // date: the active date, used to compute the weight
    var flag = true
    for (i <- ids.indices) {
      if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(i)))) && flag) {
        val oneIDType = ids(i)
        val oneID = if (row.getAs[String](String.valueOf(ids(i))).matches(md5Ptn)) {
          row.getAs[String](String.valueOf(ids(i)))
        } else {
          hashMD5(row.getAs[String](String.valueOf(ids(i))))
        }
        array += (((oneID, oneIDType), (oneID, oneIDType, cnt)))
        for (j <- i + 1 until ids.length) {
          if (StringUtils.isNotBlank(row.getAs[String](String.valueOf(ids(j))))) {
            val srcType = ids(j)
            val srcOrg = if (row.getAs[String](srcType).matches(md5Ptn)) {
              row.getAs[String](srcType)
            } else {
              hashMD5(row.getAs[String](srcType))
            }
            if (mainIDSet.contains(oneIDType)) {
              array += (((srcOrg, srcType), (oneID, oneIDType, cnt)))
            } else {
              array += (((oneID, oneIDType), (srcOrg, srcType, cnt)))
            }
          }
        }
@@ -383,21 +446,44 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
    array
  }
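  // Editor's note: shape of what processVertex emits for one row carrying an
  // idfa and an idfv (values illustrative; ids are MD5-hashed first, idfa is
  // assumed to be in mainIDSet, and flag is assumed to be cleared after the
  // first non-blank id in a hunk not shown here):
  //   ((md5(idfa), "idfa"), (md5(idfa), "idfa", cnt)) // self edge for the best-priority id
  //   ((md5(idfv), "idfv"), (md5(idfa), "idfa", cnt)) // secondary id keyed to the main id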
  /**
   *
   * @param active_date
   * @param kv
   * @param idArray
   * @param mainIDSet
   * @return
   * ((srcID, srcType), oneID)
   */
  def updateOneID(active_date: String, kv: ((String, String), Set[(String, String, Long)]), idArray: Array[String], mainIDSet: Set[String]): ArrayBuffer[((String, String), String)] = {
    val array = new ArrayBuffer[((String, String), String)]()
    val tmpOneId = kv._1._1
    val tmpOneIdType = kv._1._2
    val iters = kv._2
    val oneID = new JSONObject()
    var minTypeIndex = idArray.indexOf(tmpOneIdType)
    iters.foreach(t => {
      if (idArray.indexOf(t._2) < minTypeIndex) {
        minTypeIndex = idArray.indexOf(t._2)
      }
      if (tmpOneId.equals(t._1) || mainIDSet.contains(t._2)) {
        val json = new JSONObject()
        json.put("one_type", t._2)
        json.put("one_date", active_date)
        json.put("one_cnt", t._3)
        oneID.put(t._1, json)
      }
    })
    array += (((tmpOneId, tmpOneIdType), oneID.toJSONString))
    if (idArray.indexOf(tmpOneIdType) > minTypeIndex) {
      iters.foreach(itr => {
        var oneJSON = new JSONObject()
        if (oneID.containsKey(itr._1)) {
          oneJSON.put(itr._1, oneID.getJSONObject(itr._1))
        } else {
          oneJSON = oneID
        }
        array += (((itr._1, itr._2), oneJSON.toJSONString))
      })
    }
    array
  }
}
...