package mobvista.dmp.datasource.iqiyi

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

/**
  * Deduplicates the last four days of iQiyi activation (la-huo) data, keeping the
  * most recent record per device_id, and writes the result as tab-separated text.
  */
class IQiYiLaHuoFourDaysDataDeduplication extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_three_days_ago", true, "[must] dt_three_days_ago")
    options.addOption("output", true, "[must] output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val dt_today = commandLine.getOptionValue("dt_today")
    val dt_three_days_ago = commandLine.getOptionValue("dt_three_days_ago")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession.builder()
      .appName("IQiYiLaHuoFourDaysDataDeduplication")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so saveAsTextFile does not fail on an existing path.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(output), true)

    try {
      // Keep only the latest record per device_id within [dt_three_days_ago, dt_today].
      val sql1 =
        s"""
           |select XX.device_id, XX.device_type, XX.platform, XX.package_name, XX.update_date
           |from (
           |  select X.device_id, X.device_type, X.platform, X.package_name, X.update_date,
           |         rank() over (partition by X.device_id order by X.update_date desc) rk
           |  from (
           |    select device_id, device_type, platform, package_name, update_date
           |    from dwh.iqiyi_lahuo_tmp_daily_to_s3
           |    where business = 'iqiyi_activation'
           |      and dt <= '${dt_today}' and dt >= '${dt_three_days_ago}'
           |  ) X
           |) XX
           |where XX.rk = 1
        """.stripMargin

      spark.sql(sql1)
        .repartition(500)
        .rdd
        .map(_.mkString("\t"))
        .coalesce(100)
        .saveAsTextFile(output)
    } finally {
      spark.stop()
    }
    0
  }
}

object IQiYiLaHuoFourDaysDataDeduplication {
  def main(args: Array[String]): Unit = {
    new IQiYiLaHuoFourDaysDataDeduplication().run(args)
  }
}