InstallListDaily.scala 6.12 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
package mobvista.dmp.common

import java.net.URI

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable

/**
  * @package: mobvista.dmp.common
  * @author: wangjf
  * @date: 2020/4/9
  * @time: 2:39 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class InstallListDaily extends CommonSparkJob with Serializable {

  /**
    * Declares the command-line options read by [[run]].
    *
    *  - `date`: the day to process, in `yyyyMMdd` format (used as the `dt` partition value)
    *  - `output`: destination path for the ORC output; any existing data there is deleted first
    *  - `coalesce`: number of output partitions (passed to `repartition`)
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
    * Builds the daily install-list output for recently active devices and writes it as ORC.
    *
    *  1. Parses and validates the `date`, `output` and `coalesce` options.
    *  2. Deletes any existing data at `output`.
    *  3. Registers the `merge`, `udf_mergeExtData` and `getMd5` UDFs.
    *  4. Runs [[active_sql]] and registers the result as the `active_dev` temp view.
    *  5. Runs [[sql]] (which semi-joins against `active_dev`) and writes the result to `output`.
    *
    * @param args command-line arguments, see [[commandOptions]]
    * @return 0 when the job completes; failures propagate as exceptions
    * @throws IllegalArgumentException if a required option is missing or `coalesce` is not a
    *                                  positive integer
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    // Validate before building a SparkSession so bad arguments fail fast with a clear message
    // instead of an NPE deep inside the job.
    require(date != null, "missing required option: date")
    require(output != null, "missing required option: output")
    require(coalesce != null, "missing required option: coalesce")
    val numPartitions = coalesce.toInt
    require(numPartitions > 0, s"coalesce must be a positive integer, got $numPartitions")

    val spark = SparkSession
      .builder()
      .appName(s"InstallListDaily.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Partition values (yyyyMMdd) and matching update_date values (yyyy-MM-dd) for
      // the current day and the two preceding days.
      val before_date = DateUtil.getDayByString(date, "yyyyMMdd", -1)
      val before_update_date = DateUtil.format(DateUtil.parse(before_date, "yyyyMMdd"), "yyyy-MM-dd")
      val before_2_date = DateUtil.getDayByString(date, "yyyyMMdd", -2)
      val before_2_update_date = DateUtil.format(DateUtil.parse(before_2_date, "yyyyMMdd"), "yyyy-MM-dd")
      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

      // Resolve the file system from the output path itself (rather than a hard-coded bucket)
      // so the delete targets exactly the location that .orc(output) writes to below.
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      spark.udf.register("merge", merge _)
      spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _)
      spark.udf.register("getMd5", MobvistaConstant.getMd5 _)

      val activeDev = spark.sql(active_sql.replace("@date", date)
        .replace("@update_date", update_date)
        .replace("@before_date", before_date)
        .replace("@before_update_date", before_update_date)
        .replace("@before_2_date", before_2_date)
        .replace("@before_2_update_date", before_2_update_date)
      )
      activeDev.createOrReplaceTempView("active_dev")

      // Substitute into a local copy: overwriting the `sql` template would strip its
      // placeholders, and a later run on the same instance would silently reuse these dates.
      val mergeSql = sql.replace("@date", date)
        .replace("@before_date", before_date)
        .replace("@before_2_date", before_2_date)

      spark.sql(mergeSql)
        .repartition(numPartitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
    * Selects the devices active in the last day; [[sql]] then LEFT SEMI JOINs against this result
    * to fetch those devices' install-list data.
    *
    * A device counts as active when its `dwh.dmp_install_list` record has the expected
    * `update_date` for its business source:
    *  - `@date` for every business except `ga`, `other`, `14days`, `day`, `ali_acquisition`,
    *    `ali_activation`;
    *  - `@date - 1` for `ga` and `other`;
    *  - `@date - 2` for `ali_acquisition` and `ali_activation`.
    *
    * Returns one row per distinct MD5-hashed `device_id` (via the `getMd5` UDF).
    */
  val active_sql: String =
    """
      |SELECT getMd5(device_id) device_id
      | FROM
      |   (SELECT device_id
      |     FROM dwh.dmp_install_list
      |     WHERE dt = '@date' AND business NOT IN ('ga','other','14days','day','ali_acquisition','ali_activation') AND update_date = '@update_date'
      |   UNION
      |   SELECT device_id
      |     FROM dwh.dmp_install_list
      |     WHERE dt = '@before_date' AND business IN ('ga','other') AND update_date = '@before_update_date'
      |   UNION
      |   SELECT device_id
      |     FROM dwh.dmp_install_list
      |     WHERE dt = '@before_2_date' AND business IN ('ali_acquisition','ali_activation') AND update_date = '@before_2_update_date'
      |   ) all
      |GROUP BY getMd5(device_id)
      |""".stripMargin

  /**
    * Merges the full `dwh.dmp_install_list` records of the devices in the `active_dev` view.
    *
    * Reads the same partition/business combinations as [[active_sql]], but without the
    * `update_date` filter. It groups by MD5-hashed `device_id` and produces:
    *  - `device_type`, `platform`, `country` and `update_date` via MAX;
    *  - `install_list` via the `merge` UDF;
    *  - `ext_data` via `udf_mergeExtData` over `ext_data#update_date#business` strings.
    *
    * NOTE(review): the original comment described this as computing active-device info for the
    * last 14 days, but the query applies no 14-day window — confirm the intended semantics.
    *
    * Kept as a `var` only for backward compatibility; [[run]] no longer modifies it.
    */
  var sql: String =
    """
      |SELECT device_id, MAX(device_type) device_type, MAX(platform) platform, MAX(country) country, merge(COLLECT_SET(install_list)) install_list,
      | udf_mergeExtData(COLLECT_SET(CONCAT_WS('#', ext_data, update_date, business))) ext_data, MAX(update_date) update_date
      | FROM
      |   (SELECT getMd5(device_id) device_id,device_type,platform,country,install_list,ext_data,update_date,business
      |     FROM dwh.dmp_install_list d LEFT SEMI JOIN active_dev a ON getMd5(d.device_id) = a.device_id
      |     WHERE dt = '@date' AND business NOT IN ('ga','other','14days','day','ali_acquisition','ali_activation')
      |   UNION
      |   SELECT getMd5(device_id) device_id,device_type,platform,country,install_list,ext_data,update_date,business
      |     FROM dwh.dmp_install_list d LEFT SEMI JOIN active_dev a ON getMd5(d.device_id) = a.device_id
      |     WHERE dt = '@before_date' AND business IN ('ga','other')
      |   UNION
      |   SELECT getMd5(device_id) device_id,device_type,platform,country,install_list,ext_data,update_date,business
      |     FROM dwh.dmp_install_list d LEFT SEMI JOIN active_dev a ON getMd5(d.device_id) = a.device_id
      |     WHERE dt = '@before_2_date' AND business IN ('ali_acquisition','ali_activation')
      |   ) all
      |GROUP BY device_id
      |""".stripMargin

  import scala.collection.JavaConverters._

  /**
    * Spark UDF (registered as `merge`) that combines several install-list JSON objects into one.
    *
    * Each element is parsed with `MobvistaConstant.String2JSONObject`. For every key, the value
    * that is greatest by `String.compareTo` is kept; on a tie, the first value seen stays.
    * NOTE(review): values are presumably date strings, in which case this keeps the most
    * recent one — confirm.
    *
    * @param installList JSON object strings, e.g. the result of `COLLECT_SET(install_list)`
    * @return the merged object serialized with `toJSONString`
    */
  def merge(installList: mutable.WrappedArray[String]): String = {
    val installJSONObject = new JSONObject
    installList.foreach { install =>
      val installMap = MobvistaConstant.String2JSONObject(install).asInstanceOf[java.util.Map[String, String]].asScala
      // Keys are unique within one parsed map, so checking and putting per entry is equivalent
      // to filtering first and putting afterwards (and avoids mutating the parsed map).
      installMap.foreach { case (pkg, value) =>
        if (!installJSONObject.containsKey(pkg) || installJSONObject.getString(pkg).compareTo(value) < 0) {
          installJSONObject.put(pkg, value)
        }
      }
    }
    installJSONObject.toJSONString
  }
}

object InstallListDaily {

  /**
    * Command-line entry point: creates a fresh job instance and runs it with `args`.
    * The integer status returned by `run` is discarded.
    */
  def main(args: Array[String]): Unit = {
    val job = new InstallListDaily()
    job.run(args)
  }
}