package mobvista.dmp.datasource.ga

import java.security.{MessageDigest, NoSuchAlgorithmException}
import java.text.SimpleDateFormat
import java.util.regex.Pattern

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
  * Shared constants, SQL templates and JSON parsing helpers for the GA data source.
  *
  * @package: mobvista.dmp.datasource
  * @author: wangjf
  * @date: 2018/11/3
  * @time: 2:44 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
object Constant {

  /** Schema of the raw input: a single string column named `text` holding one JSON document. */
  def schema: StructType = StructType(StructField("text", StringType) :: Nil)

  /**
    * SQL template that flattens the raw JSON `text` column into `dev.event_<date>`.
    *
    * The tokens `@date_str_undline` and `@date_str_midline` are literal placeholders in the
    * text; presumably the caller substitutes them before executing the statement (verify
    * against the call site).
    */
  val event_sql: String =
    """
      |INSERT OVERWRITE TABLE dev.event_@date_str_undline
      | SELECT json_col(`text`,'data','v') AS version,
      | json_col(`text`,'game_id','') AS game_id,
      | json_col(`text`,'data','user_id') AS user_id,
      | json_col(`text`,'arrival_ts','') AS arrival_ts,
      | json_col(`text`,'user_meta','install_ts') AS install_ts,
      | COALESCE(json_col(`text`,'category',''),json_col(`text`,'data','category')) AS category,
      | json_col(`text`,'data','session_id') AS session_id,
      | json_col(`text`,'data','event_id') AS event_id,
      | json_col(`text`,'country_code','') AS country_code,
      | json_col(`text`,'ip','') AS ip,
      | json_col(`text`,'user_meta','revenue') AS revenue,
      | json_col(`text`,'data','facebook_id') AS facebook_id,
      | json_col(`text`,'data','googleplus_id') AS googleplus_id,
      | json_col(`text`,'data','ios_id') AS ios_id,
      | json_col(`text`,'data','android_id') AS android_id,
      | json_col(`text`,'data','google_aid') AS google_aid,
      | json_col(`text`,'data','ios_idfa') AS ios_idfa,
      | json_col(`text`,'data','ios_idfv') AS ios_idfv,
      | json_col(`text`,'data','limit_ad_tracking') AS is_ad_tracking_limited,
      | json_col(`text`,'data','logon_gamecenter') AS is_logged_gamecenter,
      | json_col(`text`,'data','logon_googleplay') AS is_logged_googleplay,
      | json_col(`text`,'data','value') AS value,
      | json_col(`text`,'data','attempt_num') AS attempt_num,
      | case when json_col(`text`,'data','score') >= 2147483647 then 0 else json_col(`text`,'data','score') end as score,
      | json_col(`text`,'data','session_num') AS session_num,
      | json_col(`text`,'data','length') AS session_length,
      | json_col(`text`,'data','gender') AS gender,
      | json_col(`text`,'data','birth_year') AS birth_year,
      | json_col(`text`,'data','progression') AS progression,
      | json_col(`text`,'data','sdk_version') AS sdk_version,
      | json_col(`text`,'data','engine_version') AS engine_version,
      | json_col(`text`,'data','os_version') AS os_version,
      | json_col(`text`,'data','manufacturer') AS manufacturer,
      | json_col(`text`,'data','device') AS device,
      | json_col(`text`,'data','platform') AS platform,
      | json_col(`text`,'data','build') AS build,
      | json_col(`text`,'data','connection_type') AS connection_type,
      | json_col(`text`,'data','jailbroken') AS is_jailbroken,
      | json_col(`text`,'user_meta','pay_ft') AS pay_ft,
      | json_col(`text`,'data','currency') AS currency,
      | json_col(`text`,'data','amount') AS amount,
      | json_col(`text`,'data','cart_type') AS cart_type,
      | json_col(`text`,'data','transaction_num') AS transaction_num,
      | json_col(`text`,'user_meta','receipt_status') AS receipt_status,
      | json_col(`text`,'user_meta','os_major') AS v1_os_major,
      | json_col(`text`,'user_meta','device') AS v1_device,
      | json_col(`text`,'user_meta','platform') AS v1_platform,
      | json_col(`text`,'user_meta','gender') AS v1_gender,
      | json_col(`text`,'user_meta','sdk_version') AS v1_sdk_version,
      | json_col(`text`,'user_meta','google_aid') as v1_google_aid,
      | json_col(`text`,'user_meta','android_id') as v1_android_id,
      | json_col(`text`,'data','category') AS v2_category,
      | json_col(`text`,'user_meta','install_publisher') AS install_publisher,
      | json_col(`text`,'user_meta','install_site') AS install_site,
      | json_col(`text`,'user_meta','install_campaign') AS install_campaign,
      | json_col(`text`,'user_meta','install_adgroup') AS install_adgroup,
      | json_col(`text`,'user_meta','install_ad') AS install_ad,
      | json_col(`text`,'user_meta','install_keyword') AS install_keyword,
      | json_col(`text`,'data','severity') AS severity
      |FROM event_text
      |WHERE to_date(json_col(`text`,'arrival_ts','')) LIKE '@date_str_midline' AND json_col(`text`,'data','user_id') != '00000000-0000-0000-0000-000000000000' AND json_col(`text`,'data','user_id') != '0'
      | AND json_col(`text`,'data','user_id') IS NOT NULL AND json_col(`text`,'data','user_id') != 'nullUser'
      | AND json_col(`text`,'data','user_id') != ''
    """.stripMargin

