package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.JSONObject
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.gson.{JsonArray, JsonObject}
import mobvista.dmp.common.MobvistaConstant
import mobvista.dmp.util.MRUtils
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.text.SimpleDateFormat
import java.util
import java.util.regex.Pattern
import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * Constants, SQL templates, schemas and row/JSON helper functions for the
 * device-tag and interest-tag datasets.
 *
 * The SQL templates contain `@name` tokens (for example `@date`,
 * `@check_deviceId`); presumably callers substitute them before running the
 * statement -- the substitution itself is not part of this file.
 *
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2018/11/27
 * @time: 4:45 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
object Constant {

  /** Regex for a canonical 8-4-4-4-12 hexadecimal identifier. */
  val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"

  /** Regex for a 15 to 17 digit numeric identifier. */
  val imeiPtn = "^([0-9]{15,17})$"

  /** Regex for a 16 character alphanumeric identifier. */
  val andriodIdPtn = "^[a-zA-Z0-9]{16}$"

  /** The all-zero identifier that `check_deviceId` rejects. */
  val allZero = "00000000-0000-0000-0000-000000000000"

  /**
   * Shared `yyyy-MM-dd` formatter, kept for callers outside this object.
   * `SimpleDateFormat` is not thread-safe, so code in this object creates its
   * own instance instead of using this one.
   */
  val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")

  /** Regex accepting calendar dates (leap years included) with optional `-`, `/` or whitespace separators. */
  val rexp: String = "^((\\d{2}(([02468][048])|([13579][26]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])))))|(\\d{2}(([02468][1235679])|([13579][01345789]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|(1[0-9])|(2[0-8]))))))"

  /** Compiled form of [[rexp]]. */
  val pattern: Pattern = Pattern.compile(rexp)

  // Precompiled once so that check_deviceId does not recompile three regexes per call.
  private val didPattern: Pattern = Pattern.compile(didPtn)
  private val imeiPattern: Pattern = Pattern.compile(imeiPtn)
  private val androidIdPattern: Pattern = Pattern.compile(andriodIdPtn)

  // Reused for every row; only readTree is called on it and it is never reconfigured.
  private val jsonMapper: ObjectMapper = new ObjectMapper()

  /** Builds a schema whose columns are all `StringType`, in the given order. */
  private def stringSchema(names: String*): StructType =
    StructType(names.map(StructField(_, StringType)))

  /** Schema of the rows produced by `orcPart` / `orcMap`. */
  def schema: StructType =
    stringSchema("device_id", "device_type", "platform", "package_name", "update_date")

  /** Schema with one `tags` column per device. */
  def interest_schema: StructType =
    stringSchema("device_id", "device_type", "platform", "tags")

  // dm_install_list_sql
  val dm_install_list_sql: String =
    s"""
       |SELECT device_id, device_type, platform, install_list
       | FROM dwh.dm_install_list
       | WHERE CONCAT(year,month,day) = '@date' AND business = '@business'
    """.stripMargin

  /**
   * Expands one input row into one output row per element of its
   * `install_list` JSON.
   *
   * @param row       row with `device_id`, `device_type`, `platform` and `install_list` columns
   * @param arrayOnly when true, a JSON root that is not an array yields no rows
   * @return rows of (device_id, device_type, platform, package_name, date)
   */
  private def installRows(row: Row, arrayOnly: Boolean): Iterator[Row] = {
    // Explicit type argument: without one, T is inferred as Nothing and the
    // call is expected to fail with a ClassCastException at runtime.
    val deviceId = row.getAs[Any]("device_id").toString
    val deviceType = row.getAs[Any]("device_type").toString
    val platform = row.getAs[Any]("platform").toString
    val installList = row.getAs[Any]("install_list").toString

    // Option(...) guards against readTree returning null for empty input.
    Option(jsonMapper.readTree(installList))
      .filter(root => !arrayOnly || root.isArray)
      .iterator
      .flatMap { root =>
        root.elements().asScala.map { node =>
          Row(deviceId, deviceType, platform, node.path("package_name").asText, node.path("date").asText)
        }
      }
  }

  /**
   * Partition-level variant of `orcMap`: flattens every row's `install_list`
   * JSON array into (device_id, device_type, platform, package_name, date)
   * rows. Rows whose `install_list` is not a JSON array are dropped.
   *
   * @param iter rows of one partition
   * @return lazily evaluated iterator over the exploded rows
   */
  def orcPart(iter: Iterator[Row]): Iterator[Row] =
    iter.flatMap(installRows(_, arrayOnly = true))

