package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.common.CommonSparkJob
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import scala.collection.mutable

/**
 * One-off validation job comparing `dwh.dmp_install_list` with `dwh.dmp_interest_tag`.
 *
 * For a single hard-coded partition (dt = 20200709, update_date = 2020-07-09) it prints,
 * for each table, the row count ("UV") and the sum of the per-row package counts ("PKG").
 *
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020/5/12
 * @time: 4:04 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class ValidateDmpInterestV2 extends CommonSparkJob with Serializable {

  /**
   * Registers the size UDFs, prints UV/PKG statistics for both tables, then stops the session.
   *
   * @param args command-line arguments (not read by this job)
   * @return 0 when both queries complete; any Spark failure propagates after the session is stopped
   */
  override protected def run(args: Array[String]): Int = {
    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("ValidateDmpInterestV2")
    try {
      spark.udf.register("getPkgSize", getPkgSize _)
      spark.udf.register("getInstallSize", getInstallSize _)

      val install_sql =
        """
          |SELECT getInstallSize(install_list) sizes FROM dwh.dmp_install_list WHERE dt = '20200709' AND update_date = '2020-07-09' AND business = '14days'
          |""".stripMargin
      printSizeStats(spark, "installDF", install_sql)

      val interest_sql =
        """
          |SELECT getPkgSize(tags) sizes FROM dwh.dmp_interest_tag WHERE dt = '20200709' AND update_date = '2020-07-09'
          |""".stripMargin
      printSizeStats(spark, "interestDF", interest_sql)

      0
    } finally {
      // Stop the session even when a query throws; previously it was only stopped on success.
      if (spark != null) {
        spark.stop()
      }
    }
  }

  /**
   * Runs `sql`, which must produce an integral `sizes` column, and prints
   * `<label>.UV ==>> <row count>` followed by `<label>.PKG ==>> <sum of sizes>`.
   *
   * Both figures come from one aggregation, so the table scan and the UDF run once
   * rather than once per figure.
   */
  private def printSizeStats(spark: SparkSession, label: String, sql: String): Unit = {
    val stats = spark.sql(sql)
      .agg(count(lit(1)).alias("uv"), sum(col("sizes")).alias("siz"))
      .first()
    val uv = stats.getAs[Long]("uv")
    // SUM over zero rows yields NULL; report it as 0.
    val pkg = if (stats.isNullAt(stats.fieldIndex("siz"))) 0L else stats.getAs[Long]("siz")
    println(s"${label}.UV ==>> $uv")
    println(s"${label}.PKG ==>> $pkg")
  }

  /**
   * Counts the distinct lower-cased `package_name` values in a JSON array of objects.
   *
   * Elements that are not JSON objects, or whose `package_name` is missing/null, are skipped.
   *
   * @param tags JSON array text (as parsed by `MobvistaConstant.String2JSONArray`); may be null
   * @return number of distinct package names, or 0 for null/empty input or a null parse result
   */
  def getPkgSize(tags: String): Int = {
    import scala.collection.JavaConverters._
    Option(tags)
      .filter(_.nonEmpty)
      .flatMap(text => Option(mobvista.dmp.common.MobvistaConstant.String2JSONArray(text)))
      .fold(0) { array =>
        val packageNames = array.asScala.flatMap {
          case obj: JSONObject => Option(obj.getString("package_name"))
          case _ => None
        }
        packageNames.map(_.toLowerCase).toSet.size
      }
  }

  /**
   * Number of entries in the JSON object parsed from `installList`
   * (presumably one entry per installed package — confirm against the table schema).
   *
   * @param installList JSON object text; may be null
   * @return entry count, or 0 for null/empty input or a null parse result
   */
  def getInstallSize(installList: String): Int =
    Option(installList)
      .filter(_.nonEmpty)
      .flatMap(text => Option(mobvista.dmp.common.MobvistaConstant.String2JSONObject(text)))
      .fold(0)(_.size)
}

object ValidateDmpInterestV2 {
  def main(args: Array[String]): Unit = {
    new ValidateDmpInterestV2().run(args)
  }
}