  /**
    * Parses raw JSON event lines into [[GA]] records.
    *
    * A line yields a record only when its formatted `arrival_ts` starts with `date`, its
    * `data.user_id` passes [[check_userId]], and its game id is not in `filterGameIds`.
    *
    * @param iters         raw JSON documents, one per element
    * @param date          prefix the formatted arrival timestamp (`yyyy-MM-dd HH:mm:ss`) must start with
    * @param filterGameIds game ids whose events are dropped
    * @return a lazy iterator with one element per input line; the element is `null` when the
    *         line is filtered out or throws while being parsed, so consumers must drop nulls
    */
  def parserGa(iters: Iterator[String], date: String, filterGameIds: Set[Int]): Iterator[GA] = {
    // Built once per call rather than once per record. SimpleDateFormat is not thread-safe,
    // so the instance stays local to this iterator; it formats in the JVM default time zone.
    val mapper = new ObjectMapper()
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    iters.map(iter => {
      try {
        val jsonNode = mapper.readTree(iter)
        val dataNode = jsonNode.path("data")
        val userMetaNode = jsonNode.path("user_meta")
        val arrivalTsNode = jsonNode.path("arrival_ts")
        val gameIdNode = jsonNode.path("game_id")
        val ipNode = jsonNode.path("ip")
        val categoryNode = jsonNode.path("category")

        // When the country code is GB it must be rewritten to UK.
        val rawCountryCode = jsonNode.path("country_code").asText("")
        val countryCode = if ("GB".equalsIgnoreCase(rawCountryCode)) "UK" else rawCountryCode

        // The top-level category wins; fall back to data.category when it is blank.
        val category =
          if (StringUtils.isNotBlank(categoryNode.asText())) categoryNode.asText()
          else dataNode.path(Data.CATEGORY.toString).asText()

        // Read the score as a long so the overflow guard can actually fire: an Int can never
        // exceed Int.MaxValue. Values at or above that bound become 0, matching event_sql.
        val scoreNode = dataNode.path(Data.SCORE.toString)
        val score: Integer =
          if (!dataNode.has(Data.SCORE.toString)) null
          else if (scoreNode.asLong() >= Int.MaxValue) 0
          else scoreNode.asInt()

        // data.game_id wins over the top-level game_id when present.
        val dataGameIdNode = dataNode.path(Data.GAME_ID.toString)
        val game_id =
          if (StringUtils.isNotBlank(dataGameIdNode.asText())) dataGameIdNode.asInt()
          else gameIdNode.asInt()

        // Both timestamps are multiplied by 1000 before being turned into a Date.
        val arrival_ts = format.format(new java.util.Date(arrivalTsNode.asLong(0) * 1000))
        val install_ts = format.format(
          new java.util.Date(userMetaNode.path(UserMeta.INSTALL_TS.toString).asLong(0) * 1000))
        val revenue = userMetaNode.path(UserMeta.REVENUE.toString).toString

        // check_userId only accepts [a-zA-Z0-9_-]+, so an accepted id needs no further
        // trimming or separator stripping.
        val user_id = dataNode.path(Data.USER_ID.toString).asText()

        if (arrival_ts.startsWith(date) && check_userId(user_id) && !filterGameIds.contains(game_id)) {
          GA(
            dataNode.path(Data.VERSION.toString).asInt(),
            game_id,
            user_id,
            md5Str(s"$game_id$user_id"),
            arrival_ts,
            install_ts,
            category.trim,
            generateJsonVal(dataNode, Data.SESSION_ID.toString),
            generateJsonVal(dataNode, Data.EVENT_ID.toString),
            countryCode.trim,
            ipNode.asText().trim,
            revenue.trim,
            generateJsonVal(dataNode, Data.FACEBOOK_ID.toString),
            generateJsonVal(dataNode, Data.GOOGLEPLUS_ID.toString),
            generateJsonVal(dataNode, Data.IOS_ID.toString),
            generateJsonVal(dataNode, Data.ANDROID_ID.toString),
            generateJsonVal(dataNode, Data.GOOGLE_AID.toString),
            generateJsonVal(dataNode, Data.IOS_IDFA.toString),
            generateJsonVal(dataNode, Data.IOS_IDFV.toString),
            dataNode.path(Data.LIMIT_AD_TRACKING.toString).asBoolean(),
            dataNode.path(Data.LOGON_GAMECENTER.toString).asBoolean(),
            dataNode.path(Data.LOGON_GOOGLEPLAY.toString).asBoolean(),
            dataNode.path(Data.VALUE.toString).asDouble(),
            dataNode.path(Data.ATTEMPT_NUM.toString).asInt(),
            score,
            dataNode.path(Data.SESSION_NUM.toString).asInt(),
            dataNode.path(Data.LENGTH.toString).asInt(),
            generateJsonVal(dataNode, Data.GENDER.toString),
            generateJsonVal(dataNode, Data.BIRTH_YEAR.toString),
            generateJsonVal(dataNode, Data.PROGRESSION.toString),
            generateJsonVal(dataNode, Data.SDK_VERSION.toString),
            generateJsonVal(dataNode, Data.ENGINE_VERSION.toString),
            generateJsonVal(dataNode, Data.OS_VERSION.toString),
            generateJsonVal(dataNode, Data.MANUFACTURER.toString),
            generateJsonVal(dataNode, Data.DEVICE.toString),
            generateJsonVal(dataNode, Data.PLATFORM.toString),
            generateJsonVal(dataNode, Data.BUILD.toString),
            generateJsonVal(dataNode, Data.CONNECTION_TYPE.toString),
            dataNode.path(Data.JAILBROKEN.toString).asBoolean(),
            generateJsonVal(userMetaNode, UserMeta.PAY_FT.toString),
            generateJsonVal(dataNode, Data.CURRENCY.toString),
            dataNode.path(Data.AMOUNT.toString).asDouble(),
            generateJsonVal(dataNode, Data.CART_TYPE.toString),
            dataNode.path(Data.TRANSACTION_NUM.toString).asInt(),
            generateJsonVal(userMetaNode, UserMeta.RECEIPT_STATUS.toString),
            generateJsonVal(userMetaNode, UserMeta.OS_MAJOR.toString),
            generateJsonVal(userMetaNode, UserMeta.DEVICE.toString),
            generateJsonVal(userMetaNode, UserMeta.PLATFORM.toString),
            generateJsonVal(userMetaNode, UserMeta.GENDER.toString),
            generateJsonVal(userMetaNode, UserMeta.SDK_VERSION.toString),
            generateJsonVal(userMetaNode, UserMeta.GOOGLE_AID.toString),
            generateJsonVal(userMetaNode, UserMeta.ANDROID_ID.toString),
            generateJsonVal(dataNode, Data.CATEGORY.toString),
            generateJsonVal(userMetaNode, UserMeta.INSTALL_PUBLISHER.toString),
            generateJsonVal(userMetaNode, UserMeta.INSTALL_SITE.toString),
            generateJsonVal(userMetaNode, UserMeta.INSTALL_CAMPAIGN.toString),
            generateJsonVal(userMetaNode, UserMeta.INSTALL_ADGROUP.toString),
            generateJsonVal(userMetaNode, UserMeta.INSTALL_AD.toString),
            generateJsonVal(userMetaNode, UserMeta.INSTALL_KEYWORD.toString),
            generateJsonVal(dataNode, Data.SEVERITY.toString)
          )
        } else {
          null
        }
      } catch {
        // Best effort: a line that cannot be parsed yields null instead of failing the batch.
        case _: Exception => null
      }
    })
  }

