package mobvista.dmp.common

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI
import scala.collection.mutable

/**
 * Spark job that merges the per-business rows of `dwh.dmp_install_list` into one row per
 * (lower-cased) device id and writes the result as snappy-compressed ORC to `output`.
 *
 * Command-line options: `-date` (formatted as `yyyyMMdd`), `-output` (target path) and
 * `-coalesce` (number of output partitions).
 *
 * @package: mobvista.dmp.common
 * @author: wangjf
 * @date: 2020/4/9
 * @time: 2:39 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class InstallListDailyV2 extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options understood by this job.
   *
   * @return options `date`, `output` and `coalesce`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
   * Runs the job: deletes `output`, registers the `active_dev` temp view and the UDFs,
   * executes the merge query and writes the result as ORC.
   *
   * @param args raw command-line arguments, parsed with [[commandOptions]]
   * @return 0 on success; failures propagate as exceptions
   * @throws NumberFormatException if `-coalesce` is missing or not an integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    // Parse the partition count up front: a malformed value must fail here, before the
    // Spark session is started and before the existing output directory is deleted.
    val partitions = coalesce.toInt

    val spark = SparkSession
      .builder()
      .appName(s"InstallListDailyV2.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val before_date = DateUtil.getDayByString(date, "yyyyMMdd", -1)
      // val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
      // val before_update_date = DateUtil.format(DateUtil.parse(before_date, "yyyyMMdd"), "yyyy-MM-dd")
      // Lower bound for update_date; presumably 14 days before `date` (verify against DateUtil.getDay).
      val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -14), "yyyy-MM-dd")

      // Remove any previous output (recursively) before writing.
      FileSystem
        .get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      val activeDev = spark.sql(
        active_sql
          .replace("@date", date)
          .replace("@update_date", update_date)
          .replace("@before_date", before_date))
      activeDev.createOrReplaceTempView("active_dev")

      spark.udf.register("merge", merge _)
      // spark.udf.register("filterInstall", filterInstall _)
      spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _)

      // Substitute into a local copy so the `sql` template keeps its placeholders and a
      // second run on the same instance picks up its own dates.
      val mergeSql = sql.replace("@date", date).replace("@before_date", before_date)

      spark.sql(mergeSql)
        .repartition(partitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Query template selecting the distinct lower-cased device ids that form the
   * `active_dev` view: rows of `dwh.dmp_install_list` with `update_date >= @update_date`,
   * taken from partition `@date` for every business except ga/other/14days/day and from
   * partition `@before_date` for the ga/other businesses.
   *
   * NOTE(review): the original comment described this as "devices active in the last 1 day",
   * but `run` derives `@update_date` with a -14 offset; confirm the intended window.
   */
  val active_sql: String =
    """
      |SELECT device_id
      | FROM
      |   (SELECT LOWER(device_id) device_id
      |     FROM dwh.dmp_install_list
      |     WHERE dt = '@date' AND business NOT IN ('ga','other','14days','day') AND update_date >= '@update_date'
      |   UNION
      |   SELECT LOWER(device_id) device_id
      |     FROM dwh.dmp_install_list
      |     WHERE dt = '@before_date' AND business IN ('ga','other') AND update_date >= '@update_date'
      |   ) all
      |GROUP BY device_id
      |""".stripMargin

  /**
   * Query template that merges the full install-list data, restricted via LEFT SEMI JOIN to
   * the devices in `active_dev`, into one row per device id (per the original comment: the
   * devices active in the last 14 days).
   *
   * Kept as a `var` for compatibility with existing users of this member; `run` does not
   * reassign it.
   */
  var sql: String =
    """
      |SELECT device_id, MAX(device_type) device_type, MAX(platform) platform, MAX(country) country, merge(COLLECT_SET(install_list)) install_list,
      | udf_mergeExtData(COLLECT_SET(CONCAT_WS('#', ext_data, update_date, business))) ext_data, CONCAT_WS(',',COLLECT_SET(business)) merge_bus, MAX(update_date) update_date
      | FROM
      |   (SELECT LOWER(device_id) device_id,device_type,platform,country,install_list,ext_data,update_date,business
      |     FROM dwh.dmp_install_list d LEFT SEMI JOIN active_dev a ON LOWER(d.device_id) = a.device_id
      |     WHERE dt = '@date' AND business NOT IN ('ga','other','14days','day')
      |   UNION
      |   SELECT LOWER(device_id) device_id,device_type,platform,country,install_list,ext_data,update_date,business
      |     FROM dwh.dmp_install_list d LEFT SEMI JOIN active_dev a ON LOWER(d.device_id) = a.device_id
      |     WHERE dt = '@before_date' AND business IN ('ga','other')
      |   ) all
      |GROUP BY device_id
      |""".stripMargin

  import scala.collection.JavaConverters._

  /**
   * Merges several install-list JSON objects into a single JSON string.
   *
   * For every key, the value that is greatest under `String.compareTo` wins; a key seen for
   * the first time is always taken. (Values are presumably date strings, so "greatest" would
   * mean "most recent" - TODO confirm.)
   *
   * @param installList JSON object strings, as collected by `COLLECT_SET(install_list)`
   * @return the merged object serialised with `toJSONString`
   */
  def merge(installList: mutable.WrappedArray[String]): String = {
    val installJSONObject = new JSONObject
    installList.iterator.foreach { install =>
      // Unchecked view of the parsed object as String -> String; the job relies on every
      // value in the install list being a string.
      val installMap = MobvistaConstant.String2JSONObject(install)
        .asInstanceOf[java.util.Map[String, String]]
        .asScala
      // Keep only entries that are new or strictly greater than what was merged so far,
      // then copy them into the accumulator.
      installMap
        .retain((k, v) => !installJSONObject.containsKey(k) || installJSONObject.getString(k).compareTo(v) < 0)
        .foreach { kv =>
          // && !k.equalsIgnoreCase("0000000000") && !k.equalsIgnoreCase("com.nonepkg.nonepkg")
          installJSONObject.put(kv._1, kv._2)
        }
    }
    installJSONObject.toJSONString
  }

  /**
   * Tells whether an install-list JSON string contains at least one entry.
   *
   * @param installList JSON object string
   * @return `true` when the parsed object is not empty
   */
  def filterInstall(installList: String): Boolean = {
    !MobvistaConstant.String2JSONObject(installList).isEmpty
  }
}

/** Entry point: runs [[InstallListDailyV2]] with the given command-line arguments. */
object InstallListDailyV2 {
  def main(args: Array[String]): Unit = {
    new InstallListDailyV2().run(args)
  }
}