package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * @author jiangfan
 * @date 2021/11/4 14:50
 */
class Com3appXianjindai extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("to_day", true, "[must] to_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val to_day = commandLine.getOptionValue("to_day")

    val spark = SparkSession.builder()
      .appName("Com3appXianjindai")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Gzip-compress output files written through this Hadoop configuration.
    val conf = spark.sparkContext.hadoopConfiguration
    conf.set("mapreduce.output.compress", "true")
    conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
    conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

    // Remove any previous output so the job can be re-run idempotently.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output01), true)
    fs.delete(new Path(output02), true)

    try {
      // The requirement originally read from adn_dsp.log_adn_dsp_request_orc_hour, but that
      // table is expensive to scan. dwh.etl_dsp_request_daily_hours keeps the relevant
      // adn_dsp.log_adn_dsp_request_orc_hour data, so we read it instead to cut the cost.
      //
      // The query keeps Android device ids in MX that requested at least 3 distinct apps on
      // the given day: packagename is a '#'-separated app list, exploded into one row per
      // (appid, device_id) pair before counting. didPtn is the device-id pattern inherited
      // from the parent job.
      val sql01 =
        s"""
           |select device_id
           |from
           |(
           |  select device_id, count(distinct t1.appid) as app_num
           |  from
           |  (
           |    select appid, device_id
           |    from
           |    (
           |      select appid, device_id
           |      from
           |      (
           |        select packagename, split(exitid, ',')[0] as device_id
           |        from dwh.etl_dsp_request_daily_hours
           |        where dt = '${to_day}'
           |          and country = 'MX' and platform = 'android'
           |      ) tmpdata lateral view explode(split(packagename, "#")) num as appid
           |    ) t01
           |    where device_id not in ('0', '', '-', '00000000-0000-0000-0000-000000000000')
           |      and device_id rlike '${didPtn}'
           |    group by appid, device_id
           |  ) t1
           |  group by device_id
           |) t2
           |where app_num >= 3
           |group by device_id
        """.stripMargin
      println("sql01==" + sql01)

      val gaid_df: DataFrame = spark.sql(sql01).persist(StorageLevel.MEMORY_AND_DISK_SER)
      // Two tab-separated feeds over the same ids: one tagged with the package name,
      // one tagged with the country.
      val gaid_df_with_package_name = gaid_df.select(concat_ws("\t",
        gaid_df.col("device_id"), lit("gaid"), lit("android"), lit("[\"com.3app.xianjindai\"]")))
      val gaid_df_with_country = gaid_df.select(concat_ws("\t",
        gaid_df.col("device_id"), lit("gaid"), lit("android"), lit("MX")))
      // Write both feeds as text; the Hadoop settings above gzip the output files.
      gaid_df_with_package_name.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      gaid_df_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
    } finally {
      spark.stop()
    }
    0
  }
}

object Com3appXianjindai {
  def main(args: Array[String]): Unit = {
    new Com3appXianjindai().run(args)
  }
}
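// Usage sketch, for reference only. The jar name, output paths, parallelism, and the
// dt partition date format below are assumptions, not values taken from this repo:
//
//   spark-submit --class mobvista.dmp.datasource.dm.Com3appXianjindai \
//     --master yarn --deploy-mode cluster dmp.jar \
//     -coalesce 10 \
//     -output01 s3://mob-emr-test/example/com_3app_xianjindai/package_name \
//     -output02 s3://mob-emr-test/example/com_3app_xianjindai/country \
//     -to_day 2021-11-04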