  /**
   * Flattens the `install_list` JSON of a single row into
   * (device_id, device_type, platform, package_name, date) rows. Unlike
   * `orcPart` the JSON root is not required to be an array; the child
   * elements of whatever root is parsed are used.
   *
   * @param ir input row
   * @return iterator over the exploded rows
   */
  def orcMap(ir: Row): Iterator[Row] =
    installRows(ir, arrayOnly = false)

  val interest_sql: String =
    """
      |SELECT device_id, MAX(device_type) device_type, MAX(platform) platform, @combineJson(concat_ws(';',collect_set(tags))) tags FROM
      | (SELECT LOWER(device_id) device_id, device_type, platform, tags
      | FROM dwh.dm_interest_tag WHERE CONCAT(year, month, day) = '@date'
      | AND business IN ('adn_install','adn_request_sdk','adn_request_other','dsp_req','other','3s','mp','clever','adn_sdk') AND @check_deviceId
      | UNION ALL
      | SELECT LOWER(device_id) device_id, device_type, platform, tags
      | FROM dwh.dm_interest_tag WHERE CONCAT(year, month, day) = '@ga_date' AND business IN ('ga_device','ga') AND @check_deviceId
      | ) interest_tag GROUP BY device_id
    """.stripMargin

  val old_interest_sql: String =
    """
      |SELECT device_id, device_type, platform, @combineJson(concat_ws(';',collect_set(tags))) tags FROM
      | (SELECT device_id device_id, device_type, platform, tags
      | FROM dwh.dm_interest_tag WHERE CONCAT(year, month, day) = '@date'
      | AND business IN ('adn_install','adn_request_sdk','adn_request_other','dsp_req','3s','mp','clever','adn_sdk') AND @check_deviceId
      | UNION ALL
      | SELECT device_id device_id, device_type, platform, tags
      | FROM dwh.dm_interest_tag WHERE CONCAT(year, month, day) = '@ga_date' AND business IN ('ga_device','ga','other') AND @check_deviceId
      | ) interest_tag
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  val interest_tag_sql: String =
    """
      |SELECT LOWER(device_id) device_id, device_type, platform, tags FROM
      | dev.dm_interest_tag WHERE CONCAT(year,month,day) = '@date' AND business = '3s'
    """.stripMargin

  // ALTER TABLE dev.dm_interest_tag ADD IF NOT EXISTS PARTITION (year = '2019', month = '04', day = '12', business = '3s') LOCATION 's3://mob-emr-test/wangjf/dm_interest_tag/2019/04/09/all';
  val device_tag_sql: String =
    """
      |SELECT /*+ MAPJOIN(old2new) */ device_id, MAX(device_type) device_type, MAX(platform) platform, package_name, MAX(update_date) update_date,
      | COLLECT_SET(@str2Json(old2new.tag_id, device_tag.first_tag, device_tag.second_tag)) tags
      | FROM dwh.dm_old2new_tag old2new
      | JOIN
      | (SELECT LOWER(device_id) device_id, device_type, platform, package_name, update_date, tag_type, first_tag, second_tag
      | FROM dwh.dm_device_tag WHERE dt = '@date' AND business NOT IN ('ga','other') AND @check_deviceId
      | UNION ALL
      | SELECT LOWER(device_id) device_id, device_type, platform, package_name, update_date, tag_type, first_tag, second_tag
      | FROM dwh.dm_device_tag WHERE dt = '@ga_date' AND business IN ('ga','other')
      | ) device_tag
      | ON LOWER(CONCAT(old2new.tag_type, '-', old2new.first_tag, '-', old2new.second_tag)) = LOWER(CONCAT(device_tag.tag_type, '-', device_tag.first_tag, '-', device_tag.second_tag))
      | GROUP BY device_id, package_name
    """.stripMargin

  val device_tag_sql_1: String =
    """
      |SELECT /*+ MAPJOIN(old2new) */ device_id, MAX(device_type) device_type, MAX(platform) platform, package_name, MAX(update_date) update_date,
      | CAST(COLLECT_SET(@str2Json(old2new.tag_id, device_tag.first_tag, device_tag.second_tag)) AS string) tags
      | FROM dwh.dm_old2new_tag old2new
      | JOIN
      | (SELECT LOWER(device_id) device_id, device_type, platform, package_name, update_date, tag_type, first_tag, second_tag
      | FROM dwh.dm_device_tag WHERE dt = '@date' AND business = 'clever'
      | ) device_tag
      | ON LOWER(CONCAT(old2new.tag_type, '-', old2new.first_tag, '-', old2new.second_tag)) = LOWER(CONCAT(device_tag.tag_type, '-', device_tag.first_tag, '-', device_tag.second_tag))
      | GROUP BY device_id, package_name
    """.stripMargin

  val device_tag_sql_2: String =
    """
      |SELECT LOWER(device_id) device_id, device_type, platform, package_name, update_date, tag_type, first_tag, second_tag
      | FROM dwh.dm_device_tag WHERE dt = '@date' AND business = '3s'
    """.stripMargin

  /** All-string schema matching the columns selected by `device_tag_sql_2`. */
  def dm_device_tag_schema: StructType =
    stringSchema("device_id", "device_type", "platform", "package_name", "update_date",
      "tag_type", "first_tag", "second_tag")

  // Replaces GROUP BY device_id, package_name
  /**
   * Keys every device-tag record by (lower-cased device_id, package_name) and
   * attaches its tag as a JSON string built by `str2Json`.
   *
   * @param iter device-tag records of one partition
   * @param bMap broadcast lookup from upper-cased `tag_type-first_tag-second_tag`
   *             to tag id; an unknown code yields an empty id
   * @return ((device_id, package_name), (device_type, platform, tagJson, update_date))
   */
  def packageMapPartition(iter: Iterator[DeviceTag], bMap: Broadcast[scala.collection.Map[String, String]]): Iterator[((String, String), (String, String, String, String))] =
    iter.map { row =>
      val tagCode = row.tag_type + "-" + row.first_tag + "-" + row.second_tag
      val tagId = bMap.value.getOrElse(tagCode.toUpperCase, "")
      val tagJson = str2Json(tagId, row.first_tag, row.second_tag)
      ((row.device_id.toLowerCase, row.package_name), (row.device_type, row.platform, tagJson, row.update_date))
    }

  /**
   * Collapses the grouped values of every (device_id, package_name) key into a
   * single "new" record: the first non-blank device_type and platform, the
   * latest update_date and the set of distinct tag JSON strings.
   *
   * @param iter grouped output of `packageMapPartition`
   * @return (device_id, ("new", joined(device_type, platform, package_name, update_date, tags)))
   * @throws java.text.ParseException if an update_date cannot be parsed as `yyyy-MM-dd`
   */
  def groupByPackagePartition(iter: Iterator[((String, String), Iterable[(String, String, String, String)])]): Iterator[(String, (String, String))] = {
    // SimpleDateFormat is not thread-safe, so each call gets its own instance
    // rather than sharing `sdf` between concurrently running tasks.
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    iter.map { case ((deviceId, packageName), values) =>
      var deviceType = ""
      var platform = ""
      var updateDate = ""
      val tagSet = new util.HashSet[String]()
      values.foreach { case (valueDeviceType, valuePlatform, tagJson, valueDate) =>
        if (StringUtils.isBlank(deviceType)) deviceType = valueDeviceType
        if (StringUtils.isBlank(platform)) platform = valuePlatform
        // Keep the most recent date; the first value is taken unconditionally.
        if (StringUtils.isBlank(updateDate) || dateFormat.parse(updateDate).before(dateFormat.parse(valueDate))) {
          updateDate = valueDate
        }
        tagSet.add(tagJson)
      }
      (deviceId, ("new", MRUtils.JOINER.join(deviceType, platform, packageName, updateDate, tagSet.toString)))
    }
  }

  /** Raw JSON text of the entry's `tag` member, or an empty string when the member is absent. */
  private def tagText(json: JsonObject): String =
    if (json.has("tag")) json.get("tag").toString else ""

  /** Returns `date` when it matches [[pattern]], otherwise an empty string. */
  private def validDateOrEmpty(date: String): String =
    if (pattern.matcher(date).matches()) date else ""

  /**
   * Merges the "new" and "old" records of one device into a single
   * `DmInterestTag` whose tags are a JSON array with one object per package.
   *
   * "new" records are split into package_name, device_type, platform,
   * tag_type, first_tag, second_tag and update_date (in that order); "old"
   * records are split into device_type, platform and a JSON array of package
   * entries. Old entries lacking `package_name` or `date` are appended
   * unchanged at the end of the result.
   *
   * NOTE(review): the tag id lookup uses the tag code as-is and throws
   * `NoSuchElementException` for an unknown code, whereas
   * `packageMapPartition` upper-cases the code and falls back to "" --
   * confirm which behaviour is intended.
   *
   * @param iter (device_id, records) where each record is (kind, payload)
   * @param bMap broadcast lookup from tag code to tag id
   * @return the merged interest-tag record of the device
   */
  def bigJoinMap(iter: (String, Iterable[(String, String)]), bMap: Broadcast[scala.collection.Map[String, String]]): DmInterestTag = {
    val device_id = iter._1
    var deviceType = ""
    var platform = ""
    // package_name -> (date, old tag JSON text, new tag JSON strings)
    val resultMap = new util.HashMap[String, (String, String, util.HashSet[String])]()
    val unkeyedEntries = new JsonArray

    iter._2.foreach { r =>
      if ("new".equals(r._1)) {
        val array = MRUtils.SPLITTER.split(r._2)
        val packageName = array(0)
        if (StringUtils.isBlank(deviceType)) deviceType = array(1)
        if (StringUtils.isBlank(platform)) platform = array(2)
        val firstTag = array(4)
        val secondTag = array(5)
        val updateDate = array(6)
        val tagCode = array(3) + "-" + firstTag + "-" + secondTag
        val tagJson = str2Json(bMap.value(tagCode), firstTag, secondTag)
        Option(resultMap.get(packageName)) match {
          case Some(existing) =>
            // The set is mutable, so adding to it updates the stored entry.
            existing._3.add(tagJson)
          case None =>
            val newTags = new util.HashSet[String]()
            newTags.add(tagJson)
            resultMap.put(packageName, (updateDate, "", newTags))
        }
      }
      if ("old".equals(r._1)) {
        val array = MRUtils.SPLITTER.split(r._2)
        if (StringUtils.isBlank(deviceType)) deviceType = array(0)
        if (StringUtils.isBlank(platform)) platform = array(1)
        GsonUtil.String2JsonArray(array(2)).asScala.foreach { element =>
          val json = element.getAsJsonObject
          if (!json.has("package_name") || !json.has("date")) {
            unkeyedEntries.add(json)
          } else {
            val packageName = json.get("package_name").getAsString
            val merged = Option(resultMap.get(packageName)) match {
              case Some(existing) => (existing._1, tagText(json), existing._3)
              case None => (json.get("date").getAsString, tagText(json), new util.HashSet[String]())
            }
            resultMap.put(packageName, merged)
          }
        }
      }
    }

    val jsonArray = new JsonArray
    resultMap.asScala.foreach { case (packageName, (date, oldTag, newTags)) =>
      val jsonObject = new JsonObject
      jsonObject.addProperty("package_name", packageName)
      if (StringUtils.isNotBlank(date)) jsonObject.addProperty("date", date)
      if (StringUtils.isNotBlank(oldTag)) jsonObject.add("tag", GsonUtil.String2JsonArray(oldTag))
      if (newTags.size() > 0) jsonObject.add("tag_new", GsonUtil.toJsonTree(newTags))
      jsonArray.add(jsonObject)
    }
    if (unkeyedEntries.size > 0) jsonArray.addAll(unkeyedEntries)
    DmInterestTag(device_id, deviceType, platform, jsonArray.toString)
  }

  /**
   * Partition-level variant of `mapPart`: merges the records of every device
   * in the partition.
   *
   * @param iter (device_id, records) pairs of one partition
   * @return lazily evaluated iterator with one `DmInterestTag` per device
   */
  def bigJoinMapPart(iter: Iterator[(String, Iterable[(String, String)])]): Iterator[DmInterestTag] =
    iter.map(mapPart)

  /**
   * Merges the "new" and "old" records of one device into a single
   * `DmInterestTag` whose tags are a JSON array with one object per package
   * (`package_name`, optional `date`, optional `tag`, optional `tag_new`).
   *
   * "new" records are split into package_name, device_type, platform,
   * update_date and tags (in that order); "old" records are split into
   * device_type, platform and a JSON array of package entries. For a package
   * present on both sides the date seen first is kept, `tag` comes from the
   * old entry and `tag_new` from the new record. Dates not matching
   * [[pattern]] are dropped. Old entries lacking `package_name` or `date` are
   * appended unchanged at the end of the result; old entries lacking `tag`
   * are treated as having no tag.
   *
   * NOTE(review): `groupByPackagePartition` joins its fields as
   * (device_type, platform, package_name, update_date, tags) while this
   * method reads package_name first -- confirm which producer feeds it.
   *
   * @param ir (device_id, records) where each record is (kind, payload)
   * @return the merged interest-tag record of the device
   */
  def mapPart(ir: (String, Iterable[(String, String)])): DmInterestTag = {
    val device_id = ir._1
    var deviceType = ""
    var platform = ""
    // package_name -> (date, old tag JSON text, new tag JSON text)
    val resultMap = new util.HashMap[String, (String, String, String)]()
    val unkeyedEntries = new JsonArray

    ir._2.foreach { r =>
      if ("new".equals(r._1)) {
        val array = MRUtils.SPLITTER.split(r._2)
        val packageName = array(0)
        if (StringUtils.isBlank(deviceType)) deviceType = array(1)
        if (StringUtils.isBlank(platform)) platform = array(2)
        val updateDate = array(3)
        val tags = array(4)
        val merged = Option(resultMap.get(packageName)) match {
          case Some(existing) => (existing._1, existing._2, tags)
          case None => (validDateOrEmpty(updateDate), "", tags)
        }
        resultMap.put(packageName, merged)
      }
      if ("old".equals(r._1)) {
        val array = MRUtils.SPLITTER.split(r._2)
        if (StringUtils.isBlank(deviceType)) deviceType = array(0)
        if (StringUtils.isBlank(platform)) platform = array(1)
        GsonUtil.String2JsonArray(array(2)).asScala.foreach { element =>
          val json = element.getAsJsonObject
          if (!json.has("package_name") || !json.has("date")) {
            unkeyedEntries.add(json)
          } else {
            val packageName = json.get("package_name").getAsString
            val merged = Option(resultMap.get(packageName)) match {
              // The stored date is kept on purpose; an earlier variant that
              // preferred the later of the two dates was disabled.
              case Some(existing) => (existing._1, tagText(json), existing._3)
              case None => (validDateOrEmpty(json.get("date").getAsString), tagText(json), "")
            }
            resultMap.put(packageName, merged)
          }
        }
      }
    }

    val jsonArray = new JsonArray
    resultMap.asScala.foreach { case (packageName, (date, oldTag, newTag)) =>
      val jsonObject = new JsonObject
      jsonObject.addProperty("package_name", packageName)
      if (StringUtils.isNotBlank(date)) jsonObject.addProperty("date", date)
      if (StringUtils.isNotBlank(oldTag)) jsonObject.add("tag", GsonUtil.String2JsonArray(oldTag))
      if (StringUtils.isNotBlank(newTag)) jsonObject.add("tag_new", GsonUtil.String2JsonArray(newTag))
      jsonArray.add(jsonObject)
    }
    if (unkeyedEntries.size > 0) jsonArray.addAll(unkeyedEntries)
    DmInterestTag(device_id, deviceType, platform, jsonArray.toString)
  }

  /**
   * Serialises a tag as a JSON object with members `id` and `1`, plus `2`
   * when `second_tag` is not blank.
   *
   * @param tag_id     value of the `id` member
   * @param first_tag  value of the `1` member
   * @param second_tag value of the `2` member; omitted when blank
   * @return the JSON text of the object
   */
  def str2Json(tag_id: String, first_tag: String, second_tag: String): String = {
    val jsonObject = new JsonObject
    jsonObject.addProperty("id", tag_id)
    jsonObject.addProperty("1", first_tag)
    if (StringUtils.isNotBlank(second_tag)) jsonObject.addProperty("2", second_tag)
    jsonObject.toString
  }

  val old2new_sql: String =
    """
      |SELECT UPPER(CONCAT(tag_type, '-', first_tag, '-', second_tag)) tag_code, tag_id FROM dwh.dm_old2new_tag
    """.stripMargin

  val test_interest_sql: String =
    """
      |SELECT device_id, device_type, platform, @combineJson(concat_ws(';',collect_set(tags))) tags
      | FROM dm_interest_tag WHERE @check_deviceId
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  val install_v2_sql: String =
    """
      |SELECT /*+ mapjoin(t)*/ device_id, device_type, platform, collect_set(@combineJson(d.package_name, update_date, tag)) as tags
      | FROM
      | (SELECT package_name, tag FROM dwh.dim_app_tag WHERE CONCAT(year, month, day) = '@date') t
      | JOIN
      | (SELECT device_id, device_type, platform, package_name, update_date
      | FROM dev.dm_install_list_v2
      | WHERE dt = '@date' AND business = '@business' AND @check_deviceId
      | ) d ON d.package_name = t.package_name
      | GROUP BY device_id, device_type, platform
    """.stripMargin

