package mobvista.dmp.datasource.reyun

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.RDDMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.sql.SparkSession

import java.net.URI

/**
 * @author jiangfan
 * @date 2021/12/16 17:51
 */
class ReyunPseudoPackageToS3 extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("update_date", true, "[must] update_date")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")
    val dt_today = commandLine.getOptionValue("dt_today")
    val update_date = commandLine.getOptionValue("update_date")

    val spark = SparkSession.builder()
      .appName("ReyunPseudoPackageToS3")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Configure the Hadoop output to be written as block-compressed gzip files.
    // Both the legacy ("mapreduce.output.*") and current
    // ("mapreduce.output.fileoutputformat.*") keys are set for compatibility.
    val conf = spark.sparkContext.hadoopConfiguration
    conf.set("mapreduce.output.compress", "true")
    conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
    conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

    // Delete any previous output so the job can be re-run idempotently.
    FileSystem.get(new URI("s3://mob-emr-test"), conf).delete(new Path(output), true)

    try {
      val sql1 =
        s"""
           |select device_type, device_id, package_name
           |from dwh.dm_install_list_v2
           |where dt = '$dt_today'
           |  and business = 'reyun'
           |  and package_name in ('com.sankuai.meituan_imei_reyun', 'com.sankuai.meituan_oaid_reyun',
           |    'com.taobao.taobao_imei_reyun', 'com.taobao.taobao_oaid_reyun',
           |    'com.eg.android.AlipayGphone_oaid_reyun', 'com.eg.android.AlipayGphone_imei_reyun')
           |  and update_date = '$update_date'
           |  and device_type in ('oaid', 'imei')
           |group by device_type, device_id, package_name
        """.stripMargin
      println("sql==============" + sql1)

      // Emit one (key, value) pair per device: the key encodes the target
      // directory (<output>/<package_name>/<device_type>) consumed by
      // RDDMultipleOutputFormat, and the value is the device id.
      spark.sql(sql1).rdd
        .map(r => {
          val device_id = r.getAs[String]("device_id")
          val device_type = r.getAs[String]("device_type")
          val package_name = r.getAs[String]("package_name")
          (device_id, device_type, package_name)
        })
        .coalesce(coalesce.toInt)
        .map(t => (new Text(s"$output/${t._3}/${t._2}"), new Text(t._1)))
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
    } finally {
      spark.stop()
    }
    0
  }
}

object ReyunPseudoPackageToS3 {
  def main(args: Array[String]): Unit = {
    new ReyunPseudoPackageToS3().run(args)
  }
}
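
// ---------------------------------------------------------------------------
// Usage sketch (illustrative, not taken from this repository): the jar name,
// option values, and date formats below are assumptions; the four flags mirror
// the Options declared in buildOptions() above.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.reyun.ReyunPseudoPackageToS3 \
//     dmp-assembly.jar \                                  (hypothetical jar)
//     -coalesce 20 \
//     -output s3://mob-emr-test/reyun/pseudo_package \    (hypothetical path)
//     -dt_today 2021-12-16 \
//     -update_date 2021-12-16
//
// With arguments like these, the job reads dwh.dm_install_list_v2 for the
// given partition and writes gzip-compressed device-id files under
// <output>/<package_name>/<device_type>/ via RDDMultipleOutputFormat.
// ---------------------------------------------------------------------------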