package mobvista.dmp.datasource.adn_adx

/**
 * @author andy.liu on 2019/9/24
 */


import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI


class AdnTecentAdxDataMidWay extends CommonSparkJob {


  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputadxtmp", true, "[must] outputadxtmp")
    options.addOption("dimadxpkg", true, "[must] dimadxpkg")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

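    // IDFA validation: canonical UUID layout, i.e. 8-4-4-4-12 hex groups.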
    val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
    val outputadxtmp = commandLine.getOptionValue("outputadxtmp")
    val dimadxpkg = commandLine.getOptionValue("dimadxpkg")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")

    val spark = SparkSession.builder()
      .appName("AdnTecentAdxDataMidWay")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputadxtmp), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(dimadxpkg), true)

    try {

      val year = today.substring(0, 4)
      val month = today.substring(4, 6)
      val day = today.substring(6, 8)

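      // Pull one row per DSP response from the day's ADX request log:
      // explode dsp_response, keep DSP ids 4 and 27 on iOS, require a raw
      // bid price above 100, and drop rows whose IDFA is not a valid UUID.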
      val ods_adn_adx_req_tmp_pre =
        s"""
           |select request_id, device_id, 'idfa' device_type, os platform, device_model,
           |       osv os_version, ct country, adx_dsp.sbid[0].bid[0].price_raw price_raw,
           |       cast(`date` as string) `date`
           |from (
           |  select channel_request_id request_id,
           |         device_info.ifa device_id,
           |         device_info.md device_model,
           |         device_info.os os,
           |         device_info.osv osv,
           |         device_info.geo.ct ct,
           |         explode(dsp_response) adx_dsp,
           |         `date`
           |  from dwh.ods_adn_adx_v1_request
           |  where yyyy = '$year' and mm = '$month' and dd = '$day'
           |    and device_info.ifa rlike '$didPtn') t
           |where t.adx_dsp.id in (4, 27)
           |  and t.os = 'ios'
           |  and adx_dsp.sbid[0].bid[0].price_raw > 100
        """.stripMargin

      // Persist the extraction: it is written out as ORC here and joined
      // again below when building dwh.dim_adn_adx_package.
      val reqTmpPre = spark.sql(ods_adn_adx_req_tmp_pre)
        .repartition(coalesce.toInt)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      reqTmpPre.createOrReplaceTempView("ods_adn_adx_req_tmp_pre")

      reqTmpPre.write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")

      // The data volume is small, so write straight into the Hive table
      // (see hive.exec.reducers.bytes.per.reducer).
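      // Note: package names in this feed appear to carry the App Store "id"
      // prefix (hence the `like 'id%'` filter below); substr(..., 3) strips
      // those two characters, e.g. a hypothetical id123456789 -> 123456789.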
      val dim_adn_adx_package =
        s"""
           |insert overwrite table dwh.dim_adn_adx_package partition(dt='$today')
           |select substr(t1.package_name, 3) package_name
           |from (select request_id, package package_name, platform, backend_id
           |      from adn_report.adndata_midway_backend_v2
           |      where yyyy = '$year' and mm = '$month' and dd = '$day'
           |        and platform = '2' and backend_id = '17') t1
           |join ods_adn_adx_req_tmp_pre t2
           |  on (t1.request_id = t2.request_id)
           |where t1.package_name like 'id%'
           |group by t1.package_name
        """.stripMargin

      spark.sql(dim_adn_adx_package)


    } finally {
      spark.stop()
    }
    0
  }
}

object AdnTecentAdxDataMidWay {
  def main(args: Array[String]): Unit = {
    new AdnTecentAdxDataMidWay().run(args)
  }
}
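
// Illustrative submission (jar name and paths are placeholders, and the exact
// flag syntax depends on the parser CommonSparkJob wires up; a sketch only):
//
//   spark-submit \
//     --class mobvista.dmp.datasource.adn_adx.AdnTecentAdxDataMidWay \
//     dmp.jar \
//     -outputadxtmp s3://mob-emr-test/.../adx_mid_tmp \
//     -dimadxpkg s3://mob-emr-test/.../dim_adn_adx_package \
//     -coalesce 100 \
//     -today 20190924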