  // Avoids the case where update_date differs between businesses
  val install_sql: String =
    """
      |SELECT device_id, device_type, platform, install_list, ext_data, update_date
      | FROM dwh.dmp_install_list WHERE dt = '@date' AND business = '14days'
    """.stripMargin

  val dim_app_tag_sql: String =
    """
      |SELECT package_name, tag FROM dwh.dim_app_tag WHERE CONCAT(year, month, day) = '@date'
    """.stripMargin

  val test_install_sql: String =
    s"""
       |SELECT device_id, device_type, platform,@combineJsonArray(concat_ws(';',collect_set(tag))) tags FROM
       | (SELECT /*+ mapjoin(t)*/ device_id, device_type, platform, @col_json(d.package_name, update_date, tag) tag
       | FROM
       | (SELECT package_name, tag FROM dwh.dim_app_tag WHERE CONCAT(year, month, day) = '@date') t
       | JOIN
       | (SELECT device_id, device_type, platform, package_name, update_date
       | FROM dwh.dm_install_list_v2
       | WHERE dt = '@date' AND business = '@business' AND @check_deviceId
       | ) d ON d.package_name = t.package_name
       | ) device_tag
       |GROUP BY device_id, device_type, platform
    """.stripMargin

  /**
   * Tells whether `device_id` is an accepted identifier: not blank, and either
   * matching [[didPtn]] without being [[allZero]], or matching [[imeiPtn]], or
   * matching [[andriodIdPtn]].
   *
   * @param device_id identifier to validate; may be null
   * @return true when the identifier is accepted
   */
  def check_deviceId(device_id: String): Boolean =
    StringUtils.isNotBlank(device_id) && (
      (didPattern.matcher(device_id).matches() && !device_id.equals(allZero)) ||
        imeiPattern.matcher(device_id).matches() ||
        androidIdPattern.matcher(device_id).matches())

