Commit 92af9928 by WangJinfeng

support reyun

parent 4605b740
...@@ -79,7 +79,7 @@ where year='$old_year' ...@@ -79,7 +79,7 @@ where year='$old_year'
and update_time<='$SEVEN_DAYS_AGO' and update_time<='$SEVEN_DAYS_AGO'
and update_time>='$FOURTEEN_DAYS_AGO' and update_time>='$FOURTEEN_DAYS_AGO'
) t ) t
GROUP BY t.package_name, t.platform LIMIT 200000 GROUP BY t.package_name, t.platform LIMIT 100000
" | grep -v '^[0-9]\{5,7\}\s\+android' > to_crawler_package_name.txt " | grep -v '^[0-9]\{5,7\}\s\+android' > to_crawler_package_name.txt
if [ $? -ne 0 ];then if [ $? -ne 0 ];then
......
type=command type=command
dependencies=tracking_impression,tracking_click,tracking_install,event_daily,user_info dependencies=dim_app_tag_daily,dim_package_tag_daily,tracking_impression,tracking_click,tracking_install,event_daily,user_info
command=echo 'export reyun success!' command=echo 'export reyun success!'
\ No newline at end of file
...@@ -64,12 +64,11 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable { ...@@ -64,12 +64,11 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable {
activeDev.createOrReplaceTempView("active_dev") activeDev.createOrReplaceTempView("active_dev")
spark.udf.register("merge", merge _) spark.udf.register("merge", merge _)
spark.udf.register("filterInstall", filterInstall _) // spark.udf.register("filterInstall", filterInstall _)
spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _) spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _)
sql = sql.replace("@date", date).replace("@before_date", before_date) sql = sql.replace("@date", date).replace("@before_date", before_date)
val df = spark.sql(sql).filter( val df = spark.sql(sql)
"filterInstall(install_list)"
)
df.repartition(coalesce.toInt) df.repartition(coalesce.toInt)
.write .write
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
...@@ -127,8 +126,8 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable { ...@@ -127,8 +126,8 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable {
val installJSONObject = new JSONObject val installJSONObject = new JSONObject
installList.iterator.foreach(install => { installList.iterator.foreach(install => {
val installMap = MobvistaConstant.String2JSONObject(install).asInstanceOf[java.util.Map[String, String]].asScala val installMap = MobvistaConstant.String2JSONObject(install).asInstanceOf[java.util.Map[String, String]].asScala
installMap.retain((k, v) => (!installJSONObject.containsKey(k) || installJSONObject.getString(k).compareTo(v) < 0) installMap.retain((k, v) => !installJSONObject.containsKey(k) || installJSONObject.getString(k).compareTo(v) < 0).foreach(kv => {
&& !k.equalsIgnoreCase("0000000000") && !k.equalsIgnoreCase("com.nonepkg.nonepkg")).foreach(kv => { // && !k.equalsIgnoreCase("0000000000") && !k.equalsIgnoreCase("com.nonepkg.nonepkg")
installJSONObject.put(kv._1, kv._2) installJSONObject.put(kv._1, kv._2)
}) })
}) })
...@@ -136,7 +135,7 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable { ...@@ -136,7 +135,7 @@ class InstallListDailyV2 extends CommonSparkJob with Serializable {
} }
def filterInstall(installList: String): Boolean = { def filterInstall(installList: String): Boolean = {
MobvistaConstant.String2JSONObject(installList).size() > 0 !MobvistaConstant.String2JSONObject(installList).isEmpty
} }
} }
......
package mobvista.dmp.output.reyun
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.spark.sql.SaveMode
/**
  * Daily export of dwh.dim_app_tag for the given partition date, written as
  * zlib-compressed ORC to the requested output path.
  *
  * @package: mobvista.dmp.output.reyun
  * @author: wangjf
  * @date: 2021/9/14
  * @time: 2:06 PM
  * @email: jinfeng.wang@mobvista.com
  */
class AppTagDaily extends CommonSparkJob {

  // All three options are mandatory; presence is validated in run() via checkMustOption.
  override protected def buildOptions(): Options = {
    val opts = new Options
    opts.addOption("date", true, "[must] date")
    opts.addOption("output", true, "[must] output")
    opts.addOption("coalesce", true, "[must] coalesce")
    opts
  }

  /**
    * Parses CLI args, reads the dim_app_tag partition for `date`, and writes it
    * to `output` repartitioned into `coalesce` files.
    *
    * @return 0 on success, -1 when a mandatory option is missing
    */
  override protected def run(args: Array[String]): Int = {
    val cmd = commParser.parse(options, args)
    if (!checkMustOption(cmd)) {
      // Missing a mandatory option: show usage and signal failure to the caller.
      printUsage(options)
      -1
    } else {
      printOptions(cmd)

      val date = cmd.getOptionValue("date")
      val output = cmd.getOptionValue("output")
      val partitions = cmd.getOptionValue("coalesce").toInt

      val spark = MobvistaConstant.createSparkSession(s"AppTagDaily.${date}")
      try {
        // Select exactly the year/month/day partition matching the yyyyMMdd date.
        val sql =
          s"""
             |SELECT * FROM dwh.dim_app_tag WHERE CONCAT(year,month,day) = '${date}'
             |""".stripMargin
        spark.sql(sql)
          .repartition(partitions)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(output)
      } finally {
        // Always release the Spark session, even if the query or write fails.
        spark.stop()
      }
      0
    }
  }
}
object AppTagDaily {
  // CLI entry point: delegate straight to the job's run method.
  def main(args: Array[String]): Unit = {
    val job = new AppTagDaily
    job.run(args)
  }
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment