package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
 * Daily batch job that extracts device install records for a fixed set of
 * packages/businesses from `dwh.dm_install_list_v2` and writes two
 * tab-separated text outputs: one carrying the package tag, one carrying a
 * hard-coded "CN" country column.
 *
 * @author jiangfan
 * @date 2021/8/19 14:17
 */
class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {

  /**
   * Declares the required command-line options for this job.
   *
   * @return the populated [[Options]] instance consumed by [[run]]
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output1", true, "[must] output1")
    options.addOption("output2", true, "[must] output2")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("update", true, "[must] update")
    options.addOption("update02", true, "[must] update02")
    options
  }

  /**
   * Job entry point: validates options, clears prior outputs on S3, runs the
   * extraction SQL, and writes the two result sets as text.
   *
   * @param args raw command-line arguments
   * @return 0 on success, -1 when a mandatory option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output1 = commandLine.getOptionValue("output1")
    val output2 = commandLine.getOptionValue("output2")
    val dt_today = commandLine.getOptionValue("dt_today")
    val update = commandLine.getOptionValue("update")
    val update02 = commandLine.getOptionValue("update02")

    val spark = SparkSession.builder()
      .appName("ComEgAndroidAlipayGphoneReyun")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Both outputs live under the same bucket, so resolve the FileSystem once
    // (the original resolved it twice for the identical URI).
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output1), true)
    fs.delete(new Path(output2), true)

    try {
      // Gzip/BLOCK compression for the Hadoop output path. NOTE(review): the
      // DataFrame text writer below may honor these only via the Hadoop conf —
      // confirm compression is applied, or pass option("compression", "gzip").
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      // Three unions over dm_install_list_v2:
      //   1. reyun + com.eg.android.AlipayGphone, fresher than ${update}
      //   2. reyun/btop + litetao/aweme, fresher than ${update}
      //   3. dsp_req + com.taobao.taobao_iqiyi (tagged "_3"), fresher than ${update02}
      // package_name is wrapped as a JSON-like array literal: ["<pkg>_<tag>"].
      val sql1 =
        s"""
           |select
           | device_id, device_type ,concat("[\\"",package_name,"_",business,"\\"]") as package_name
           |from
           | dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='reyun' and package_name='com.eg.android.AlipayGphone'
           | and device_type in ('imei','gaid','oaid','idfa','imeimd5','gaidmd5','oaidmd5','idfamd5')
           | and update_date >= "${update}"
           |union
           |select
           | device_id, device_type ,concat("[\\"",package_name,"_",business,"\\"]") as package_name
           |from
           | dwh.dm_install_list_v2
           |where dt='${dt_today}' and business in ('reyun','btop')
           | and package_name in ('com.taobao.litetao','com.ss.android.ugc.aweme')
           | and device_type in ('imei','gaid','oaid','idfa','imeimd5','gaidmd5','oaidmd5','idfamd5')
           | and update_date >= "${update}"
           |union
           |select
           | device_id, device_type ,concat("[\\"",package_name,"_3","\\"]") as package_name
           |from
           | dwh.dm_install_list_v2
           |where dt='${dt_today}' and business in ('dsp_req')
           | and package_name in ('com.taobao.taobao_iqiyi')
           | and update_date >= "${update02}"
        """.stripMargin

      // Persist once, since the result feeds two independent writes.
      val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val data01 = df01.select(concat_ws("\t",
        df01.col("device_id"), df01.col("device_type"), lit("android"), df01.col("package_name")))
      val data01_with_country = df01.select(concat_ws("\t",
        df01.col("device_id"), df01.col("device_type"), lit("android"), lit("CN")))

      data01.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output1)
      data01_with_country.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output2)

      // Release the cached blocks once both writes have completed (the
      // original persisted but never unpersisted).
      df01.unpersist()
    } finally {
      spark.stop()
    }
    0
  }
}

object ComEgAndroidAlipayGphoneReyun {
  /** CLI entry point: delegates straight to the job's `run`. */
  def main(args: Array[String]): Unit = {
    new ComEgAndroidAlipayGphoneReyun().run(args)
  }
}