  val dm_device_dag_statistics_sql: String =
    """
      |SELECT UPPER(device_id) device_id,caculateTagCount(CONCAT_WS(',',COLLECT_LIST(tags))) tags
      | FROM dwh.dmp_device_tag t
      | LEFT SEMI JOIN active s
      | ON UPPER(t.device_id) = s.device_id
      | WHERE dt = '@date'
      | GROUP BY UPPER(device_id)
    """.stripMargin

  val dmp_device_tag_statistics_sql: String =
    """
      |SELECT device_id, caculateTagCnt(CONCAT_WS(',',COLLECT_LIST(interest_tag))) tags
      | FROM dwh.dmp_device_tag_daily WHERE dt = '@date' AND business = '14days'
      | GROUP BY device_id
      |""".stripMargin

  val dm_device_dag_statistics_v2_sql: String =
    """
      |SELECT UPPER(device_id) device_id, getId(UPPER(CONCAT(tag_type,'-',first_tag,'-',second_tag))) tag_code, update_date
      | FROM dwh.dm_device_tag
      | WHERE dt = '@date' AND business NOT IN ('ga','other')
      |UNION
      |SELECT UPPER(device_id) device_id, getId(UPPER(CONCAT(tag_type,'-',first_tag,'-',second_tag))) tag_code, update_date
      | FROM dwh.dm_device_tag
      | WHERE dt = '@yestoday' AND business IN ('ga','other')
    """.stripMargin