  /** The all-zero UUID, rejected as a user id by [[check_userId]]. */
  val allZero = "00000000-0000-0000-0000-000000000000"

  // Accepted user ids consist solely of ASCII letters, digits, '_' and '-'.
  private val pattern = Pattern.compile("^[a-zA-Z0-9_-]+$")

  /**
    * Reads `key` from `dataNode` as text and cleans it for delimited output.
    *
    * @param dataNode JSON node to read from
    * @param key      field name to look up
    * @return the trimmed text with every `|` and tab removed, or `null` when the field is
    *         missing or blank
    */
  def generateJsonVal(dataNode: com.fasterxml.jackson.databind.JsonNode, key: String): String = {
    val text = dataNode.path(key).asText()
    if (StringUtils.isNotBlank(text)) {
      text.trim.replace("|", "").replace("\t", "")
    } else {
      null
    }
  }

  /**
    * Computes the MD5 digest of `str` as lowercase hexadecimal.
    *
    * The input is encoded as UTF-8 so the hash does not depend on the JVM default charset.
    * Every byte is rendered as two hex digits, so leading zeros are preserved.
    *
    * @param str text to hash
    * @return the hex digest, or an empty string if the MD5 algorithm is unavailable
    */
  def md5Str(str: String): String = {
    try {
      val hash = MessageDigest.getInstance("MD5")
        .digest(str.getBytes(java.nio.charset.StandardCharsets.UTF_8))
      hash.map(b => "%02x".format(b & 0xff)).mkString
    } catch {
      case e: NoSuchAlgorithmException =>
        e.printStackTrace()
        ""
    }
  }

