ValidateDmpInterestV2.scala 2.28 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.CommonSparkJob
import org.apache.spark.sql.functions._

import scala.collection.mutable

/**
  * Ad-hoc validation job that reports size statistics for one hard-coded partition of
  * `dwh.dmp_install_list` (business `14days`) and `dwh.dmp_interest_tag`.
  *
  * For each table it prints the row count (`<name>.UV`) and the total of the per-row
  * package counts (`<name>.PKG`) to stdout.
  *
  * @package: mobvista.dmp.datasource.dm
  * @author: wangjf
  * @date: 2020/5/12
  * @time: 4:04 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class ValidateDmpInterestV2 extends CommonSparkJob with Serializable {

  /**
    * Registers the size UDFs, runs both validation queries and prints their statistics.
    *
    * @param args command-line arguments; not read by this job
    * @return 0 once both reports have been printed. A query failure propagates as an
    *         exception; the Spark session is stopped in either case.
    */
  override protected def run(args: Array[String]): Int = {
    // Partition being validated. NOTE(review): hard-coded; consider taking it from `args`.
    val dt = "20200709"
    val updateDate = "2020-07-09"

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("ValidateDmpInterestV2")
    try {
      spark.udf.register("getPkgSize", getPkgSize _)
      spark.udf.register("getInstallSize", getInstallSize _)

      // Runs `query` (which must produce an integer column named `sizes`) and prints the row
      // count as `<label>.UV` and the column total as `<label>.PKG`, both from a single scan.
      def report(label: String, query: String): Unit = {
        val stats = spark.sql(query).agg(count(lit(1)), sum(col("sizes"))).first()
        println(s"${label}.UV ==>> ${stats.getLong(0)}")
        // SUM over zero rows is NULL; report it as 0.
        val pkgTotal = if (stats.isNullAt(1)) 0L else stats.getLong(1)
        println(s"${label}.PKG ==>> ${pkgTotal}")
      }

      report("installDF",
        s"""
           |SELECT getInstallSize(install_list) sizes FROM dwh.dmp_install_list WHERE dt = '$dt' AND update_date = '$updateDate' AND business = '14days'
           |""".stripMargin)

      report("interestDF",
        s"""
           |SELECT getPkgSize(tags) sizes FROM dwh.dmp_interest_tag WHERE dt = '$dt' AND update_date = '$updateDate'
           |""".stripMargin)
    } finally {
      spark.stop()
    }
    0
  }

  /**
    * Counts the distinct package names referenced by an interest-tag string.
    *
    * @param tags text parsed by `MobvistaConstant.String2JSONArray`; every element that is a
    *             JSON object with a non-null `package_name` contributes that name
    * @return number of distinct package names after lower-casing; 0 when `tags` is null
    */
  def getPkgSize(tags: String): Int = {
    import scala.collection.JavaConverters._
    if (tags == null) {
      0
    } else {
      // Non-object elements and null names are skipped instead of failing the whole query.
      // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
      mobvista.dmp.common.MobvistaConstant.String2JSONArray(tags).asScala.iterator
        .collect { case obj: JSONObject if obj.containsKey("package_name") => obj.getString("package_name") }
        .filter(_ != null)
        .map(_.toLowerCase(java.util.Locale.ROOT))
        .toSet
        .size
    }
  }

  /**
    * Counts the entries of an install-list string.
    *
    * @param installList text parsed by `MobvistaConstant.String2JSONObject`
    * @return `size` of the parsed object (for a JSON object, its top-level key count);
    *         0 when `installList` is null or the parser returns null
    */
  def getInstallSize(installList: String): Int =
    if (installList == null) 0
    else Option(mobvista.dmp.common.MobvistaConstant.String2JSONObject(installList)).fold(0)(_.size)
}

/** Command-line launcher for the `ValidateDmpInterestV2` Spark job. */
object ValidateDmpInterestV2 {

  /**
    * Creates a fresh job instance and runs it with the raw command-line arguments.
    * The integer status returned by the job is not used here.
    */
  def main(args: Array[String]): Unit = {
    val job = new ValidateDmpInterestV2
    job.run(args)
  }
}