package mobvista.dmp.datasource.taobao

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import java.net.URI
import scala.collection.mutable.ArrayBuffer

/**
  * Tags com.tencent.news_bes / com.tencent.news_oppo devices from dwh.dm_install_list_v2
  * with 7-day and 15-day look-back suffixes and writes the result as ORC.
  *
  * @author jiangfan
  * @date 2021/4/23 10:42
  */
class EtlComTencentNewsDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_dash_rec7day", true, "[must] dt_dash_rec7day")
    options.addOption("dt_dash_rec15day", true, "[must] dt_dash_rec15day")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val dt_today = commandLine.getOptionValue("dt_today")
    val dt_dash_rec7day = commandLine.getOptionValue("dt_dash_rec7day")
    val dt_dash_rec15day = commandLine.getOptionValue("dt_dash_rec15day")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("EtlComTencentNewsDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear any previous output so the overwrite starts from a clean path.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    val sc = spark.sparkContext

    try {
      // Kept for reference only (never executed): the expanded form of the query,
      // one UNION branch per package and look-back window.
      val sql1 =
        s"""
           |select device_id,device_type,platform,'com.tencent.news_bes_7' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec7day}'
           |and package_name in ('com.tencent.news_bes')
           |union
           |select device_id,device_type,platform,'com.tencent.news_bes_15' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec15day}'
           |and package_name in ('com.tencent.news_bes')
           |union
           |select device_id,device_type,platform,'com.tencent.news_oppo_7' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec7day}'
           |and package_name in ('com.tencent.news_oppo')
           |union
           |select device_id,device_type,platform,'com.tencent.news_oppo_15' package_name from dwh.dm_install_list_v2
           |where dt='${dt_today}' and business='dsp_req' and update_date>='${dt_dash_rec15day}'
           |and package_name in ('com.tencent.news_oppo')
        """.stripMargin

      // Single scan over the 15-day window; the 7-day variants are derived in the map below.
      val sql =
        s"""
           |SELECT device_id, device_type, platform, package_name, update_date
           |  FROM dwh.dm_install_list_v2
           | WHERE dt = '${dt_today}' AND business = 'dsp_req' AND update_date >= '${dt_dash_rec15day}'
           |   AND package_name IN ('com.tencent.news_bes','com.tencent.news_oppo')
           |""".stripMargin

      def schema: StructType = {
        StructType(StructField("device_id", StringType) ::
          StructField("device_type", StringType) ::
          StructField("platform", StringType) ::
          StructField("package_name", StringType) :: Nil)
      }

      val rdd = spark.sql(sql).rdd.map(row => {
        val array = new ArrayBuffer[Row]()
        val device_id = row.getAs[String]("device_id")
        val device_type = row.getAs[String]("device_type")
        val platform = row.getAs[String]("platform")
        val package_name = row.getAs[String]("package_name")
        val update_date = row.getAs[String]("update_date")
        // Every row in the 15-day window gets the "_15" suffix; rows whose
        // update_date also falls inside the last 7 days get the "_7" suffix as well.
        array += Row(device_id, device_type, platform, package_name + "_15")
        if (update_date.compareTo(dt_dash_rec7day) >= 0) {
          array += Row(device_id, device_type, platform, package_name + "_7")
        }
        array
      }).flatMap(l => l)

      spark.createDataFrame(rdd, schema)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", value = false)
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}

object EtlComTencentNewsDaily {
  def main(args: Array[String]): Unit = {
    new EtlComTencentNewsDaily().run(args)
  }
}
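
// A minimal sketch of how this job might be launched, assuming the options are parsed
// as commons-cli short options and that the jar name, output path, and dates below are
// purely illustrative (none of them come from this file):
//
//   spark-submit \
//     --class mobvista.dmp.datasource.taobao.EtlComTencentNewsDaily \
//     dmp.jar \
//     -dt_today 2021-04-23 \
//     -dt_dash_rec7day 2021-04-16 \
//     -dt_dash_rec15day 2021-04-08 \
//     -output s3://mob-emr-test/path/to/etl_com_tencent_news_daily/2021-04-23 \
//     -coalesce 20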