MergeInstallGenderLR.scala 9.04 KB
package mobvista.dmp.datasource.age_gender

import java.net.URI
import java.util.regex.Pattern

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer


class MergeInstallGenderLR extends CommonSparkJob with Serializable {
  private val wellSplit: Pattern = Pattern.compile("#")
  val TAB_DELIMITER = "\t"

  override protected def run(args: Array[String]): Int = {

    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val gender_output = commandLine.getOptionValue("gender_output")
    val gender_bin_output = commandLine.getOptionValue("gender_bin_output")
    val org_gender_bin_output = commandLine.getOptionValue("org_gender_bin_output")
    val calc_gender_bin_output = commandLine.getOptionValue("calc_gender_bin_output")
    val date = commandLine.getOptionValue("date")
    val dt_yesterday = commandLine.getOptionValue("dt_yesterday")
    val ga_date = commandLine.getOptionValue("ga_date")
    val other_date = commandLine.getOptionValue("other_date")
    val parallelism = commandLine.getOptionValue("parallelism")

    val spark = SparkSession.builder()
      .appName("MergeInstallGenderLR")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.broadcastTimeout", "2400")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(gender_output), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(gender_bin_output), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(org_gender_bin_output), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(calc_gender_bin_output), true)

    try {

      spark.udf.register("check_deviceId", mobvista.dmp.common.MobvistaConstant.checkDeviceId _)

      val ods_gender_sql = Constant.ods_gender_sql.replace("@date", date).replace("@dt_yesterday", dt_yesterday)
      // .replace("@check_deviceId", "check_deviceId(device_id)")

      spark.sql(ods_gender_sql).createOrReplaceTempView("ods_gender_tab")


      val install_list_v2_gender = Constant.dmp_install_list_gender_lr_sql.replace("@date", date)
        .replace("@ga_date", ga_date)
        .replace("@other_date", other_date)
      // .replace("@check_deviceId", "check_deviceId(device_id)")

      spark.sql(install_list_v2_gender).createOrReplaceTempView("dmp_install_list_v2")


      val sql = "select  t1.device_id,t1.device_type,t1.label,t2.package_names,case when t1.label in('m','f') then 'org' when t2.package_names is not null and  t1.label not in('m','f') then 'calc' else 'none' end as label_type " +
        "from ods_gender_tab t1 join dmp_install_list_v2 t2 on(lower(t1.device_id) = lower(t2.device_id) and t1.device_type = t2.device_type)" +
        "group by t1.device_id,t1.device_type,t1.label,t2.package_names"

      val dmp_device_gender = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK)
      dmp_device_gender.coalesce(parallelism.toInt)
        .write.mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(gender_output)


      dmp_device_gender.filter(line => !"none".equalsIgnoreCase(line.getAs[String]("label_type")))
        .rdd
        .flatMap(buildResult(_, gender_bin_output))
        .coalesce(parallelism.toInt, true)
        .saveAsNewAPIHadoopFile(org_gender_bin_output, classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], sc.hadoopConfiguration)




      /* spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK).createOrReplaceTempView("dmp_device_gender")

        val org_sql = "select  device_id,device_type,label,package_names,label_type  from dmp_device_gender"
        val bin_sql = "select  device_id,device_type,label,package_names,label_type  from dmp_device_gender where label_type != 'none' "

        val org_rdd = spark.sql(org_sql).rdd.map(line => {
          val device_id = line.getAs[String]("device_id")
          val device_type = line.getAs[String]("device_type")
          val label = line.getAs[String]("label")
          val package_names = line.getAs[String]("package_names")
          val label_type = line.getAs[String]("label_type")
          Row(device_id,device_type,label,package_names,label_type)
        })

        spark.createDataFrame(org_rdd, Constant.schema_gender_lr)
          .coalesce(parallelism.toInt)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(gender_output)

        spark.sql(bin_sql)
          .rdd
          .flatMap(buildResult(_, gender_bin_output))
          .coalesce(parallelism.toInt, true)
          .saveAsNewAPIHadoopFile(org_gender_bin_output, classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], sc.hadoopConfiguration)
  */


    } finally {
      sc.stop()
      spark.stop()
    }
    0
  }


  def buildResult(row: Row, outputPrefix: String): Array[Tuple2[Text, Text]] = {

    val device_id = row.getAs[String]("device_id")
    val device_type = row.getAs[String]("device_type")
    val label = row.getAs[String]("label")
    val package_names = row.getAs[String]("package_names")
    val label_type = row.getAs[String]("label_type")
    label_type match {
      case "calc" => { //predict数据
        val buf = new StringBuilder
        buf ++= device_id
        if ("idfa".equalsIgnoreCase(device_type)) {
          buf ++= "#A\002"
        } else {
          buf ++= "#B\002"
        }
        buf ++= package_names.replace("#", "\001")
        buf ++= "\0021"
        val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
        buffer += Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(buf.toString()))
        buffer.toArray
        /* device_type match {
           case "idfa" => {
             buf ++= "#A\\002"
             buf ++= package_names.replace("#","\\001")
             buf ++= "\\0021"
             val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
             buffer += Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(buf.toString()))
             buffer.toArray
           }
           case _ =>{
             buf ++= "#B\\002"
             buf ++= package_names.replace("#","\\001")
             buf ++= "\\0021"
             val buffer = new ArrayBuffer[Tuple2[Text, Text]]()
             buffer += Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(buf.toString()))
             buffer.toArray
           }
         }*/
      }
      case "org" => { //train数据  只有m f
        val buf = new StringBuilder
        buf ++= "0\002"
        val bufferArray = new ArrayBuffer[Tuple2[Text, Text]]()
        if ("m".equalsIgnoreCase(label)) {
          buf ++= "1\002"
        } else {
          buf ++= "0\002"
        }
        buf ++= package_names.replace("#", "\001")
        buf ++= "\0021"
        bufferArray += Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(buf.toString()))
        bufferArray.toArray
        /*
        label match {
          case "m" =>{
            buf ++= "1\\002"
            buf ++= package_names.replace("#","\\001")
            buf ++= "\\0021"
            bufferArray += Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(buf.toString()))
            bufferArray.toArray
          }
          case "f" =>{
            buf ++="0\\002"
            buf ++= package_names.replace("#","\\001")
            buf ++= "\\0021"
            bufferArray += Tuple2(new Text(s"${outputPrefix}/${label_type}, "), new Text(buf.toString()))
            bufferArray.toArray
          }
*/
      }
    }
  }


  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("gender_output", true, "[must] gender_output")
    options.addOption("gender_bin_output", true, "[must] gender_bin_output")
    options.addOption("org_gender_bin_output", true, "[must] org_gender_bin_output")
    options.addOption("calc_gender_bin_output", true, "[must] calc_gender_bin_output")
    options.addOption("date", true, "[must] date")
    options.addOption("dt_yesterday", true, "[must] dt_yesterday")
    options.addOption("ga_date", true, "[must] ga_date")
    options.addOption("other_date", true, "[must] other_date")
    options.addOption("parallelism", true, "[must] parallelism")
    options
  }
}


object MergeInstallGenderLR {
  def main(args: Array[String]): Unit = {
    new MergeInstallGenderLR().run(args)
  }
}