package mobvista.dmp.datasource.behavior

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Daily merge of third-party behavior data.
  *
  * Full-outer-joins the daily feed (`dwh.etl_behavior_thirdparty_data_daily`, partition
  * `dt = today`) against the accumulated total (`dwh.etl_behavior_thirdparty_data_total`,
  * partition `dt = yesbef3`) on (device_id, device_type), preferring the daily side via
  * COALESCE, and overwrites the result as zlib-compressed ORC at `outputtotal`.
  */
class ThirdPartySourceDaily extends CommonSparkJob {

  /** Declares the required command-line options; all four are mandatory. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesbef3", true, "[must] yesbef3")
    options
  }

  /**
    * Job entry point.
    *
    * @param args raw command-line arguments, parsed against [[buildOptions]]
    * @return 0 on success, -1 when a mandatory option is missing
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1 // expression result instead of an early `return`
    } else {
      printOptions(commandLine)

      val outputtotal = commandLine.getOptionValue("outputtotal")
      val coalesce = commandLine.getOptionValue("coalesce")
      val today = commandLine.getOptionValue("today")
      val yesbef3 = commandLine.getOptionValue("yesbef3")

      val spark = SparkSession.builder()
        .appName("BehaviorThirdPartySourceDaily")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        // Clear any previous output. Done inside the try so that spark.stop()
        // still runs in `finally` if the S3 delete fails (previously it ran
        // before the try and could leak the session on failure).
        FileSystem
          .get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
          .delete(new Path(outputtotal), true)

        // Full outer join daily vs. total; COALESCE prefers the daily side.
        // Rows are pre-aggregated per (device_id, device_type) on each side so
        // the join keys are unique.
        val sql1 =
          s"""
             | select COALESCE(daily.device_id,total.device_id) device_id,
             |  COALESCE(daily.country,total.country) country,
             |  COALESCE(daily.device_type,total.device_type) device_type,
             |  COALESCE(daily.platform,total.platform) platform,
             |  COALESCE(daily.tag_name,total.tag_name) tag_name,
             |  COALESCE(daily.tag_value,total.tag_value) tag_value,
             |  COALESCE(daily.package_name,total.package_name) package_name,
             |  COALESCE(daily.event_day,total.event_day) event_day
             | from (select device_id,max(country) country,device_type,case when device_type='idfa' then 'ios' else 'android' end as platform,max(behavior) tag_name,1 tag_value ,max(package_name) package_name ,max(behavior_time) event_day from dwh.etl_behavior_thirdparty_data_daily where dt = '${today}'
             | group by device_id,device_type ) daily
             | full join
             | (select device_id,max(country) country,device_type,case when device_type='idfa' then 'ios' else 'android' end as platform, max(tag_name) tag_name,max(tag_value) tag_value ,max(package_name) package_name ,max(event_day) event_day from dwh.etl_behavior_thirdparty_data_total where dt = '${yesbef3}'
             | group by device_id,device_type ) total
             | on (daily.device_id = total.device_id and daily.device_type = total.device_type)
           """.stripMargin

        spark.sql(sql1)
          .coalesce(coalesce.toInt)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputtotal)

        0
      } finally {
        // Always release the Spark session, even on failure.
        spark.stop()
      }
    }
  }
}

object ThirdPartySourceDaily {
  def main(args: Array[String]): Unit = {
    new ThirdPartySourceDaily().run(args)
  }
}