  /**
    * Tells whether `user_id` is usable as a user identifier.
    *
    * @param user_id candidate id
    * @return `true` when the id is non-blank, is none of the placeholder values (all-zero
    *         UUID, "0", "nullUser", "null") and contains only letters, digits, '_' and '-'
    */
  def check_userId(user_id: String): Boolean = {
    StringUtils.isNotBlank(user_id) &&
      !user_id.equals(allZero) &&
      !user_id.equals("0") &&
      !user_id.equals("nullUser") &&
      !user_id.equals("null") &&
      pattern.matcher(user_id).matches()
  }

  /** JSON field names read from the `data` node of an event. */
  object Data extends Enumeration {
    type Data = Value
    val VERSION: Data.Value = Value("v")
    val GAME_ID: Data.Value = Value("game_id")
    val USER_ID: Data.Value = Value("user_id")
    val GOOGLE_AID: Data.Value = Value("google_aid")
    val SDK_VERSION: Data.Value = Value("sdk_version")
    val OS_VERSION: Data.Value = Value("os_version")
    val MANUFACTURER: Data.Value = Value("manufacturer")
    val DEVICE: Data.Value = Value("device")
    val PLATFORM: Data.Value = Value("platform")
    val SESSION_ID: Data.Value = Value("session_id")
    val SESSION_NUM: Data.Value = Value("session_num")
    val CONNECTION_TYPE: Data.Value = Value("connection_type")
    val ENGINE_VERSION: Data.Value = Value("engine_version")
    val BUILD: Data.Value = Value("build")
    val EVENT_ID: Data.Value = Value("event_id")
    val CATEGORY: Data.Value = Value("category")
    val CURRENCY: Data.Value = Value("currency")
    val AMOUNT: Data.Value = Value("amount")
    val TRANSACTION_NUM: Data.Value = Value("transaction_num")
    val CART_TYPE: Data.Value = Value("cart_type")
    val IOS_IDFV: Data.Value = Value("ios_idfv")
    val IOS_IDFA: Data.Value = Value("ios_idfa")
    val VALUE: Data.Value = Value("value")
    val SCORE: Data.Value = Value("score")
    val ATTEMPT_NUM: Data.Value = Value("attempt_num")
    val LENGTH: Data.Value = Value("length")
    val FACEBOOK_ID: Data.Value = Value("facebook_id")
    val GOOGLEPLUS_ID: Data.Value = Value("googleplus_id")
    val IOS_ID: Data.Value = Value("ios_id")
    val ANDROID_ID: Data.Value = Value("android_id")
    val LIMIT_AD_TRACKING: Data.Value = Value("limit_ad_tracking")
    val LOGON_GAMECENTER: Data.Value = Value("logon_gamecenter")
    val LOGON_GOOGLEPLAY: Data.Value = Value("logon_googleplay")
    val GENDER: Data.Value = Value("gender")
    val BIRTH_YEAR: Data.Value = Value("birth_year")
    val PROGRESSION: Data.Value = Value("progression")
    val JAILBROKEN: Data.Value = Value("jailbroken")
    val SEVERITY: Data.Value = Value("severity")
  }

