package mobvista.dmp.datasource.dsp

import java.util.regex.Pattern

import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.Row

/**
  * Adapts an iterator of raw request rows into an iterator of [[DspReqVO]] records.
  *
  * Every column is read positionally with `Row.getString`, so a null column yields a
  * null string. Null or malformed optional inputs (the ext-id list and the JSON payload)
  * fall back to empty strings instead of failing the whole partition.
  *
  * @package: mobvista.dmp.datasource.dsp
  * @author: wangjf
  * @date: 2019-08-15
  * @time: 15:33
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  *
  * @param iter the underlying row iterator; it is consumed lazily, one row per `next` call
  */
class CustomerIterator(iter: Iterator[Row]) extends Iterator[DspReqVO] {

  // Positional indexes of the columns read from each input row.
  private val IDFA = 37
  private val GAID = 34
  // NOTE(review): PKG_NAME and the two package-name patterns below are not used when
  // building the output record; they are retained unchanged from the original class.
  private val PKG_NAME = 20
  private val PLATFORM = 29
  private val UPDATE_TIME = 0
  private val IP = 26
  private val MAKER = 27
  private val MODEL = 28
  private val OS_VERSION = 30
  private val COUNTRY_CODE = 33
  private val BIRTHDAY = 39
  private val GENDER = 40
  private val EXT_ID = 15
  private val JSON_MSG = 6

  // Position of the android id inside the comma-separated ext-id column.
  private val ANDROID_ID_POS = 5
  private val EXT_ID_SPLIT = ","

  // Separator used by splitFun when the caller passes a null separator.
  private val DATA_SPLIT = "\t"

  private val iosPkgPtn = Pattern.compile("^\\d+$")
  private val adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$")

  /** @return true while the underlying row iterator still has rows */
  def hasNext: Boolean = iter.hasNext

  /**
    * Consumes the next row and converts it into a [[DspReqVO]].
    *
    * The device id is the IDFA column for platform "ios" and the GAID column for
    * platform "android"; for any other platform (including null) both the device id
    * and the device type are empty strings. The geo fields come from `device.geo`
    * of the JSON payload and default to empty strings when the payload is absent,
    * does not start with "{", or lacks an object at that path.
    *
    * NOTE(review): the previous implementation also computed a normalised package name
    * and the last `user.data[].segment` value, but never passed either to DspReqVO
    * (the record receives "" and null in those argument slots). That dead work has been
    * removed; the emitted values are unchanged. Confirm whether those two slots should
    * be populated.
    *
    * @return the record built from the next input row
    */
  def next: DspReqVO = {
    val row = iter.next()

    val idfa = row.getString(IDFA)
    val gaid = row.getString(GAID)
    val platform = row.getString(PLATFORM)
    val time = row.getString(UPDATE_TIME)
    val ip = row.getString(IP)
    val maker = row.getString(MAKER)
    val model = row.getString(MODEL)
    val osVersion = row.getString(OS_VERSION)
    val country = row.getString(COUNTRY_CODE)
    val birthday = row.getString(BIRTHDAY)
    val gender = row.getString(GENDER)
    val extId = row.getString(EXT_ID)
    val jsonMsg = row.getString(JSON_MSG)

    // The platform comparison is exact and case-sensitive, as before.
    val (deviceId, deviceType) = platform match {
      case "ios" => (idfa, "idfa")
      case "android" => (gaid, "gaid")
      case _ => ("", "")
    }

    // splitFun returns null for a null line, and the ext-id list may hold fewer than
    // six fields; both cases yield an empty android id instead of throwing.
    val androidId = Option(splitFun(extId, EXT_ID_SPLIT))
      .flatMap(_.lift(ANDROID_ID_POS))
      .getOrElse("")

    // Parse the JSON payload and pick out device.geo. isJsonObject is false for JSON
    // null, so it also guards getAsJsonObject against arrays and primitives.
    val geo = for {
      msg <- Option(jsonMsg) if msg.startsWith("{")
      json <- Option(GsonUtil.String2JsonObject(msg))
      device <- Option(json.get("device")) if device.isJsonObject
      geoElement <- Option(device.getAsJsonObject.get("geo")) if geoElement.isJsonObject
    } yield geoElement.getAsJsonObject

    val geoInfo = geo.map(_.toString).getOrElse("")
    // toString keeps the raw JSON rendering of the value (a string value stays quoted).
    val longitude = geo
      .flatMap(g => Option(g.get("lon")))
      .filterNot(_.isJsonNull)
      .map(_.toString)
      .getOrElse("")
    val latitude = geo
      .flatMap(g => Option(g.get("lat")))
      .filterNot(_.isJsonNull)
      .map(_.toString)
      .getOrElse("")

    DspReqVO(deviceId, deviceType, platform, country, ip, gender, birthday, maker, model,
      osVersion, "", androidId, time, geoInfo, longitude, latitude, null)
  }

  /**
    * Splits `line` with `StringUtils.splitPreserveAllTokens`, passing -1 as the maximum
    * token count.
    *
    * @param line  the text to split; its null handling is whatever
    *              `StringUtils.splitPreserveAllTokens` does (the caller in `next` guards
    *              against a null result)
    * @param split the separator characters; when null, the tab separator DATA_SPLIT is used
    * @return the tokens produced by `StringUtils.splitPreserveAllTokens`
    */
  def splitFun(line: String, split: String): Array[String] = {
    val separator = if (split == null) DATA_SPLIT else split
    StringUtils.splitPreserveAllTokens(line, separator, -1)
  }
}