package mobvista.dmp.datasource.dm

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

/**
 * Daily job that builds the device-interest snapshot.
 *
 * It selects the set of active devices from `dwh.ods_dmp_user_info`
 * (non-'ga' business for `date`/`lastReqDay`, 'ga' business for
 * `yestoday`/`lastReqDay1`), deduplicates them, joins against the
 * `dwh.dm_interest_tag` table to attach platform/tags, and writes the
 * result as gzip-compressed text to the `output` path on S3.
 *
 * CLI options (all required): date, yestoday, lastReqDay, lastReqDay1, output.
 */
class DmInterestDaily extends CommonSparkJob with Serializable {

  /**
   * Entry point invoked by [[CommonSparkJob]].
   *
   * @param args raw command-line arguments, parsed against [[buildOptions]]
   * @return 0 on success, 1 when a required option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    // Guard clause: bail out early when any required option is missing.
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      return 1
    }
    printOptions(commandLine)

    val date = commandLine.getOptionValue("date")
    val yestoday = commandLine.getOptionValue("yestoday")
    val lastReqDay = commandLine.getOptionValue("lastReqDay")
    val lastReqDay1 = commandLine.getOptionValue("lastReqDay1")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession.builder()
      .appName("DmInterestDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "true")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // NOTE(review): the interest-tag partition is pinned to '20180608'
      // while every other partition filter is parameterized — this looks
      // like a leftover debug constant in a daily job. Confirm whether it
      // should be the `date` option instead before changing it.
      val sql =
        s"""
           |select a.device_id, a.device_type, b.platform, b.tags
           |from (
           |  select t.device_id, t.device_type
           |  from (
           |    select t.device_id, t.device_type
           |    from dwh.ods_dmp_user_info t
           |    where t.dt='${date}' and t.business <> 'ga' and t.last_req_day='${lastReqDay}'
           |    union all
           |    select t.device_id, t.device_type
           |    from dwh.ods_dmp_user_info t
           |    where t.dt='${yestoday}' and t.business = 'ga' and t.last_req_day='${lastReqDay1}'
           |  ) t
           |  group by t.device_id, t.device_type
           |) a
           |join (
           |  select *
           |  from dwh.dm_interest_tag t
           |  where concat(t.year, t.month, t.day)='20180608' and t.business='all'
           |) b on a.device_id=b.device_id and a.device_type=b.device_type
         """.stripMargin

      // Remove any previous output so saveAsTextFile does not fail on an
      // existing directory.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      spark.sql(sql)
        .rdd
        .map(_.mkString(DATA_SPLIT))
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      // `spark` is a val produced by getOrCreate() and can never be null
      // here, so no null check is needed before stopping the session.
      spark.stop()
    }
    0
  }

  /**
   * Declares the required command-line options for this job.
   *
   * @return the populated [[Options]] instance consumed by the parser
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date ")
    options.addOption("output", true, "[must] output path")
    options.addOption("yestoday", true, "[must] yestoday ")
    options.addOption("lastReqDay", true, "[must] lastReqDay ")
    options.addOption("lastReqDay1", true, "[must] lastReqDay1 ")
    options
  }
}

/** Companion launcher: delegates straight to [[DmInterestDaily.run]]. */
object DmInterestDaily {
  def main(args: Array[String]): Unit = {
    new DmInterestDaily().run(args)
  }
}