package mobvista.dmp.datasource.dsp

import java.net.URI
import java.util

import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_set, first, max, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

import scala.collection.JavaConversions._
import scala.collection.mutable

class DspOrgLogEtlDailys extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val input_etl_dsp_request_hour = commandLine.getOptionValue("input")
    val output_etl_dsp_request_hour = commandLine.getOptionValue("output")
    val input_mds_dsp_request_hour = commandLine.getOptionValue("input_mds_dsp_request_hour")
    val output_mds_dsp_request_hour = commandLine.getOptionValue("output_mds_dsp_request_hour")
    val parallelism = commandLine.getOptionValue("parallelism").toInt
    val coalesce = commandLine.getOptionValue("coalesce").toInt

    val spark = SparkSession
      .builder()
      .config("spark.rdd.compress", "true")
      .config("spark.default.parallelism", s"$parallelism")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    // Delete the ETL output path if it already exists so the job can be re-run safely.
    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration)
      .delete(new Path(output_etl_dsp_request_hour), true)

    import spark.implicits._

    try {
      // Merge the collected segment_ids JSON arrays into one de-duplicated JSON array string.
      // e.g. an input element [{"name":"carrier","value":"verified"}] contributes the set entry {"name":"carrier","value":"verified"}
      val mergeUdf = udf((segmentArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val segmentSet: util.Set[String] = new util.HashSet[String]()
        if (segmentArrays != null && segmentArrays.size != 0) {
          for (segmentArray <- segmentArrays) {
            val jsonArray = GsonUtil.String2JsonArray(segmentArray.toString)
            for (json <- jsonArray) {
              segmentSet.add(json.toString)
            }
          }
        }
        if (!segmentSet.isEmpty) {
          res = "[" + segmentSet.mkString(",") + "]"
        }
        res
      })

      // Merge the collected "#"-separated package lists into a de-duplicated JSON string array.
      // e.g. pkgs = com.playgendary.tanks#com.nflystudio.InfiniteStaircase
      val mergePkgUdf = udf((pkgsArrays: mutable.WrappedArray[String]) => {
        var res = "[]"
        val pkgSet: util.Set[String] = new util.HashSet[String]()
        if (pkgsArrays != null && pkgsArrays.size != 0) {
          for (pkgs <- pkgsArrays) {
            if (StringUtils.isNotBlank(pkgs)) {
              val pkgNames = pkgs.split("#", -1)
              for (pkgName <- pkgNames) {
                pkgSet.add(pkgName)
              }
            }
          }
        }
        if (pkgSet.size() != 0) {
          res = "[\"" + pkgSet.mkString("\",\"") + "\"]"
        }
        res
      })

      // Merge the collected "#"-separated android ids into a de-duplicated comma-separated string
      // (same splitting logic as the package list above).
      val mergeAdridIdUdf = udf((adrsArrays: mutable.WrappedArray[String]) => {
        var res = ""
        val adrSet: util.Set[String] = new util.HashSet[String]()
        if (adrsArrays != null && adrsArrays.size != 0) {
          for (adrs <- adrsArrays) {
            if (StringUtils.isNotBlank(adrs)) {
              val adrIds = adrs.split("#", -1)
              for (adr <- adrIds) {
                adrSet.add(adr)
              }
            }
          }
        }
        if (adrSet.size() != 0) {
          res = adrSet.mkString(",")
        }
        res
      })

      spark.read.schema(dspEtlSchema).orc(input_etl_dsp_request_hour).map(row => {
        val device_id = row.getAs[String]("device_id")
        val device_type = row.getAs[String]("device_type")
        val platform = row.getAs[String]("platform")
        val country_code = row.getAs[String]("country_code")
        val ip = row.getAs[String]("ip")
        val gender = row.getAs[String]("gender")
        val birthday = row.getAs[String]("birthday")
        val maker = row.getAs[String]("maker")
        val model = row.getAs[String]("model")
        val os_version = row.getAs[String]("os_version")
        val package_list = row.getSeq[String](10)
        val androidids = row.getSeq[String](11)
        val datetime = row.getAs[String]("datetime")
        // e.g. [{"id":"1462","value":"0.3"},{"id":"381","value":"0.2"},{"id":"937","value":"0.3"},{"id":"41","value":"0.3"}]
        val segment_ids = row.getAs[String]("segment_ids")

        val setPkg = new util.HashSet[String]()
        if (package_list.size > 0) {
          package_list.foreach(line => {
            if (StringUtils.isNotBlank(line)) {
              setPkg.add(line)
            }
          })
        }
        val setAdrIds = new util.HashSet[String]()
        if (androidids.size > 0) {
          androidids.foreach(line => {
            if (StringUtils.isNotBlank(line)) {
              setAdrIds.add(line)
            }
          })
        }
        (device_id, device_type, platform, country_code, ip, gender, birthday, maker, model, os_version,
          setPkg.mkString("#"), setAdrIds.mkString("#"), datetime, segment_ids)
      }).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
        "model", "os_version", "package_list", "androidids", "datetime", "segment_ids")
        .groupBy("device_id", "device_type")
        .agg(first("platform"),
          first("country_code"),
          first("ip"),
          first("gender"),
          first("birthday"),
          first("maker"),
          first("model"),
          first("os_version"),
          mergePkgUdf(collect_set("package_list")),
          mergeAdridIdUdf(collect_set("androidids")),
          max("datetime"),
          mergeUdf(collect_set("segment_ids")))
        .coalesce(coalesce)
        .rdd.map(_.mkString("\t"))
        .saveAsTextFile(output_etl_dsp_request_hour, classOf[GzipCodec])

      spark.read.schema(dspMdsSchema).orc(input_mds_dsp_request_hour)
        .createOrReplaceTempView("mds_dsp_request_hour")

      // De-duplicate the mds request rows by grouping on all selected columns.
      val sql = "select device_id,device_type,platform" +
        " ,req_time,ip,geo,longitude,latitude" +
        " from mds_dsp_request_hour group by device_id,device_type,platform,req_time,ip,geo,longitude,latitude"

      spark.sql(sql).coalesce(coalesce)
        .rdd.map(_.mkString("\t"))
        .saveAsTextFile(output_mds_dsp_request_hour, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  // struct<device_id:string,device_type:string,platform:string,req_time:string,ip:string,geo:string,longitude:string,latitude:string>
  def dspMdsSchema: StructType = {
    StructType(
      StructField("device_id", StringType) ::
        StructField("device_type", StringType) ::
        StructField("platform", StringType) ::
        StructField("req_time", StringType) ::
        StructField("ip", StringType) ::
        StructField("geo", StringType) ::
        StructField("longitude", StringType) ::
        StructField("latitude", StringType) :: Nil)
  }

  def dspEtlSchema: StructType = {
    StructType(
      StructField("device_id", StringType) ::
        StructField("device_type", StringType) ::
        StructField("platform", StringType) ::
        StructField("country_code", StringType) ::
        StructField("ip", StringType) ::
        StructField("gender", StringType) ::
        StructField("birthday", StringType) ::
        StructField("maker", StringType) ::
        StructField("model", StringType) ::
        StructField("os_version", StringType) ::
        StructField("package_list", ArrayType(StringType)) ::
        StructField("androidids", ArrayType(StringType)) ::
        StructField("datetime", StringType) ::
        StructField("segment_ids", StringType) :: Nil)
  }

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("input_mds_dsp_request_hour", true, "input_mds_dsp_request_hour")
    options.addOption("output_mds_dsp_request_hour", true, "output_mds_dsp_request_hour")
    options.addOption("parallelism", true, "parallelism")
    options.addOption("coalesce", true, "coalesce")
    options
  }
}

object DspOrgLogEtlDailys {
  def main(args: Array[String]): Unit = {
    new DspOrgLogEtlDailys().run(args)
  }
}
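
// Illustrative sketch of how this job might be submitted; it is not taken from this repository's
// deployment scripts. The option names match commandOptions() above, but the jar name, S3 paths,
// and date components below are placeholder assumptions.
//
//   spark-submit --class mobvista.dmp.datasource.dsp.DspOrgLogEtlDailys \
//     --master yarn --deploy-mode cluster \
//     dmp.jar \
//     -input s3://mob-emr-test/dsp/etl_dsp_request_hour/2024/01/01/00 \
//     -output s3://mob-emr-test/dsp/etl_dsp_request_daily/2024/01/01 \
//     -input_mds_dsp_request_hour s3://mob-emr-test/dsp/mds_dsp_request_hour/2024/01/01/00 \
//     -output_mds_dsp_request_hour s3://mob-emr-test/dsp/mds_dsp_request_daily/2024/01/01 \
//     -parallelism 2000 \
//     -coalesce 200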