Commit a7c11c2b by WangJinfeng

init id_mapping

parent 228e128a
 package mobvista.dmp.datasource.id_mapping

 import com.alibaba.fastjson.JSONObject
+import mobvista.dmp.common.MobvistaConstant.sdf1
 import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
 import mobvista.dmp.datasource.id_mapping.Constant._
 import mobvista.dmp.util.MD5Util
 import org.apache.commons.cli.{BasicParser, Options}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.storage.StorageLevel

 import java.net.URI
 import scala.collection.JavaConverters._
@@ -57,7 +58,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {

   def oldAndTodayIdMapping(country: String, platform: String, date: String, spark: SparkSession, outPutPath: String,
-                           edgeoutPutPath: String, coalesce: Int) = {
+                           resultOutPutPath: String, coalesce: Int) = {
     implicit val formats = org.json4s.DefaultFormats
@@ -65,6 +66,7 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     var schame: StructType = null
     var idSet: Array[String] = null
     var idMainSet: Set[String] = null
+    var scoreMap: Map[String, Double] = null
     // 1. Load today's data
     platform match {
       case "ios" =>
@@ -72,9 +74,11 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
         schame = iosVertSchema
         idSet = iosIDSet
         idMainSet = iosMainIDSet
+        scoreMap = iosIDScoreMap
       case "android" => {
         schame = adrVertSchema
         idMainSet = androidMainIDSet
+        scoreMap = androidIDScoreMap
         country match {
           case "CN" =>
             idSet = androidCNIDSet
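
Editor's note: iosIDSet, androidCNIDSet, iosIDScoreMap, and androidIDScoreMap are defined in mobvista.dmp.datasource.id_mapping.Constant, which is not part of this diff. A minimal sketch of the shape the new scoreMap wiring expects; the ID names and weights below are illustrative assumptions, not the repository's actual values:

// Illustrative only: plausible shapes for the Constant members referenced above.
object ConstantSketch {
  // Priority-ordered ID types; a lower index is treated as more trustworthy (assumed order).
  val iosIDSet: Array[String] = Array("idfa", "idfv", "sysid")
  val androidCNIDSet: Array[String] = Array("imei", "oaid", "android_id", "sysid")
  // Base weight per ID type, consumed by the scoring step added in this commit (assumed values).
  val iosIDScoreMap: Map[String, Double] = Map("idfa" -> 10.0, "idfv" -> 8.0, "sysid" -> 2.0)
  val androidIDScoreMap: Map[String, Double] = Map("imei" -> 10.0, "oaid" -> 9.0, "android_id" -> 8.0, "sysid" -> 2.0)
}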
@@ -127,29 +131,76 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
       ((srcID, idType), oneID.toJSONString)
     })
-    val mergeOneIDRDD = multiOneIDRDD.union(singleOneIDRDD).combineByKey(
+    val midMergeOneIDRDD = multiOneIDRDD.union(singleOneIDRDD).combineByKey(
       (v: String) => Set(v),
       (c: Set[String], v: String) => c ++ Seq(v),
       (c1: Set[String], c2: Set[String]) => c1 ++ c2
     ).map(kv => {
-      val srcId = kv._1._1
+      val srcId = if (kv._1._1.matches(MobvistaConstant.md5Ptn)) {
+        kv._1._1
+      } else {
+        MD5Util.getMD5Str(kv._1._1)
+      }
       val srcType = kv._1._2
-      val oneID = new JSONObject()
+      val oneIDJSON = new JSONObject()
       kv._2.foreach(js => {
         val json = MobvistaConstant.String2JSONObject(js)
         val keys = json.keySet().asScala
         keys.foreach(key => {
-          oneID.put(key, json.getJSONObject(key))
+          val oneID = if (key.matches(MobvistaConstant.md5Ptn)) {
+            key
+          } else {
+            MD5Util.getMD5Str(key)
+          }
+          oneIDJSON.put(oneID, json.getJSONObject(key))
         })
       })
-      (srcId, srcType, oneID)
-    })
+      Result(srcId, srcType, oneIDJSON.toJSONString)
+    }).persist(StorageLevel.MEMORY_AND_DISK_SER)
     FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outPutPath), true)
-    mergeOneIDRDD
-      .repartition(coalesce)
-      .saveAsTextFile(outPutPath, classOf[GzipCodec])
+    import spark.implicits._
+    midMergeOneIDRDD.toDF
+      .repartition(coalesce)
+      .write.mode(SaveMode.Overwrite)
+      .option("orc.compress", "zlib")
+      .orc(outPutPath)
+    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(resultOutPutPath), true)
+    val resultOneID = midMergeOneIDRDD.mapPartitions(rs => {
+      rs.map(r => {
+        val device_id = r.device_id
+        val device_type = r.device_type
+        val one_id = MobvistaConstant.String2JSONObject(r.one_id)
+        val keys = one_id.keySet().asScala
+        var oneIDScore: OneIDScore = OneIDScore("", "", 0)
+        keys.foreach(key => {
+          val json = one_id.getJSONObject(key)
+          val id_type = json.getString("id_type")
+          val id_type_score = scoreMap(id_type)
+          val active_date = json.getString("active_date")
+          val cnt = json.getIntValue("cnt")
+          val days = (sdf1.parse(date).getTime - sdf1.parse(active_date).getTime) / 1000 / 3600 / 24 + 1
+          val score = id_type_score * 30 / days + 0.1 * cnt
+          if (idSet.indexOf(id_type) < idSet.indexOf(oneIDScore.one_type) || idSet.indexOf(oneIDScore.one_type) == -1
+            || (idSet.indexOf(id_type) == idSet.indexOf(oneIDScore.one_type) && score >= oneIDScore.one_score)) {
+            oneIDScore = OneIDScore(key, id_type, score)
+          }
+        })
+        val json = new JSONObject()
+        json.put("one_id", oneIDScore.one_id)
+        json.put("one_type", oneIDScore.one_type)
+        json.put("one_score", oneIDScore.one_score)
+        Result(device_id = device_id, device_type = device_type, one_id = json.toJSONString)
+      })
+    })
+    resultOneID
+      .toDF
+      .repartition(coalesce)
+      .write.mode(SaveMode.Overwrite)
+      .option("orc.compress", "zlib")
+      .orc(resultOutPutPath)
   }
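
Editor's note: for readers skimming the hunk above, here is a compact, self-contained sketch of the two per-record transformations it introduces: MD5-normalizing raw IDs before they become OneID keys, and picking the winning OneID by id_type priority with a recency-decayed score. The md5Ptn regex, the date format standing in for sdf1, and the sample priority/weight values are assumptions; the real constants live in MobvistaConstant and Constant.

import java.security.MessageDigest
import java.text.SimpleDateFormat

object OneIDSketch {
  val md5Ptn = "^[0-9a-fA-F]{32}$"             // assumed 32-hex-char MD5 pattern
  val sdf = new SimpleDateFormat("yyyy-MM-dd") // assumed stand-in for MobvistaConstant.sdf1
  val idSet: Array[String] = Array("imei", "oaid", "gaid")             // assumed priority order
  val scoreMap: Map[String, Double] = Map("imei" -> 10.0, "oaid" -> 9.0, "gaid" -> 8.0)

  // Mirrors the hunk's normalization: keep IDs that are already MD5 hashes,
  // hash everything else (MD5Util.getMD5Str in the job).
  def normalize(id: String): String =
    if (id.matches(md5Ptn)) id
    else MessageDigest.getInstance("MD5").digest(id.getBytes("UTF-8")).map("%02x".format(_)).mkString

  case class OneIDScore(one_id: String, one_type: String, one_score: Double)

  // Recency-decayed score: base weight scaled by 30 / days-since-active, plus 0.1 per sighting.
  def score(idType: String, activeDate: String, cnt: Int, date: String): Double = {
    val days = (sdf.parse(date).getTime - sdf.parse(activeDate).getTime) / 1000 / 3600 / 24 + 1
    scoreMap(idType) * 30 / days + 0.1 * cnt
  }

  // Same selection rule as the hunk: prefer the higher-priority id_type (lower idSet index),
  // and break ties within a type on score.
  def select(cands: Seq[(String, String, String, Int)], date: String): OneIDScore =
    cands.foldLeft(OneIDScore("", "", 0)) { case (best, (id, idType, activeDate, cnt)) =>
      val s = score(idType, activeDate, cnt, date)
      val better = idSet.indexOf(idType) < idSet.indexOf(best.one_type) ||
        idSet.indexOf(best.one_type) == -1 ||
        (idSet.indexOf(idType) == idSet.indexOf(best.one_type) && s >= best.one_score)
      if (better) OneIDScore(id, idType, s) else best
    }
  // e.g. select(Seq((normalize("867530..."), "imei", "2021-12-01", 3)), "2021-12-07")
}

An ID active today (days = 1) scores 30 times its base weight and the bonus decays hyperbolically with age, while the cnt term adds a small frequency bonus; type priority still dominates both.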
@@ -157,13 +208,13 @@ class IDMappingGraphx extends CommonSparkJob with Serializable {
     platform match {
       case "ios" =>
         var idfa = row.getAs[String]("idfa")
-        idfa = if (StringUtils.isNotBlank(idfa) && idfa.matches(didPtn) && !idfa.matches(allZero)) {
+        idfa = if (StringUtils.isNotBlank(idfa) && idfa.matches(MobvistaConstant.didPtn) && !idfa.matches(MobvistaConstant.allZero)) {
           idfa
         } else {
           ""
         }
         var idfv = row.getAs[String]("idfv")
-        idfv = if (StringUtils.isNotBlank(idfv) && idfv.matches(didPtn) && !idfv.matches(allZero)) {
+        idfv = if (StringUtils.isNotBlank(idfv) && idfv.matches(MobvistaConstant.didPtn) && !idfv.matches(MobvistaConstant.allZero)) {
           idfv
         } else {
           ""
...
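
Editor's note: the didPtn / allZero checks above blank out malformed device IDs and the all-zero placeholder reported by opted-out devices. A minimal sketch of that validation, assuming UUID-shaped patterns; the real regexes live in MobvistaConstant and may differ:

// Sketch only: assumed stand-ins for MobvistaConstant.didPtn / MobvistaConstant.allZero.
object DidCleanSketch {
  val didPtn = "^[0-9A-Fa-f]{8}(-[0-9A-Fa-f]{4}){3}-[0-9A-Fa-f]{12}$" // assumed UUID shape
  val allZero = "^[0-]+$"                                             // assumed all-zero placeholder

  // Mirrors the per-field rule above: keep a well-formed, non-zero ID, otherwise blank it.
  def clean(did: String): String =
    if (did != null && did.matches(didPtn) && !did.matches(allZero)) did else ""
}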