// CalcPackageGender.scala (6.77 KB), committed by wang-jinfeng
package mobvista.dmp.datasource.age_gender

import java.net.URI
import java.util
import java.util.regex.Pattern

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * Spark job that estimates, for every package, the share of male-labelled devices.
  *
  * Pipeline (see `run`):
  *  1. read ORC rows using `Constant.merge_schema` from `inputPath`;
  *  2. split the `package_names` column on '#' and the `label` column on '$', keep the part of
  *     each label before its first '#' as the gender tag, and count rows per (package, tag);
  *  3. per package, sum all tag counts (`valid`) and compute male ratio = count("m") / valid;
  *  4. keep the packages accepted by `mapDictFun` and write them as text lines to `outputPath`.
  *
  * Intermediate records are built with `MRUtils.JOINER` and parsed with `MRUtils.SPLITTER`.
  * NOTE(review): this assumes both use the same field separator -- confirm in MRUtils.
  *
  * @author wangjf
  */
class CalcPackageGender extends CommonSparkJob with Serializable {

  // '#' separates packages inside `package_names`, and a tag from its count/suffix.
  private val wellSplit = Pattern.compile("#")
  // '$' separates the individual labels inside the `label` column.
  private val dollarSplit = Pattern.compile("\\$")

  /**
    * Parses the command line and runs the job.
    *
    * All numeric options are parsed before any Spark resource is created or any output is
    * deleted, so a malformed value fails fast.
    *
    * @param args command-line arguments, see `buildOptions`
    * @return 0 on success, -1 when a mandatory option is missing
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val input = commandLine.getOptionValue("inputPath")
      val output = commandLine.getOptionValue("outputPath")
      val parallelism = commandLine.getOptionValue("parallelism").toInt
      val lowThreshold = commandLine.getOptionValue("lowThreshold").toInt
      val highThreshold = commandLine.getOptionValue("highThreshold").toInt
      val mRatio = commandLine.getOptionValue("mRatio").toDouble
      val fRatio = commandLine.getOptionValue("fRatio").toDouble

      computePackageGender(input, output, parallelism, lowThreshold, highThreshold, mRatio, fRatio)
    }
  }

  /**
    * Builds the Spark pipeline described on the class and writes the result.
    *
    * @param input         ORC input path
    * @param output        text output path; any existing content is deleted first
    * @param parallelism   number of partitions the result is coalesced to before writing
    * @param lowThreshold  forwarded to `mapDictFun`
    * @param highThreshold forwarded to `mapDictFun`
    * @param mRatio        forwarded to `mapDictFun`
    * @param fRatio        forwarded to `mapDictFun`
    * @return always 0; failures surface as exceptions
    */
  private def computePackageGender(input: String, output: String, parallelism: Int,
                                   lowThreshold: Int, highThreshold: Int,
                                   mRatio: Double, fRatio: Double): Int = {
    val spark = SparkSession.builder()
      .appName("CalcPackageGender")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Resolve the file system from the output path itself so the delete always targets the
      // same location saveAsTextFile writes to, whatever bucket or scheme `output` uses.
      val outputPath = new Path(output)
      outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      // (package, "tag#count"): one entry per distinct (package, gender tag) pair.
      val pkgTagCounts = spark.read.schema(Constant.merge_schema).orc(input).rdd
        .flatMap { row =>
          // getAs[Any] keeps the original "whatever it is, call toString" behaviour while
          // avoiding a type argument inferred as Nothing.
          val pkgs = wellSplit.split(row.getAs[Any]("package_names").toString, -1)
          val tags = dollarSplit.split(row.getAs[Any]("label").toString, -1)
            .filter(label => label != "null" && StringUtils.isNotBlank(label))
            .map(label => wellSplit.split(label, -1)(0))
          pkgs.iterator.flatMap(pkg => tags.iterator.map(tag => (MRUtils.JOINER.join(pkg, tag), 1)))
        }
        .reduceByKey(_ + _)
        .mapPartitions(mapMergeFun)

      pkgTagCounts
        .groupByKey()
        .map { case (pkg, tagCounts) =>
          val counts = tagCounts.toSeq.map { tagCount =>
            val parts = wellSplit.split(tagCount, -1)
            (parts(0), parts(1).toInt)
          }
          // Number of labelled observations for this package, across all tags.
          val valid = counts.map(_._2).sum
          val byTag = counts.toMap
          val mGender = if (valid != 0) byTag.getOrElse("m", 0) else 0
          val fGender = if (valid != 0) byTag.getOrElse("f", 0) else 0
          val maleRatio = if (valid != 0) mGender.toDouble / valid else 0.0
          // Record layout read by mapDictFun: 0 = package, 1 = valid, 4 = male ratio.
          MRUtils.JOINER.join(pkg, valid.toString, mGender.toString, fGender.toString, maleRatio.toString)
        }
        .mapPartitions(mapDictFun(_, lowThreshold, highThreshold, mRatio, fRatio))
        .coalesce(parallelism)
        .saveAsTextFile(output)

      0
    } finally {
      // Stopping the session also stops its SparkContext.
      spark.stop()
    }
  }

  /**
    * Declares the command-line options; every option takes a value and is described as "[must]".
    *
    * @return the option set for this job
    */
  override protected def buildOptions(): Options = {
    val opts = new Options
    Seq("inputPath", "outputPath", "parallelism", "lowThreshold", "highThreshold", "mRatio", "fRatio")
      .foreach(name => opts.addOption(name, true, s"[must] $name"))
    opts
  }

  /**
    * Turns `(joinedKey, count)` pairs into `(package, "tag#count")` pairs.
    *
    * The key is split with `MRUtils.SPLITTER`; field 0 is taken as the package and field 1 as
    * the tag. Elements are converted lazily instead of buffering the whole partition.
    *
    * @param iter partition iterator of (key, count)
    * @return iterator of (package, tag + "#" + count)
    */
  def mapMergeFun(iter: Iterator[(String, Int)]): Iterator[(String, String)] =
    iter.map { case (key, num) =>
      val fields = MRUtils.SPLITTER.split(key, -1)
      val pkg = fields(0)
      val tag = fields(1)
      (pkg, s"$tag${wellSplit.pattern()}$num")
    }

  /**
    * Maps each line to `(field0 joined with field1, 1)`, ready for counting.
    *
    * @param iter partition iterator of lines parseable by `MRUtils.SPLITTER`
    * @return iterator of (joined key, 1)
    */
  def mapFun(iter: Iterator[String]): Iterator[(String, Int)] =
    iter.map { line =>
      val cur = MRUtils.SPLITTER.split(line, -1)
      (MRUtils.JOINER.join(cur(0), cur(1)), 1)
    }

  /**
    * Filters per-package records and labels the survivors as "confirm" or "calc".
    *
    * A record is kept when its male ratio (field 4) is not the string "0.0" and its total
    * (field 1) is at least `lowThreshold`. It is tagged "confirm" when the total also reaches
    * `highThreshold` and the ratio is >= `mRatio` or <= `fRatio`; otherwise "calc".
    *
    * NOTE(review): the "0.0" check drops packages without any male-labelled device, which also
    * excludes all-female packages from the `fRatio` branch -- confirm this is intended.
    *
    * @param iter          partition iterator of joined records (fields 0, 1 and 4 are read)
    * @param lowThreshold  minimum total for a record to be emitted
    * @param highThreshold minimum total for a record to be "confirm"
    * @param mRatio        ratio at or above which a package counts as confidently male
    * @param fRatio        ratio at or below which a package counts as confidently female
    * @return iterator of lines: package_name, male_ratio, label_type
    */
  def mapDictFun(iter: Iterator[String], lowThreshold: Int, highThreshold: Int, mRatio: Double, fRatio: Double): Iterator[String] =
    iter
      .map(line => MRUtils.SPLITTER.split(line, -1))
      .filter(fields => fields(4) != "0.0" && fields(1).toInt >= lowThreshold)
      .map { fields =>
        // Parsed only when the total reaches highThreshold, as in the short-circuit condition.
        lazy val ratio = fields(4).toDouble
        val tag =
          if (fields(1).toInt >= highThreshold && (ratio >= mRatio || ratio <= fRatio)) "confirm"
          else "calc"
        //  package_name  male_ratio  label_type
        MRUtils.JOINER.join(fields(0), fields(4), tag)
      }

  /**
    * Merges two `(total, countsByTag)` accumulators: totals are added and the per-tag counts
    * are summed key by key.
    *
    * @param red_1 left accumulator
    * @param red_2 right accumulator
    * @return the combined accumulator
    */
  def reduceFun(red_1: (Int, Map[String, Int]), red_2: (Int, Map[String, Int])): (Int, Map[String, Int]) = {
    // Add the left-hand count to every right-hand entry, then let those entries override
    // the left map; keys present only on the left are kept unchanged.
    val summedRight = red_2._2.map { case (tag, n) => tag -> (n + red_1._2.getOrElse(tag, 0)) }
    (red_1._1 + red_2._1, red_1._2 ++ summedRight)
  }
}

/** Command-line entry point: builds the job and hands it the raw arguments. */
object CalcPackageGender {
  def main(args: Array[String]): Unit = {
    val job = new CalcPackageGender()
    // The Int status returned by run is discarded here.
    job.run(args)
  }
}