package mobvista.dmp.datasource.retargeting

// import com.datastax.spark.connector._
import com.alibaba.fastjson.{JSON, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI
import java.text.SimpleDateFormat
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
  * @package: mobvista.dmp.datasource.retargeting
  * @author: wangjf
  * @date: 2019/5/23
  * @time: 4:41 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DeviceInfoJob extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  var bMap: Broadcast[scala.collection.Map[String, String]] = null
  var packageMap: Broadcast[scala.collection.Map[String, Int]] = null

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .appName("DeviceInfoJob")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.autoBroadcastJoinThreshold", "314572800")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Clear the output path so the ORC overwrite starts from an empty directory.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      val last_req_day = DateUtil.getDayByString(date, "yyyyMMdd", -13)
      val update_date = sdf1.format(sdf2.parse(last_req_day))

      val sc = spark.sparkContext

      // tag_code -> new_second_id mapping, broadcast for the getId / getInterestList UDFs.
      val code_sql = Constant.old2new_sql
      bMap = sc.broadcast(spark.sql(code_sql).rdd.map(r => {
        (r.getAs("tag_code").toString, r.getAs("new_second_id").toString)
      }).collectAsMap())
      println("bMap.size ===>>> " + bMap.value.size)

      // new_second_id -> new_first_id mapping, used to add the first-level tag for every second-level tag.
      val map = sc.broadcast(spark.sql(Constant.second2first_sql).rdd.map(r => {
        (r.getAs("new_second_id").toString, r.getAs("new_first_id").toString)
      }).collectAsMap())
      println("map.size ===>>> " + map.value.size)

      // Latest partition of dwh.package_mapping: package_name -> numeric package id.
      var package_sql =
        """
          |SHOW PARTITIONS dwh.package_mapping
        """.stripMargin
      var partDF = spark.sql(package_sql)
      val package_dt = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)

      package_sql =
        s"""
           |SELECT id, package_name FROM dwh.package_mapping WHERE dt = '${package_dt}'
        """.stripMargin
      packageMap = spark.sparkContext.broadcast(spark.sql(package_sql).rdd.map(r => {
        (r.getAs("package_name").toString.toLowerCase, Integer.parseInt(r.getAs("id").toString))
      }).collectAsMap())

      /*
      packageMap = sc.broadcast(Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd.map(r => {
        (r.getAs("app_package_name").toString, Integer.parseInt(r.getAs("id").toString))
      }).collectAsMap())
      */
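      // Input formats handled by the UDFs registered below (a sketch inferred from the parsing in
      // getInstallList / getInterestList / getFrenquency further down; the actual column contents come from
      // Constant.ods_dmp_user_info_all_sql_distinct, so the example values are illustrative only):
      //   install   -> comma-separated "package_name|..." entries, e.g. "com.example.app|20190501,com.other.app|20190510"
      //   interest  -> "#"-separated category paths, each a comma-separated list whose first three levels form the tag code
      //   frequency -> comma-separated "tag:cnt" pairs, e.g. "GAME:3,SHOPPING:1"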
      // Register the UDFs referenced by the SQL templates in Constant.
      spark.udf.register("getId", getId _)
      spark.udf.register("getInstallList", getInstallList _)
      spark.udf.register("getInterestList", getInterestList _)
      spark.udf.register("getFrenquency", getFrenquency _)
      spark.udf.register("checkDevice", Constant.checkDevice _)

      val statistics_sql =
        """
          |SHOW PARTITIONS dwh.dmp_device_tag_statistics
        """.stripMargin
      partDF = spark.sql(statistics_sql)
      val freqDate = partDF.orderBy(partDF("partition").desc).first.getString(0).split("=")(1)
      val freqSql = Constant.statistics_sql.replace("@date", freqDate)
      spark.sql(freqSql).createOrReplaceTempView("dm_active")

      val active_sql =
        """
          |SHOW PARTITIONS dwh.dm_active_tag
        """.stripMargin
      partDF = spark.sql(active_sql)
      // Use the second-newest partition of dwh.dm_active_tag; shift the date back one day unless it is a "month" partition.
      var activeDate = partDF.orderBy(partDF("partition").desc).take(2)(1).getString(0).split("/")(0).split("=")(1)
      val part = partDF.orderBy(partDF("partition").desc).take(2)(1).getString(0).split("/")(1).split("=")(1)
      if (!part.equals("month")) {
        activeDate = DateUtil.getDayByString(activeDate, "yyyyMMdd", -1)
      }

      // .replace("@last_req_day", last_req_day)
      val sql = Constant.ods_dmp_user_info_all_sql_distinct
        .replaceAll("@date", date)
        .replaceAll("@activeDate", activeDate)
        .replaceAll("@update_date", update_date)

      import spark.implicits._
      spark.sql(sql)
        .rdd.map(r => {
          import scala.collection.JavaConversions._
          val interest: mutable.HashSet[String] = new mutable.HashSet[String]()

          // frequency column: JSON object of tag -> count; every tag and its first-level parent are added to interest.
          val freObject = if (r.getAs("frequency") != null && StringUtils.isNotBlank(r.getAs("frequency").toString)) {
            JSON.parseObject(r.getAs("frequency").toString)
          } else {
            new JSONObject()
          }
          freObject.keySet().foreach(key => {
            interest.add(map.value(key))
            interest.add(key)
          })

          /*
          val frequencyEntity = FrequencyEntity(r.getAs("frequency"))
          if (frequencyEntity.frequency != null && frequencyEntity.frequency.nonEmpty) {
            for (i <- frequencyEntity.frequency.indices) {
              val tag = frequencyEntity.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("tag").toString
              val cnt = Integer.parseInt(frequencyEntity.frequency.get(i).asInstanceOf[GenericRowWithSchema].getAs("cnt").toString)
              jsonObject.put(tag, cnt)
              interest.add(map.value(tag))
              interest.add(tag)
            }
          }
          */

          val interestArr = r.getAs("interest").asInstanceOf[mutable.WrappedArray[String]]
          interestArr.foreach(i => {
            interest.add(map.value(i))
            interest.add(i)
          })

          // tag_week column: JSON array of {tag_id, cnt, count}, re-keyed as tag_id -> {cnt, count}.
          val tag_week_jsonObject = new JSONObject()
          if (r.getAs("tag_week") != null && StringUtils.isNotBlank(r.getAs("tag_week"))) {
            val jsonArray = GsonUtil.String2JsonArray(r.getAs("tag_week"))
            for (json <- jsonArray) {
              val j = json.getAsJsonObject
              val tag_id = j.get("tag_id").getAsString
              val cntJson = new JSONObject()
              val cnt = j.get("cnt").getAsInt
              cntJson.put("cnt", cnt)
              val count = j.get("count").getAsInt
              cntJson.put("count", count)
              tag_week_jsonObject.put(tag_id, cntJson)
              interest.add(map.value(tag_id))
              interest.add(tag_id)
            }
          }

          // tag_month column: same structure as tag_week.
          val tag_month_jsonObject = new JSONObject()
          if (r.getAs("tag_month") != null && StringUtils.isNotBlank(r.getAs("tag_month"))) {
            val jsonArray = GsonUtil.String2JsonArray(r.getAs("tag_month"))
            for (json <- jsonArray) {
              val j = json.getAsJsonObject
              val tag_id = j.get("tag_id").getAsString
              val cntJson = new JSONObject()
              val cnt = j.get("cnt").getAsInt
              cntJson.put("cnt", cnt)
              val count = j.get("count").getAsInt
              cntJson.put("count", count)
              tag_month_jsonObject.put(tag_id, cntJson)
              interest.add(map.value(tag_id))
              interest.add(tag_id)
            }
          }

          DeviceInfoEntity(r.getAs("device_id").toString.toUpperCase,
            r.getAs("platform"),
            r.getAs("model"),
            r.getAs("os_version"),
            r.getAs("country"),
            r.getAs("age"),
            r.getAs("gender"),
r.getAs("install"), mutable.WrappedArray.make(interest.toArray[String]), r.getAs("behavior"), freObject.toJSONString, tag_week_jsonObject.toJSONString, tag_month_jsonObject.toJSONString, r.getAs("region"), r.getAs("update_date"), r.getAs("publish_date")) }).toDF .repartition(coalesce.toInt) .write .mode(SaveMode.Overwrite) .option("orc.compress", "snappy") .orc(output) } finally { if (spark != null) { spark.stop() } } 0 } // 生成 Set[inter_id] def getInterestList(interest: String): Array[String] = { val set = new util.HashSet[String]() interest.split("#").foreach(inters => { val ins = inters.toUpperCase.split(",") if (ins.length >= 3) { val key = ins(0) + "-" + ins(1) + "-" + ins(2) val vals = if (bMap.value.keySet.contains(key)) { bMap.value(key) } else { bMap.value.getOrElse(key + "OTHER", "") } if (StringUtils.isNotBlank(vals)) { set.add(vals) } } }) set.asScala.toArray } def getId(tag_code: String): String = { val id = if (bMap.value.keySet.contains(tag_code.toUpperCase)) { bMap.value(tag_code.toUpperCase) } else { bMap.value.getOrElse(tag_code.toUpperCase + "OTHER", "") } id } // 生成 Set[package_name] def getInstallList(install: String): Array[Int] = { val set = new util.HashSet[Int]() if (StringUtils.isNotBlank(install)) { install.split(",").foreach(pkgs => { val pkd = pkgs.split("\\|") if (pkd.nonEmpty && StringUtils.isNotBlank(pkd(0)) && packageMap.value.contains(pkd(0).toLowerCase) ) { set.add(packageMap.value(pkd(0).toLowerCase)) } }) } set.asScala.toArray } def getFrenquency(frenquency: String): Array[(String, Int)] = { val set = new util.HashSet[(String, Int)]() frenquency.split(",").foreach(fn => { val fns = fn.split(":") if (StringUtils.isNotBlank(fns(0))) { set.add(fns(0), fns(1).toInt) } }) set.asScala.toArray } } object DeviceInfoJob { def main(args: Array[String]): Unit = { new DeviceInfoJob().run(args) } }