package mobvista.dmp.datasource.dm

import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.google.gson.{JsonArray, JsonObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * @author wangjf
 */
class DmInterestAll extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val ga_date = commandLine.getOptionValue("ga_date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("DmInterestAll")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    // Remove any previous output so the ORC write starts from a clean path.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
    try {
      spark.udf.register("check_deviceId", Constant.check_deviceId _)
      spark.udf.register("combineJson", combineJsonArray _)
      /*
      val interest_sql = Constant.interest_sql.replace("@date", date).replace("@ga_date", ga_date)
        .replace("@check_deviceId", "check_deviceId(device_id)").replace("@combineJson", "combineJson")
      */
      val interest_sql = Constant.old_interest_sql.replace("@date", date).replace("@ga_date", ga_date)
        .replace("@check_deviceId", "check_deviceId(device_id)").replace("@combineJson", "combineJson")

      spark.sql(interest_sql)
        .repartition(coalesce.toInt)
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * UDF that merges semicolon-separated JSON arrays, keeping only the entry with
   * the latest date for each package_name, which reduces shuffle operations and disk IO.
   *
   * @param tags semicolon-separated JSON array strings
   * @return the merged JSON array as a string
   */
  def combineJsonArray(tags: String): String = {
    val jsonArray = new JsonArray
    val latest: java.util.Map[String, (Date, JsonObject)] = new util.HashMap[String, (Date, JsonObject)]()
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    tags.split(";").foreach(tag => {
      val jsonNode = GsonUtil.String2JsonArray(tag)
      for (i <- 0 until jsonNode.size) {
        val json = jsonNode.get(i).getAsJsonObject
        if (!json.has("package_name") || !json.has("date")) {
          // Entries without package_name/date cannot be deduplicated; keep them as-is.
          jsonArray.add(json)
        } else {
          val packageName = json.get("package_name").getAsString
          val tagDate = sdf.parse(json.get("date").getAsString)
          // Keep only the most recent entry for each package.
          if (!latest.containsKey(packageName) || latest.get(packageName)._1.before(tagDate)) {
            latest.put(packageName, (tagDate, json))
          }
        }
      }
    })
    import scala.collection.JavaConverters._
    latest.values().asScala.foreach(entry => jsonArray.add(entry._2))
    jsonArray.toString
  }

  /** Returns the number of elements in the given JSON array string. */
  def toJsonArraySize(tags: String): Int = {
    GsonUtil.String2JsonArray(tags).size()
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("ga_date", true, "[must] ga_date")
    options.addOption("output", true, "[must] output")
options.addOption("coalesce", true, "[must] coalesce") options } } object DmInterestAll { def main(args: Array[String]): Unit = { new DmInterestAll().run(args) } }