package mobvista.dmp.datasource.adn_adx

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * author andy.liu on 2019/9/24
 */
class AdnTecentAdxDataMidWay extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputadxtmp", true, "[must] outputadxtmp")
    options.addOption("dimadxpkg", true, "[must] dimadxpkg")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    // Standard UUID pattern used to keep only well-formed IFA device ids.
    val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"

    val outputadxtmp = commandLine.getOptionValue("outputadxtmp")
    val dimadxpkg = commandLine.getOptionValue("dimadxpkg")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")

    val spark = SparkSession.builder()
      .appName("AdnTecentAdxDataMidWay")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear both output locations up front so the job can be re-run idempotently.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputadxtmp), true)
    fs.delete(new Path(dimadxpkg), true)

    try {
      // Explode each DSP response of the day's ADX requests, keeping only iOS
      // traffic with a well-formed IFA, DSP ids 4/27, and a raw bid price > 100.
      val ods_adn_adx_req_tmp_pre =
        s"""
           |select request_id,
           |       device_id,
           |       'idfa' device_type,
           |       os platform,
           |       device_model,
           |       osv os_version,
           |       ct country,
           |       adx_dsp.sbid[0].bid[0].price_raw price_raw,
           |       cast(`date` as string) `date`
           |from (
           |  select channel_request_id request_id,
           |         device_info.ifa device_id,
           |         device_info.md device_model,
           |         device_info.os os,
           |         device_info.osv osv,
           |         device_info.geo.ct ct,
           |         explode(dsp_response) adx_dsp,
           |         `date`
           |  from dwh.ods_adn_adx_v1_request
           |  where concat(yyyy, mm, dd) = '${today}' and device_info.ifa rlike '${didPtn}') t
           |where t.adx_dsp.id in (4, 27) and t.os = 'ios' and adx_dsp.sbid[0].bid[0].price_raw > 100
        """.stripMargin

      // Cache the filtered rows: they are written out as ORC below and joined
      // again when building the package dimension.
      spark.sql(ods_adn_adx_req_tmp_pre)
        .repartition(coalesce.toInt)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
        .createOrReplaceTempView("ods_adn_adx_req_tmp_pre")

      spark.sql("select * from ods_adn_adx_req_tmp_pre")
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputadxtmp)

      // The result set is small, so write straight into the Hive table
      // (reducer sizing is governed by hive.exec.reducers.bytes.per.reducer).
      val dim_adn_adx_package =
        s"""
           |insert overwrite table dwh.dim_adn_adx_package partition (dt = '${today}')
           |select substr(t1.package_name, 3) package_name
           |from (select request_id, package package_name, platform, backend_id
           |      from adn_report.adndata_midway_backend_v2
           |      where concat(yyyy, mm, dd) = '${today}' and platform = '2' and backend_id = '17') t1
           |join ods_adn_adx_req_tmp_pre t2
           |  on (t1.request_id = t2.request_id)
           |where t1.package_name like 'id%'
           |group by t1.package_name
        """.stripMargin

      spark.sql(dim_adn_adx_package)
    } finally {
      spark.stop()
    }
    0
  }
}

object AdnTecentAdxDataMidWay {
  def main(args: Array[String]): Unit = {
    new AdnTecentAdxDataMidWay().run(args)
  }
}
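
// A minimal invocation sketch, assuming the job is launched via spark-submit
// with the four required options declared in buildOptions(). The jar name and
// the S3 paths below are hypothetical placeholders, not taken from the source:
//
//   spark-submit \
//     --class mobvista.dmp.datasource.adn_adx.AdnTecentAdxDataMidWay \
//     dmp.jar \
//     -outputadxtmp s3://mob-emr-test/path/to/adx_tmp \
//     -dimadxpkg s3://mob-emr-test/path/to/dim_adx_pkg \
//     -coalesce 20 \
//     -today 20190924
//
// outputadxtmp receives the ORC dump of the filtered requests; dimadxpkg is the
// second location cleared up front; today selects the yyyy/mm/dd partition.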