package mobvista.dmp.common

import java.net.URI

import com.alibaba.fastjson.JSONObject
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
  * @package: mobvista.dmp.common
  * @author: wangjf
  * @date: 2020/4/9
  * @time: 2:39 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */

/**
  * Spark job that merges one day's install-list records with the previous `14days` records.
  *
  * It reads `dwh.dmp_install_list` rows with `business = 'day'` for `-date` and rows with
  * `business = '14days'` for the day before, keeping only those whose `update_date` is within
  * the last 13 days of `-date`. It full-outer-joins both sides on `device_id`, where the day
  * side wins column by column via COALESCE, and writes snappy-compressed ORC to `-output`.
  */
class InstallListMerge extends CommonSparkJob with Serializable {

  /** Command-line options: `-date` (yyyyMMdd), `-output` (ORC output path), `-coalesce` (output partition count). */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
    * Runs the merge and writes the result to `-output`.
    *
    * All arguments are validated before the output path is deleted. Previously, a missing or
    * non-numeric `-coalesce` only failed after the existing output had already been removed.
    *
    * @param args command-line arguments, see [[commandOptions]]
    * @return 0 on success; failures propagate as exceptions
    * @throws IllegalArgumentException if `-date` or `-output` is missing, or `-coalesce` is not a positive integer
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    require(date != null && date.trim.nonEmpty, "missing required option -date (yyyyMMdd)")
    require(output != null && output.trim.nonEmpty, "missing required option -output")
    val numPartitions = parsePartitions(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"InstallListMerge.$date")
    try {
      val beforeDate = DateUtil.getDayByString(date, "yyyyMMdd", -1)
      // Lower bound (inclusive) for update_date on the 14-day side: date - 13 days, as yyyy-MM-dd.
      val updateDate = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -13), "yyyy-MM-dd")

      // Resolve the file system from the output path itself rather than a hard-coded
      // s3://mob-emr-test bucket, so the delete targets the same location .orc(output) writes to.
      val outputPath = new Path(output)
      FileSystem.get(outputPath.toUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      spark.udf.register("merge", merge _)
      // Not referenced by the active merge_sql; kept for the earlier query variant that merged ext_data with it.
      spark.udf.register("udf_mergeExtData", mobvista.dmp.datasource.dm.Constant.mergeExtData _)

      val sql = merge_sql
        .replace("@date", date)
        .replace("@before_date", beforeDate)
        .replace("@update_date", updateDate)

      // Despite the option name, a full repartition (shuffle) is used, not coalesce.
      spark.sql(sql)
        .repartition(numPartitions)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /** Parses the `-coalesce` value into a positive partition count, failing fast with a clear message. */
  private def parsePartitions(raw: String): Int = {
    require(raw != null, "missing required option -coalesce")
    val parsed =
      try raw.trim.toInt
      catch {
        case _: NumberFormatException =>
          throw new IllegalArgumentException(s"-coalesce must be an integer, got '$raw'")
      }
    require(parsed > 0, s"-coalesce must be positive, got $parsed")
    parsed
  }

  /**
    * Spark SQL UDF (registered as `merge`) that folds several JSON-object strings into one.
    *
    * For every key that appears in any input object, the greatest value under String.compareTo
    * is kept. If values are dates in a sortable format, that is the latest one (an assumption,
    * not verified here). Non-string JSON values are stringified rather than failing an
    * unchecked cast. Null values and null inputs are skipped.
    *
    * @param installList JSON object strings, e.g. the output of `COLLECT_SET(install_list)`
    * @return the merged object serialized with fastjson's `toJSONString` ("{}" when nothing merged)
    */
  def merge(installList: mutable.WrappedArray[String]): String = {
    val merged = new JSONObject
    for {
      install <- Option(installList).iterator.flatMap(_.iterator)
      if install != null
      parsed <- Option(MobvistaConstant.String2JSONObject(install)).iterator
      (key, rawValue) <- parsed.asInstanceOf[java.util.Map[String, AnyRef]].asScala
      if rawValue != null
    } {
      val value = rawValue.toString
      // merged only ever holds non-null Strings, so getString returns null exactly when the key is absent.
      val current = merged.getString(key)
      if (current == null || current.compareTo(value) < 0) {
        merged.put(key, value)
      }
    }
    merged.toJSONString
  }

  // An earlier version of this query also grouped the 14-day side by LOWER(device_id) and merged
  // ext_data via udf_mergeExtData(COLLECT_SET(CONCAT_WS('#', ext_data, update_date, business)))
  // on both sides. The active query takes MAX(ext_data) on the day side and reads the 14-day side as-is.
  //
  // NOTE(review): unlike `d`, the 14-day side `a` is neither lower-cased nor grouped, so the join
  // assumes that partition is already unique per lower-case device_id. Confirm against its producer.
  /** Merge query; `@date`, `@before_date` and `@update_date` are substituted in [[run]]. */
  val merge_sql: String =
    """
      |SELECT COALESCE(d.device_id, a.device_id) device_id,
      |  COALESCE(d.device_type, a.device_type) device_type,
      |  COALESCE(d.platform, a.platform) platform,
      |  COALESCE(d.country, a.country) country,
      |  COALESCE(d.install_list, a.install_list) install_list,
      |  COALESCE(d.ext_data, a.ext_data) ext_data,
      |  COALESCE(d.update_date, a.update_date) update_date
      |  FROM (
      |    SELECT LOWER(device_id) device_id, MAX(device_type) device_type, MAX(platform) platform, MAX(country) country, merge(COLLECT_SET(install_list)) install_list,
      |    MAX(ext_data) ext_data, MAX(update_date) update_date
      |    FROM dwh.dmp_install_list
      |    WHERE dt = '@date' AND business = 'day'
      |    GROUP BY LOWER(device_id)
      |  ) d
      |  FULL OUTER JOIN
      |  (SELECT * FROM dwh.dmp_install_list WHERE dt = '@before_date' AND business = '14days' AND update_date >= '@update_date') a
      |  ON d.device_id = a.device_id
      |""".stripMargin
}

/** Command-line entry point; the value returned by `run` is not turned into a process exit code. */
object InstallListMerge {
  def main(args: Array[String]): Unit = {
    new InstallListMerge().run(args)
  }
}