Commit ae9bcd9d by WangJinfeng

fix craw package tag bug

parent 911afe9a
package mobvista.dmp.datasource.apptag package mobvista.dmp.datasource.apptag
import java.net.URI
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant} import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession
import java.net.URI
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
...@@ -74,14 +73,14 @@ class CrawPkgsSpark extends CommonSparkJob with Serializable { ...@@ -74,14 +73,14 @@ class CrawPkgsSpark extends CommonSparkJob with Serializable {
s""" s"""
|SELECT b.package_name, getPlatform(b.package_name) platform |SELECT b.package_name, getPlatform(b.package_name) platform
| FROM | FROM
| (SELECT package_name | (SELECT package_name, COUNT(1) counts
| FROM dwh.dmp_install_list LATERAL VIEW explode(getPkgs(install_list)) dmp_table AS package_name WHERE dt = '$yesday' AND business = 'day' | FROM dwh.dmp_install_list LATERAL VIEW explode(getPkgs(install_list)) dmp_table AS package_name WHERE dt = '$yesday' AND business = '14days'
| GROUP BY package_name | GROUP BY package_name
| ) b | ) b
| LEFT JOIN | LEFT JOIN
| (SELECT package_name FROM dev.dm_package_black_list WHERE dt = '$yesday') a | (SELECT package_name FROM dev.dm_package_black_list WHERE dt = '$yesday') a
| ON a.package_name = b.package_name | ON LOWER(a.package_name) = LOWER(b.package_name)
| WHERE a.package_name IS NULL | WHERE a.package_name IS NULL ORDER BY counts DESC LIMIT 1000000
|""".stripMargin |""".stripMargin
spark.sql(sql).rdd spark.sql(sql).rdd
......
package mobvista.dmp.main package mobvista.dmp.main
import java.net.URI
import mobvista.dmp.common.CommonSparkJob import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.{SaveMode, SparkSession}
import java.net.URI
/** /**
* 将抓包程序抓不到的包,加入到黑名单中 * 将抓包程序抓不到的包,加入到黑名单中
*/ */
class PackageBlackList extends CommonSparkJob with Serializable { class PackageBlackList extends CommonSparkJob with Serializable {
override protected def run(args: Array[String]): Int = { override protected def run(args: Array[String]): Int = {
val commandLine = commParser.parse(options, args) val commandLine = commParser.parse(options, args)
...@@ -47,8 +47,16 @@ class PackageBlackList extends CommonSparkJob with Serializable { ...@@ -47,8 +47,16 @@ class PackageBlackList extends CommonSparkJob with Serializable {
val adrDailyRDD = sc.textFile(adrDailyPath) val adrDailyRDD = sc.textFile(adrDailyPath)
.map(splitFun(_)(0)) .map(splitFun(_)(0))
iosDailyRDD.union(adrDailyRDD) val packageRDD = spark.sql(
.map(PackageVO(_)) """
|SELECT LOWER(package_name) package_name FROM dwh.dim_package_tag GROUP BY LOWER(package_name)
|""".stripMargin)
.rdd.map(r => {
r.getAs[String]("package_name")
})
iosDailyRDD.union(adrDailyRDD).union(packageRDD)
.map(PackageVO)
.toDF() .toDF()
.createOrReplaceTempView("t_package_daily") .createOrReplaceTempView("t_package_daily")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment