// MergeInstallGenderLR.scala (9.04 KB) - committed by wang-jinfeng
package mobvista.dmp.datasource.age_gender

import java.net.URI
import java.util.regex.Pattern

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer


/**
 * Spark job that joins the `ods_gender_tab` view with the `dmp_install_list_v2` view
 * (both built from SQL templates in `Constant`) and produces two outputs:
 *
 *  - the joined rows, written as zlib-compressed ORC to `gender_output`;
 *  - text records for every row whose `label_type` is not "none", written through
 *    `TextMultipleOutputFormat` with keys of the form `<gender_bin_output>/<label_type>, `.
 *
 * All options registered in [[buildOptions]] are read from the command line.
 */
class MergeInstallGenderLR extends CommonSparkJob with Serializable {
  // NOTE(review): not referenced anywhere in this class; candidate for removal.
  private val wellSplit: Pattern = Pattern.compile("#")
  val TAB_DELIMITER = "\t"

  // Separators used in the text records (control characters U+0001 and U+0002).
  // Written as unicode escapes because octal escapes are deprecated / rejected by newer compilers.
  private val itemSeparator = "\u0001"
  private val fieldSeparator = "\u0002"

  /**
   * Parses the command line, recreates the output locations and runs the join/export.
   *
   * @param args raw command-line arguments
   * @return 0 on success, -1 when the mandatory-option check fails
   */
  override protected def run(args: Array[String]): Int = {

    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    }
    printOptions(commandLine)

    val gender_output = commandLine.getOptionValue("gender_output")
    val gender_bin_output = commandLine.getOptionValue("gender_bin_output")
    val org_gender_bin_output = commandLine.getOptionValue("org_gender_bin_output")
    val calc_gender_bin_output = commandLine.getOptionValue("calc_gender_bin_output")
    val date = commandLine.getOptionValue("date")
    val dt_yesterday = commandLine.getOptionValue("dt_yesterday")
    val ga_date = commandLine.getOptionValue("ga_date")
    val other_date = commandLine.getOptionValue("other_date")

    // Parse and validate before anything destructive happens: a malformed value must not
    // be discovered only after the previous outputs have already been deleted.
    val partitions = commandLine.getOptionValue("parallelism").toInt
    require(partitions > 0, s"parallelism must be positive, got $partitions")

    val spark = SparkSession.builder()
      .appName("MergeInstallGenderLR")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.broadcastTimeout", "2400")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    try {
      // Remove previous outputs. Done inside the try so the session is stopped even when
      // a delete fails.
      val fs = FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
      Seq(gender_output, gender_bin_output, org_gender_bin_output, calc_gender_bin_output)
        .foreach(out => fs.delete(new Path(out), true))

      // NOTE(review): presumably available for the SQL templates in Constant; their text is
      // not visible here, so whether they call check_deviceId should be confirmed.
      spark.udf.register("check_deviceId", mobvista.dmp.common.MobvistaConstant.checkDeviceId _)

      val ods_gender_sql = Constant.ods_gender_sql
        .replace("@date", date)
        .replace("@dt_yesterday", dt_yesterday)
      spark.sql(ods_gender_sql).createOrReplaceTempView("ods_gender_tab")

      val install_list_v2_gender = Constant.dmp_install_list_gender_lr_sql
        .replace("@date", date)
        .replace("@ga_date", ga_date)
        .replace("@other_date", other_date)
      spark.sql(install_list_v2_gender).createOrReplaceTempView("dmp_install_list_v2")

      // label_type: 'org' when the label is m/f, 'calc' when packages exist for any other
      // label, otherwise 'none'.
      val sql =
        """select t1.device_id, t1.device_type, t1.label, t2.package_names,
          |  case when t1.label in ('m','f') then 'org'
          |       when t2.package_names is not null and t1.label not in ('m','f') then 'calc'
          |       else 'none' end as label_type
          |from ods_gender_tab t1
          |join dmp_install_list_v2 t2
          |  on (lower(t1.device_id) = lower(t2.device_id) and t1.device_type = t2.device_type)
          |group by t1.device_id, t1.device_type, t1.label, t2.package_names""".stripMargin

      // Persisted because the result feeds both the ORC write and the text export below.
      val dmp_device_gender = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK)

      dmp_device_gender.coalesce(partitions)
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(gender_output)

      dmp_device_gender.filter(line => !"none".equalsIgnoreCase(line.getAs[String]("label_type")))
        .rdd
        .flatMap(buildResult(_, gender_bin_output))
        .coalesce(partitions, shuffle = true)
        .saveAsNewAPIHadoopFile(org_gender_bin_output, classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], sc.hadoopConfiguration)

    } finally {
      // Stopping the session also stops its underlying SparkContext.
      spark.stop()
    }
    0
  }


  /**
   * Converts one joined row into the key/value records handed to the output format.
   *
   * The key is always `<outputPrefix>/<label_type>, `. The value depends on `label_type`:
   *
   *  - "calc" (prediction data): device id, then "#A" for idfa devices or "#B" otherwise,
   *    the package list and a trailing "1", separated by U+0002;
   *  - "org" (training data, labels m / f only): "0", then "1" for label m or "0" otherwise,
   *    the package list and a trailing "1", separated by U+0002.
   *
   * In both cases the "#" separators inside `package_names` are replaced by U+0001.
   *
   * @param row          row carrying device_id, device_type, label, package_names, label_type
   * @param outputPrefix prefix used to build the output key
   * @return a single-element array for "calc"/"org" rows; an empty array when the package
   *         list is null or the label type is anything else, so the record is skipped
   *         instead of failing the task
   */
  def buildResult(row: Row, outputPrefix: String): Array[Tuple2[Text, Text]] = {

    val device_id = row.getAs[String]("device_id")
    val device_type = row.getAs[String]("device_type")
    val label = row.getAs[String]("label")
    val package_names = row.getAs[String]("package_names")
    val label_type = row.getAs[String]("label_type")

    if (package_names == null) {
      // 'org' rows are not guaranteed by the query to carry packages; there is nothing
      // to export for them.
      Array.empty[Tuple2[Text, Text]]
    } else {
      val packages = package_names.replace("#", itemSeparator)
      label_type match {
        case "calc" => // prediction data
          val platformTag = if ("idfa".equalsIgnoreCase(device_type)) "#A" else "#B"
          val value = new StringBuilder
          value ++= device_id
          value ++= platformTag
          value ++= fieldSeparator
          value ++= packages
          value ++= fieldSeparator
          value ++= "1"
          Array(Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(value.toString())))

        case "org" => // training data; only m / f labels reach this branch
          val genderFlag = if ("m".equalsIgnoreCase(label)) "1" else "0"
          val value = new StringBuilder
          value ++= "0"
          value ++= fieldSeparator
          value ++= genderFlag
          value ++= fieldSeparator
          value ++= packages
          value ++= fieldSeparator
          value ++= "1"
          Array(Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(value.toString())))

        case _ =>
          // Unknown or null label type: skip rather than abort with a MatchError.
          Array.empty[Tuple2[Text, Text]]
      }
    }
  }


  /**
   * Registers the command-line options of this job. Every option takes a value and is
   * described as "[must] <name>".
   *
   * @return the populated option set
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    Seq(
      "gender_output",
      "gender_bin_output",
      "org_gender_bin_output",
      "calc_gender_bin_output",
      "date",
      "dt_yesterday",
      "ga_date",
      "other_date",
      "parallelism"
    ).foreach(name => options.addOption(name, true, s"[must] $name"))
    options
  }
}


/**
 * Command-line entry point for the [[MergeInstallGenderLR]] job.
 */
object MergeInstallGenderLR {
  /**
   * Runs the job and propagates a non-zero result as the process exit status, so that a
   * failed option check is visible to whatever launched the job.
   *
   * @param args raw command-line arguments, forwarded unchanged to the job
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new MergeInstallGenderLR().run(args)
    // Exit explicitly only on failure; a successful run terminates normally.
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}