package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer

class EtlDealidDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("oppooutput", true, "[must] oppooutput")
    options.addOption("inmobioutput", true, "[must] inmobioutput")
    options.addOption("dt_dash_today", true, "[must] dt_dash_today")
    options
  }

  // ext5 is a comma-separated device-id list laid out positionally as:
  //   gaid, idfa, idfamd5, <unused>, imei, imeimd5
  // Each emitted line is: device_id \t device_type \t platform \t package_name \t update_date
  def buildRes(row: Row, date: String, packageName: String): Array[String] = {
    val buffer = new ArrayBuffer[String]()
    val os = row.getAs[String]("os")
    val ext5 = row.getAs[String]("ext5")
    // Split once, with limit -1 so trailing empty fields are kept and the
    // positional indexes below stay valid even when the last ids are blank.
    val ids = ext5.split(",", -1)
    if (os == "ios" && ids.length > 2) {
      val idfa = ids(1)
      val idfamd5 = ids(2)
      if (idfa.nonEmpty) {
        buffer += Seq(idfa, "idfa", "ios", packageName, date).mkString("\t")
      }
      if (idfamd5.nonEmpty) {
        buffer += Seq(idfamd5, "idfamd5", "ios", packageName, date).mkString("\t")
      }
    } else if (os == "android" && ids.length > 5) {
      val gaid = ids(0)
      val imei = ids(4)
      val imeimd5 = ids(5)
      if (gaid.nonEmpty) {
        buffer += Seq(gaid, "gaid", "android", packageName, date).mkString("\t")
      }
      if (imei.nonEmpty) {
        buffer += Seq(imei, "imei", "android", packageName, date).mkString("\t")
      }
      if (imeimd5.nonEmpty) {
        buffer += Seq(imeimd5, "imeimd5", "android", packageName, date).mkString("\t")
      }
    }
    buffer.toArray
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val oppooutput = commandLine.getOptionValue("oppooutput")
    val inmobioutput = commandLine.getOptionValue("inmobioutput")
    val dt_dash_today = commandLine.getOptionValue("dt_dash_today")

    val spark = SparkSession.builder()
      .appName("EtlDealidDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove stale output so saveAsTextFile does not fail on existing paths.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(oppooutput), true)
    fs.delete(new Path(inmobioutput), true)

    // dt_dash_today is expected in yyyy-MM-dd-HH form; update drops the hour.
    val dtParts = dt_dash_today.split("-")
    val (year, month, date, hour) = (dtParts(0), dtParts(1), dtParts(2), dtParts(3))
    val update = dt_dash_today.substring(0, dt_dash_today.lastIndexOf("-"))

    try {
      // OPPO exchange: requests carrying deal ids 2532 or 2533.
      val sql01 =
        s"""
           |select os, ext5
           |from adn_dsp.log_adn_dsp_request_orc_hour
           |where yr='$year' and mt='$month' and dt='$date' and hh='$hour'
           |  and (get_json_object(ext3, "$$.dealids") like "%2532%"
           |    or get_json_object(ext3, "$$.dealids") like "%2533%")
           |  and exchanges='oppocn'
           |""".stripMargin
      spark.sql(sql01).rdd
        .flatMap(buildRes(_, update, "com.taobao.taobao_oppo"))
        .coalesce(300)
        .saveAsTextFile(oppooutput, classOf[GzipCodec])

      // InMobi exchange: requests carrying deal id 1594807676568.
      val sql02 =
        s"""
           |select os, ext5
           |from adn_dsp.log_adn_dsp_request_orc_hour
           |where yr='$year' and mt='$month' and dt='$date' and hh='$hour'
           |  and get_json_object(ext3, "$$.dealids") like "%1594807676568%"
           |  and exchanges='inmobi'
           |""".stripMargin
      spark.sql(sql02).rdd
        .flatMap(buildRes(_, update, "com.taobao.taobao_inmobi"))
        .coalesce(300)
        .saveAsTextFile(inmobioutput, classOf[GzipCodec])
    } finally {
      spark.stop()
    }
    0
  }
}

object EtlDealidDaily {
  def main(args: Array[String]): Unit = {
    new EtlDealidDaily().run(args)
  }
}
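
// Example submission, for reference. This is an illustrative sketch only: the
// jar name, master/deploy settings, and the concrete S3 output paths below are
// assumptions, not part of this job. The option names and the yyyy-MM-dd-HH
// shape of dt_dash_today follow from buildOptions() and run() above.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.taobao.EtlDealidDaily \
//     dmp.jar \
//     -oppooutput   s3://mob-emr-test/dmp/taobao/oppo/2024-01-01-12 \
//     -inmobioutput s3://mob-emr-test/dmp/taobao/inmobi/2024-01-01-12 \
//     -dt_dash_today 2024-01-01-12
//
// The first three segments of dt_dash_today select the yr/mt/dt partitions,
// the fourth selects hh, and the hour is stripped to form the update date
// written into each output row.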