package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

/**
  * @package: mobvista.dmp.datasource.dsp
  * @author: wangjf
  * @date: 2020/5/27
  * @time: 11:26 AM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DspDeviceProfile extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      // Required options are missing: print usage and abort rather than
      // continuing with null option values.
      printUsage(options)
      return -1
    }
    printOptions(commandLine)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("DspDeviceProfile")
    try {
      // Clear the output path up front so the ORC overwrite starts from a clean directory.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      // last_date: yesterday's partition of the accumulated profile table.
      // update_date: profiles older than 365 days are dropped from the merge.
      // dt: the input date reformatted via MobvistaConstant's date formatters
      // for the daily table's `date` partition column.
      val last_date = DateUtil.getDay(MobvistaConstant.sdf2.parse(date), "yyyyMMdd", -1)
      val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -365), "yyyy-MM-dd")
      val dt = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))

      // Today's DSP requests, deduplicated per (device_id, device_type);
      // max() picks one non-null value per attribute, and country code GB
      // is normalized to UK.
      val new_df = spark.sql(new_sql.replace("@date", dt))
        .groupBy(col("device_id"), col("device_type")).agg(
          max(col("platform")).alias("platform"),
          max(col("maker")).alias("maker"),
          max(col("model")).alias("model"),
          max(col("os_version")).alias("os_version"),
          when(max(col("country_code")) === lit("GB"), lit("UK"))
            .otherwise(max(col("country_code"))).alias("country"),
          max(col("birthday")).alias("birthday"),
          max(col("gender")).alias("gender"),
          // substr is 1-based in Spark SQL; take the "yyyy-MM-dd" prefix of datetime.
          max(col("datetime").substr(1, 10)).alias("dmp_time")
        )

      // Yesterday's accumulated profiles, limited to devices active within the last year.
      val old_df = spark.sql(old_sql.replace("@date", last_date).replace("@update_date", update_date))

      // Full outer join keeps every device seen either today or historically;
      // coalesce prefers today's attribute values, falling back to the old ones.
      new_df.join(old_df,
        new_df("device_id") === old_df("device_id") && new_df("device_type") === old_df("device_type"),
        "fullouter")
        .select(
          coalesce(new_df("device_id"), old_df("device_id")).alias("device_id"),
          coalesce(new_df("device_type"), old_df("device_type")).alias("device_type"),
          coalesce(new_df("platform"), old_df("platform")).alias("platform"),
          coalesce(new_df("maker"), old_df("maker")).alias("maker"),
          coalesce(new_df("model"), old_df("model")).alias("model"),
          coalesce(new_df("os_version"), old_df("os_version")).alias("os_version"),
          coalesce(new_df("country"), old_df("country")).alias("country"),
          coalesce(new_df("birthday"), old_df("birthday")).alias("birthday"),
          coalesce(new_df("gender"), old_df("gender")).alias("gender"),
          coalesce(new_df("dmp_time"), old_df("dmp_time")).alias("dmp_time")
        )
        .repartition(2000)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options
  }

  val old_sql: String =
    """
      |SELECT * FROM dwh.dsp_profile_total
      | WHERE dt = '@date' AND dmp_time >= '@update_date'
      |""".stripMargin

  val new_sql: String =
    """
      |SELECT * FROM dwh.etl_dsp_request_daily
      | WHERE `date` = '@date'
      |""".stripMargin
}

object DspDeviceProfile {
  def main(args: Array[String]): Unit = {
    new DspDeviceProfile().run(args)
  }
}
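
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the production job: the core of run() above
// is an incremental "upsert" done with a full outer join plus column-wise
// coalesce. The toy object below demonstrates that pattern in isolation on
// in-memory data. The local SparkSession, simplified schema, and sample rows
// are assumptions for illustration only and do not come from this repository.
// ---------------------------------------------------------------------------
object FullOuterMergeDemo {

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.coalesce

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("FullOuterMergeDemo")
      .getOrCreate()
    import spark.implicits._

    // newDf plays the role of today's observations; oldDf plays the role of
    // the previously accumulated profile table.
    val newDf = Seq(("dev-1", "android", "2020-05-27"), ("dev-3", "ios", "2020-05-27"))
      .toDF("device_id", "platform", "dmp_time")
    val oldDf = Seq(("dev-1", "android", "2020-05-20"), ("dev-2", "ios", "2020-05-01"))
      .toDF("device_id", "platform", "dmp_time")

    // Full outer join keeps devices present on either side; coalesce takes
    // the fresh value when available and falls back to the historical one,
    // exactly as the production select does column by column.
    val merged = newDf.join(oldDf, newDf("device_id") === oldDf("device_id"), "fullouter")
      .select(
        coalesce(newDf("device_id"), oldDf("device_id")).alias("device_id"),
        coalesce(newDf("platform"), oldDf("platform")).alias("platform"),
        coalesce(newDf("dmp_time"), oldDf("dmp_time")).alias("dmp_time")
      )

    // dev-1 keeps today's dmp_time, dev-2 survives from history, dev-3 is new.
    merged.show()
    spark.stop()
  }
}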