  val week_active_sql: String =
    s"""
       |select t.device_id
       |from (
       | select upper(device_id) device_id
       | from dwh.ods_dmp_user_info
       | where dt = '@date' and business <> 'ga' and last_req_day > '@lastReq1'
       | union all
       | select upper(device_id) device_id
       | from dwh.ods_dmp_user_info
       | where dt = '@yestoday' and business = 'ga' and last_req_day > '@lastReq2'
       |) t
       |group by t.device_id
    """.stripMargin

  /** All-string schema of per-device tag statistics rows. */
  def statistic_schema: StructType =
    stringSchema("device_id", "tag_type", "first_tag", "second_tag", "cnt")

  /**
   * Serialises one install entry as a JSON object with members
   * `package_name`, `date` and `tag`.
   *
   * @param package_name value of the `package_name` member
   * @param date         value of the `date` member
   * @param tag          JSON array text, parsed with `GsonUtil.String2JsonArray`
   * @return the JSON text of the object
   */
  def col_json(package_name: String, date: String, tag: String): String = {
    val json = new JsonObject
    json.addProperty("package_name", package_name)
    json.addProperty("date", date)
    json.add("tag", GsonUtil.String2JsonArray(tag))
    json.toString
  }

  /**
   * Turns a `#`-separated list of `tag_id;cnt;count` fragments into a list of
   * JSON objects sorted by `count` descending, then by `cnt` descending, and
   * returns the list's string form (`[{...}, {...}]`). Blank fragments are
   * skipped.
   *
   * @param tags fragments such as `a;1;2#b;3;2`
   * @return string form of the sorted list
   * @throws NumberFormatException if `cnt` or `count` is not an integer
   */
  def combineJson(tags: String): String = {
    import java.util.{Collections, Comparator}

    val list: util.List[JsonObject] = new util.ArrayList[JsonObject]
    tags.split("#").filter(fragment => StringUtils.isNotBlank(fragment)).foreach { fragment =>
      val fields = fragment.split(";")
      val json = new JsonObject()
      json.addProperty("tag_id", fields(0))
      json.addProperty("cnt", fields(1).toInt)
      json.addProperty("count", fields(2).toInt)
      list.add(json)
    }

    Collections.sort(list, new Comparator[JsonObject]() {
      override def compare(o1: JsonObject, o2: JsonObject): Int = {
        val byCount = Integer.compare(o2.get("count").getAsInt, o1.get("count").getAsInt)
        // Tie-break on cnt; both sides must be read, one from each argument.
        if (byCount != 0) byCount
        else Integer.compare(o2.get("cnt").getAsInt, o1.get("cnt").getAsInt)
      }
    })
    list.toString
  }

