package mobvista.dmp.datasource.dsp

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.MultipleOrcOutputFormat
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._

class DspClickImpressionCombineOrc extends CommonSparkJob with Serializable {

  // ORC output schema; the field order must match the column order of the
  // SQL projection in run() below.
  val schema = "struct<time:string,xforwardip:string,ip:string,exchanges:string,elapsed:string,url:string,body:string,requestid:string,bid:string,price:string,describe:string,ext1:string,ext2:string,ext3:string,ext4:string,ext5:string,auctiontype:string,bidreqid:string,impid:string,publisherid:string,appid:string,appname:string,posid:string,category:string,intl:string,imagesize:string,deviceip:string,make:string,model:string,os:string,osv:string,devicetype:string,cncttype:string,countrycode:string,googleadid:string,imeisha1:string,androididmd5:string,idfa:string,keywords:string,yob:string,gender:string,ext6:string,ext7:string,ext8:string,ext9:string,ext10:string,campaignid:string,cinstallprice:string,cappname:string,cpackagename:string,cadvertiserid:string,ccreativeid:string>"

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val cur_date = commandLine.getOptionValue("cur_date")
    val begin_date = commandLine.getOptionValue("begin_date")
    val output = commandLine.getOptionValue("output")
    val cpcoutput = commandLine.getOptionValue("cpcoutput")
    val cpmoutput = commandLine.getOptionValue("cpmoutput")
    val cpioutput = commandLine.getOptionValue("cpioutput")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .enableHiveSupport()
      .getOrCreate()

    // Clear any output left over from a previous run for all three billing types.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(cpcoutput), true)
    fs.delete(new Path(cpmoutput), true)
    fs.delete(new Path(cpioutput), true)
    println("<====>" + cpcoutput + ";" + cpmoutput + ";" + cpioutput)
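    // Join the day's clicks with the impression log on requestid, preferring the
    // click-side value wherever both sides carry the same attribute, and write
    // the combined records as ORC, routed per billing type.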
    try {
      // Clicks of cur_date are joined to impressions whose partition date falls
      // in the [begin_date, cur_date] window.
      val hql =
        s"""
           |select
           |coalesce(t1.time,t2.time) time,
           |t2.xforwardip,
           |t2.ip,
           |coalesce(t1.exchanges,t2.exchanges) exchanges,
           |t2.elapsed,
           |t2.url,
           |t2.body,
           |t2.requestid,
           |t2.bid,
           |t2.price,
           |t2.`describe`,
           |coalesce(t1.algorithm,t2.ext1) ext1,
           |t2.ext2,
           |t2.ext3,
           |t2.ext4,
           |t2.ext5,
           |t2.auctiontype,
           |t2.bidreqid,
           |t2.impid,
           |coalesce(t1.publisher_id,t2.publisherid) publisherid,
           |coalesce(t1.app_id,t2.appid) appid,
           |coalesce(t1.package,t2.appname) appname,
           |t2.posid,
           |t2.category,
           |t2.intl,
           |t2.imagesize,
           |t2.deviceip,
           |t2.make,
           |t2.model,
           |coalesce(t1.platform,t2.os) os,
           |coalesce(t1.os_version,t2.osv) osv,
           |t2.devicetype,
           |t2.cncttype,
           |coalesce(t1.country_code,t2.countrycode) countrycode,
           |coalesce(t1.gaid,t2.googleadid) googleadid,
           |t2.imeisha1,
           |t2.androididmd5,
           |coalesce(t1.idfa,t2.idfa) idfa,
           |t2.keywords,
           |t2.yob,
           |t2.gender,
           |t2.ext6,
           |t2.ext7,
           |t2.ext8,
           |t2.ext9,
           |t2.ext10,
           |coalesce(t1.campaign_id,t2.campaignid) campaignid,
           |t2.cinstallprice,
           |t2.cappname,
           |t2.cpackagename,
           |coalesce(t1.advertiser_id,t2.cadvertiserid) cadvertiserid,
           |t2.ccreativeid,
           |t1.day,
           |t1.billing_type
           |from (select * from adn_dsp.ods_dsp_click_v4 where day = '${cur_date}') t1
           |left join (select * from adn_dsp.log_adn_dsp_impression_hour
           |  where concat(yr,mt,dt) >= '${begin_date}' and concat(yr,mt,dt) <= '${cur_date}') t2
           |on (t1.requestid = t2.requestid)
        """.stripMargin

      spark.sql(hql)
        .rdd
        .map(line => {
          // day and billing_type ride along only for routing and are not part of
          // the ORC schema; the Text key carries the per-billing-type target
          // directory consumed by MultipleOrcOutputFormat.
          val billing_type = line.getAs[String]("billing_type")
          (new Text(s"${output}/${billing_type}"), genRow(line, schema))
        })
        .coalesce(coalesce.toInt)
        .saveAsNewAPIHadoopFile(
          cpioutput,
          classOf[Text],
          classOf[OrcStruct],
          classOf[MultipleOrcOutputFormat[Text, OrcStruct]],
          initConfig(spark.sparkContext.hadoopConfiguration))
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  // Blank-safe conversion: ORC string fields are written as "" instead of null.
  def genVal(str: String): String = {
    if (StringUtils.isNotBlank(str)) str else ""
  }
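  // Builds one ORC row from a Spark SQL Row. The ORC field names equal the
  // column aliases of the SQL projection, so each struct slot is filled from
  // the Row by name, in schema order.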
  def genRow(row: Row, schema: String): OrcStruct = {
    val desc = TypeDescription.fromString(schema)
    val struct = OrcStruct.createValue(desc).asInstanceOf[OrcStruct]
    desc.getFieldNames.asScala.zipWithIndex.foreach { case (name, idx) =>
      struct.setFieldValue(idx, new Text(genVal(row.getAs[String](name))))
    }
    struct
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_date", true, "cur_date Time")
    options.addOption("begin_date", true, "begin_date Time")
    options.addOption("output", true, "output dir")
    options.addOption("cpcoutput", true, "cpcoutput dir")
    options.addOption("cpmoutput", true, "cpmoutput dir")
    options.addOption("cpioutput", true, "cpioutput dir")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  def initConfig(conf: Configuration): Configuration = {
    conf.set("orc.mapred.output.schema", schema)
    // Generic MapReduce compression settings; the ORC writer itself compresses
    // according to "orc.compress" (ZLIB) below.
    conf.setBoolean("mapreduce.output.compress", true)
    conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
    conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    conf.set("orc.compress", "ZLIB")
    conf
  }
}

object DspClickImpressionCombineOrc {
  def main(args: Array[String]): Unit = {
    new DspClickImpressionCombineOrc().run(args)
  }
}
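// Example submission (jar name, paths, and dates below are hypothetical, shown
// only to illustrate the expected arguments):
//
//   spark-submit \
//     --class mobvista.dmp.datasource.dsp.DspClickImpressionCombineOrc \
//     dmp.jar \
//     -cur_date 20200101 -begin_date 20191225 \
//     -output s3://mob-emr-test/dsp/combine \
//     -cpcoutput s3://mob-emr-test/dsp/combine/cpc \
//     -cpmoutput s3://mob-emr-test/dsp/combine/cpm \
//     -cpioutput s3://mob-emr-test/dsp/combine/cpi \
//     -coalesce 200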