package mobvista.dmp.datasource.device

import java.util

import mobvista.dmp.datasource.age.mapreduce.ExtractDeviceMould
import mobvista.dmp.util.MRUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.codehaus.jackson.`type`.JavaType
import org.codehaus.jackson.map.ObjectMapper

import scala.collection.mutable

/**
  * @package: mobvista.dmp.datasource.device
  * @author: wangjf
  * @create: 2018-08-28 18:43
  **/
object OdsDmpUserInfoLogic {
}

/**
  * SQL builders and install-list merging for the ODS user-info job.
  *
  * @param date run date. The substring offsets used below (0-4, 5-7, 8-10) assume a
  *             `yyyy-MM-dd` layout — TODO confirm against callers.
  */
class OdsDmpUserInfoLogic(val date: String) extends Serializable {

  /** Replaces `@year`, `@month` and `@day` in `template` with the matching parts of `date`. */
  private def fillDatePartition(template: String): String = {
    template
      .replace("@year", date.substring(0, 4))
      .replace("@month", date.substring(5, 7))
      .replace("@day", date.substring(8, 10))
  }

  /** Builds the gender SQL: fills the date partition, then `@update_date` with `date`. */
  def getGenderSql(): String = {
    fillDatePartition(Constant.dm_device_gender_sql).replace("@update_date", date)
  }

  /** Builds the age SQL: fills the date partition, then `@update_date` with `date`. */
  def getAgeSql(): String = {
    fillDatePartition(Constant.dm_device_age_sql).replace("@update_date", date)
  }

  /**
    * Builds the install-list SQL for one business line.
    *
    * @param business value substituted for `@business` (after the date placeholders)
    */
  def getDmInstallListSql(business: String): String = {
    fillDatePartition(Constant.dm_install_list_sql).replace("@business", business)
  }

  /** Builds the user-info SQL by substituting `@date` with `date`. */
  def getUserInfoSql(): String = {
    Constant.user_info_sql.replace("@date", date)
  }

  /**
    * Collects the full historical package list of every device whose most recent
    * install-list entry is dated exactly `date`.
    *
    * Input rows must expose `device_id`, `device_type` and `install_list` columns. Rows
    * failing [[check_deviceId]] or with a blank `device_type` are dropped. `install_list`
    * is parsed with Jackson as a JSON array of `ExtractDeviceMould`.
    *
    * @return one line per qualifying device: `device_id`, `device_type` and the
    *         "#"-joined package names, joined with `MRUtils.JOINER`
    */
  def getNewInstallList(rdd: RDD[Row]): RDD[String] = {
    rdd.map(row => InstallList(row.getAs("device_id"), row.getAs("device_type"), row.getAs("install_list")))
      .filter(installList =>
        check_deviceId(installList.device_id) && StringUtils.isNotBlank(installList.device_type))
      // Keep (json, device_type) as a structured value instead of delimiter-joined text, so
      // a ';' or separator character inside the JSON cannot corrupt the per-device merge.
      .map(installList => (installList.device_id, (installList.install_list, installList.device_type)))
      // Concatenating every record does not shrink the data, so grouping moves the same
      // bytes as the previous string-concatenating reduceByKey, minus the repeated copies.
      .groupByKey()
      .mapPartitions(devices => {
        // ObjectMapper is comparatively expensive to build; create it once per partition.
        val objectMapper: ObjectMapper = new ObjectMapper
        val javaType: JavaType = objectMapper.getTypeFactory
          .constructCollectionType(classOf[util.ArrayList[_]], classOf[ExtractDeviceMould])
        devices.flatMap { case (deviceId, records) =>
          buildLatestInstallList(deviceId, records, objectMapper, javaType)
        }
      })
  }

  /**
    * Merges every install-list record of one device.
    *
    * Packages with a blank name are ignored. The device type reported is the one from the
    * last record that contributed a package.
    *
    * @return the joined output line when the latest package date equals `date` and at least
    *         one package was found; otherwise None
    */
  private def buildLatestInstallList(deviceId: String,
                                     records: Iterable[(String, String)],
                                     objectMapper: ObjectMapper,
                                     javaType: JavaType): Option[String] = {
    var deviceType = ""
    var latestDate = ""
    val packageNames = new mutable.ArrayBuffer[String]()
    for ((installListJson, recordDeviceType) <- records) {
      val packageList: util.ArrayList[ExtractDeviceMould] = objectMapper.readValue(installListJson, javaType)
      for (i <- 0 until packageList.size) {
        val pkg: ExtractDeviceMould = packageList.get(i)
        if (StringUtils.isNotBlank(pkg.getPackage_name)) {
          latestDate = laterDate(latestDate, pkg.getDate)
          deviceType = recordDeviceType
          packageNames += pkg.getPackage_name
        }
      }
    }
    if (latestDate.equals(date) && packageNames.nonEmpty) {
      Some(MRUtils.JOINER.join(deviceId, deviceType, packageNames.mkString("#")))
    } else {
      None
    }
  }

  /**
    * Returns whichever of `current` / `candidate` is the later date.
    *
    * A blank `current` is replaced by any non-blank `candidate`. Otherwise both are parsed
    * with `Constant.format`. If parsing fails (blank, null or malformed), `current` is kept:
    * best effort, as before.
    * NOTE(review): if Constant.format is a shared java.text.SimpleDateFormat it is not
    * thread-safe, and Spark may run several tasks per executor JVM concurrently — confirm.
    */
  private def laterDate(current: String, candidate: String): String = {
    if (StringUtils.isBlank(current) && StringUtils.isNotBlank(candidate)) {
      candidate
    } else {
      try {
        if (Constant.format.parse(candidate).after(Constant.format.parse(current))) candidate else current
      } catch {
        case _: Exception => current
      }
    }
  }

  /**
    * Accepts a device id that is non-blank, splits into exactly 5 parts with
    * `Constant.lineSplit`, and does NOT fully match `Constant.match`.
    * Presumably a UUID/IDFA-style shape check that rejects placeholder ids — verify
    * against the patterns in Constant.
    */
  def check_deviceId(device_id: String): Boolean = {
    StringUtils.isNotBlank(device_id) &&
      Constant.lineSplit.split(device_id, -1).length == 5 &&
      !Constant.`match`.matcher(device_id).matches()
  }
}