  val active_sql: String =
    s"""
       |SELECT device_id, MAX(device_type) device_type, CASE WHEN lower(MAX(device_type)) = 'idfa' THEN 0 ELSE 1 END AS platform,
       | @combineJson(concat_ws('#', collect_set(concat(tag_code, ';', cnt, ';', count)))) tags
       | FROM
       | (SELECT lower(device_id) device_id, MAX(device_type) device_type, tag_code, COUNT(DISTINCT dt) cnt, COUNT(1) `count`
       | FROM dwh.dm_device_tag_daily
       | WHERE dt BETWEEN '@before_days' AND '@date' AND @check_deviceId
       | GROUP BY lower(device_id), tag_code
       | )
       |GROUP BY device_id
    """.stripMargin

  val user_info_sql: String =
    s"""
       |SELECT LOWER(device_id) device_id, MAX(country) country_code
       | FROM dwh.ods_dmp_user_info WHERE dt = '@date' AND last_req_day BETWEEN '@before_days2' AND '@date2'
       | GROUP BY LOWER(device_id)
    """.stripMargin

  val region_sql: String =
    """
      |SELECT LOWER(device_id) device_id, MAX(region) region
      | FROM dev.dm_device_region WHERE dt = '@date'
      | GROUP BY LOWER(device_id)
    """.stripMargin

  val active_join_info_sql: String =
    """
      |SELECT a.device_id, a.device_type, a.platform, COALESCE(b.country_code,'') country_code, COALESCE(c.region,'') region, a.tags
      | FROM active a
      | LEFT JOIN info b ON a.device_id = b.device_id
      | LEFT JOIN region c ON a.device_id = c.device_id
    """.stripMargin

  val query_active_sql: String =
    s"""
       |SELECT device_id, tag_code
       | FROM dev.dm_active_tag
       | WHERE dt = '@dt' AND platform = '@platform' AND cnt >= @cnt
    """.stripMargin

  /** Interest-tag record carrying install list, tags, extension data and update date as strings. */
  case class DmInterestTagV2(device_id: String, device_type: String, platform: String, install: String, tags: String, ext_data: String, update_date: String) extends java.io.Serializable

  /**
   * Merges `ext#update_date#business` entries into one JSON object with at
   * most two members: `m` (businesses `adn_request_sdk` and
   * `adn_request_unmatch`) and `dsp` (`dsp_req` and `dsp_req_unmatch`). Each
   * member holds the merged extension data of its businesses plus a
   * `last_date` member with the greatest update_date seen.
   *
   * Entries of other businesses, null entries, and entries with fewer than
   * three `#`-separated fields are ignored.
   *
   * @param ext_datas entries in `ext#update_date#business` form
   * @return JSON text of the merged object
   */
  def mergeExtData(ext_datas: mutable.WrappedArray[String]): String = {
    val businessSet = Set("adn_request_sdk", "adn_request_unmatch", "dsp_req", "dsp_req_unmatch")
    // group ("m" or "dsp") -> (latest update_date, extension JSON text)
    val merged = new mutable.HashMap[String, (String, String)]()

    ext_datas.iterator
      .filter(_ != null)
      .map(_.split("#"))
      // Split once, and skip malformed entries instead of failing on index 2.
      .filter(fields => fields.length >= 3 && businessSet.contains(fields(2)))
      .foreach { fields =>
        val ext = fields(0)
        val updateDate = fields(1)
        val group =
          if (fields(2).equals("adn_request_sdk") || fields(2).equals("adn_request_unmatch")) "m" else "dsp"
        merged.get(group) match {
          case Some((knownDate, knownExt)) =>
            val latest = if (updateDate.compareTo(knownDate) > 0) updateDate else knownDate
            val combined = merge2Data(MobvistaConstant.String2JSONObject(knownExt), MobvistaConstant.String2JSONObject(ext))
            merged.put(group, (latest, combined))
          case None =>
            merged.put(group, (updateDate, ext))
        }
      }

    val extJSONObject = new JSONObject()
    merged.foreach { case (group, (lastDate, ext)) =>
      val json = MobvistaConstant.String2JSONObject(ext)
      json.put("last_date", lastDate)
      extJSONObject.put(group, json)
    }
    extJSONObject.toJSONString
  }

  /** Adds the string form of every element of `data(key)` to `into`, optionally skipping blank ones. */
  private def addStrings(data: JSONObject, key: String, into: mutable.HashSet[String], skipBlank: Boolean): Unit =
    if (data.containsKey(key)) {
      data.getJSONArray(key).asScala
        .map(_.toString)
        .filter(text => !skipBlank || StringUtils.isNotBlank(text))
        .foreach(text => into.add(text))
    }

  /**
   * Merges two extension-data objects.
   *
   *  - `dev_tag`: the value of `data_1` (0 when absent), replaced by the value
   *    of `data_2` when `data_2` has one and the value so far is not 1.
   *  - `region`: union of the non-blank regions of both objects; omitted when empty.
   *  - `strategy`: union of the strategies of both objects; omitted when empty.
   *
   * @param data_1 first extension-data object
   * @param data_2 second extension-data object
   * @return JSON text of the merged object
   */
  def merge2Data(data_1: JSONObject, data_2: JSONObject): String = {
    val mergeExtData = new JSONObject()

    val firstDevTag = if (data_1.containsKey("dev_tag")) data_1.getIntValue("dev_tag") else 0
    val devTag =
      if (firstDevTag != 1 && data_2.containsKey("dev_tag")) data_2.getIntValue("dev_tag") else firstDevTag
    mergeExtData.put("dev_tag", devTag)

    val regionSet = new mutable.HashSet[String]()
    addStrings(data_1, "region", regionSet, skipBlank = true)
    addStrings(data_2, "region", regionSet, skipBlank = true)
    if (regionSet.nonEmpty) mergeExtData.put("region", regionSet.asJava)

    val strategySet = new mutable.HashSet[String]()
    addStrings(data_1, "strategy", strategySet, skipBlank = false)
    addStrings(data_2, "strategy", strategySet, skipBlank = false)
    if (strategySet.nonEmpty) mergeExtData.put("strategy", strategySet.asJava)

    mergeExtData.toJSONString
  }
}