  /** JSON field names read from the `user_meta` node of an event. */
  object UserMeta extends Enumeration {
    type UserMeta = Value
    val INSTALL_TS: UserMeta.Value = Value("install_ts")
    val REVENUE: UserMeta.Value = Value("revenue")
    val RECEIPT_STATUS: UserMeta.Value = Value("receipt_status")
    val PAY_FT: UserMeta.Value = Value("pay_ft")
    val SDK_VERSION: UserMeta.Value = Value("sdk_version")
    val OS_MAJOR: UserMeta.Value = Value("os_major")
    val DEVICE: UserMeta.Value = Value("device")
    val PLATFORM: UserMeta.Value = Value("platform")
    val GENDER: UserMeta.Value = Value("gender")
    val GOOGLE_AID: UserMeta.Value = Value("google_aid")
    val ANDROID_ID: UserMeta.Value = Value("android_id")
    val INSTALL_PUBLISHER: UserMeta.Value = Value("install_publisher")
    val INSTALL_SITE: UserMeta.Value = Value("install_site")
    val INSTALL_CAMPAIGN: UserMeta.Value = Value("install_campaign")
    val INSTALL_ADGROUP: UserMeta.Value = Value("install_adgroup")
    val INSTALL_AD: UserMeta.Value = Value("install_ad")
    val INSTALL_KEYWORD: UserMeta.Value = Value("install_keyword")
  }

  // ods_ga_active_total: `@ga_date` is a literal placeholder in the query text.
  val ods_ga_active_total_sql: String =
    """
      |SELECT device_id, platform
      | FROM dwh.ods_ga_active_total WHERE dt = '@ga_date'
    """.stripMargin

  // dm_interest_tag_orc restricted to business 'ga': `@date` is a literal placeholder in the query text.
  val dm_interest_tag_sql: String =
    """
      |SELECT device_id, platform
      | FROM dwh.dm_interest_tag_orc WHERE CONCAT(year,month,day) = '@date' AND business = 'ga'
    """.stripMargin
}