package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.JSON
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.functions._

/**
 * Validation job for `dwh.dmp_install_list`.
 *
 * For each of the systems "m" and "dsp" it counts the rows selected by [[common_sql]]
 * and sums the number of real packages in their install lists, then prints both figures.
 *
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020/5/25
 * @time: 6:30 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class ValidateDmpInstallList extends CommonSparkJob with Serializable {

  /**
   * Job entry point.
   *
   * @param args command line arguments; the `date` option (parsed as `yyyyMMdd`) is required
   * @return 1 when the mandatory options are missing, 0 after both reports were printed
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      1
    } else {
      printOptions(commandLine)

      val date = commandLine.getOptionValue("date")
      val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("ValidateDmpInstallList")
      try {
        spark.udf.register("get_pkg_size", get_pkg_size _)
        spark.udf.register("filter", filter _)

        val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", 0), "yyyy-MM-dd")
        // @last_day is deliberately filled with the same value as @update_date.
        val sql = common_sql
          .replace("@dt", date)
          .replace("@update_date", update_date)
          .replace("@last_day", update_date)

        // Runs the validation query for one system and prints its two aggregates.
        def report(system: String): Unit = {
          val uvName = s"${system}_uv"
          val pkgSizeName = s"${system}_pkg_size"
          spark.sql(sql.replace("@system", system))
            .select(count(lit(1)).alias(uvName), sum(col("pkg_size")).alias(pkgSizeName))
            // Bring the single aggregate row to the driver so the figures are printed
            // in the driver output instead of on whichever executor holds the row.
            .collect()
            .foreach { row =>
              // count() and sum() over an integer column yield Long values; reading them
              // as Number avoids the ClassCastException that getAs[Int] triggers.
              // sum() is null when no row matched, which is reported as 0.
              val uv = Option(row.getAs[Number](uvName)).map(_.longValue()).getOrElse(0L)
              val pkgSize = Option(row.getAs[Number](pkgSizeName)).map(_.longValue()).getOrElse(0L)
              println(s"$uvName -->> $uv,$pkgSizeName -->> $pkgSize")
            }
        }

        report("m")
        report("dsp")
        0
      } finally {
        if (spark != null) {
          spark.stop()
        }
      }
    }
  }

  /**
   * Counts the packages in a JSON-encoded install list, ignoring the placeholder
   * keys "0000000000" and "com.nonepkg.com.nonepkg".
   *
   * @param install_list JSON object whose keys are package names
   * @return number of non-placeholder keys; 0 when the input is null, blank or not a JSON object
   */
  def get_pkg_size(install_list: String): Int = {
    import scala.collection.JavaConverters._

    if (StringUtils.isBlank(install_list)) {
      0
    } else {
      JSON.parse(install_list) match {
        case installMap: java.util.Map[_, _] =>
          // Count without mutating the parsed map.
          installMap.keySet().asScala.count { key =>
            val pkg = String.valueOf(key)
            pkg != "0000000000" && pkg != "com.nonepkg.com.nonepkg"
          }
        case _ => 0
      }
    }
  }

  /**
   * Row predicate used by [[common_sql]].
   *
   * For system "m" the row passes when the "m" section of `ext_data` has
   * `last_date == last_day`, `dev_tag == "1"` and at least one `strategy` entry containing
   * "MNormalAlphaModelRanker". For system "dsp" the row passes when the "dsp" section has
   * `last_date == last_day`. Missing sections or fields make the row fail instead of throwing.
   *
   * @param ext_data JSON string holding the per-system sections
   * @param system   "m" or "dsp"; any other value yields false
   * @param last_day expected value of the section's `last_date`
   * @return true when the row should be kept
   */
  def filter(ext_data: String, system: String, last_day: String): Boolean = {
    val ext_json = mobvista.dmp.common.MobvistaConstant.String2JSONObject(ext_data)

    // Null-safe replacement for last_date.equals(last_day).
    def isLastDay(last_date: String): Boolean = last_date != null && last_date.equals(last_day)

    system match {
      case "m" if ext_json.containsKey("m") =>
        Option(ext_json.getJSONObject("m")).exists { m_json =>
          val usesAlphaRanker = Option(m_json.getJSONArray("strategy")).exists { strategies =>
            strategies.toArray().exists(s => s != null && s.toString.contains("MNormalAlphaModelRanker"))
          }
          isLastDay(m_json.getString("last_date")) &&
            "1".equals(m_json.getString("dev_tag")) &&
            usesAlphaRanker
        }
      case "dsp" if ext_json.containsKey("dsp") =>
        Option(ext_json.getJSONObject("dsp")).exists(dsp_json => isLastDay(dsp_json.getString("last_date")))
      case _ => false
    }
  }

  /**
   * Query template. `@dt`, `@update_date`, `@system` and `@last_day` are substituted
   * textually in [[run]] before execution.
   */
  val common_sql: String =
    """
      |SELECT device_id,get_pkg_size(install_list) pkg_size FROM dwh.dmp_install_list
      | WHERE dt = '@dt' AND update_date = '@update_date' AND business = '14days' AND filter(ext_data,'@system','@last_day')
      |""".stripMargin

  /** Declares the command line options of this job: `date` (required, takes a value). */
  override protected def buildOptions(): Options = {
    val opts = new Options
    opts.addOption("date", true, "[must] date")
    opts
  }
}

object ValidateDmpInstallList {

  /** Launches the job; the status code returned by `run` is not propagated. */
  def main(args: Array[String]): Unit = {
    new ValidateDmpInstallList().run(args)
  }
}