// JoypacUserFeatureJob.scala
package mobvista.dmp.datasource.joypac

import java.net.URI
import java.util.Properties

import mobvista.dmp.datasource.retargeting.Constant
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable

/**
  * Spark job that extracts user features (age, gender, interest, installed apps,
  * frequency) from dwh.dm_user_info for devices that have at least one package
  * tracked in the MySQL `package_list` table, and writes them as ORC to S3.
  * A (package_name, id) dictionary is also dumped to `dict_output`.
  */
class JoypacUserFeatureJob extends Serializable {

  /** CLI options accepted by this job. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("dict_output", true, "dict_output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  // Ids (from mob_adn.dmp_app_map) of the packages we care about. Populated in
  // run() before the `has` UDF is evaluated. Initialized to an empty set rather
  // than null so an accidental early call returns false instead of throwing NPE.
  var packageIds: Set[Int] = Set.empty

  // @date / @update_date are substituted in run(); `has` is the UDF registered
  // there that keeps only devices with at least one tracked package installed.
  val user_feature_sql: String =
    s"""SELECT LOWER(device_id) device_id, age, gender, interest, concat_ws(',',install) install_apps, frequency
       |  FROM dwh.dm_user_info
       |  WHERE dt = '@date' AND update_date = '@update_date' AND ((country != '' AND country IS NOT NULL) OR (age != '' AND age IS NOT NULL) OR
       |  (gender != '' AND gender IS NOT NULL) OR (size(interest) != 0 AND interest IS NOT NULL) OR (size(install) != 0
       |  AND install IS NOT NULL)) AND has(install)
    """.stripMargin

  /**
    * UDF body: true when any installed package id is in `packageIds`.
    * `exists` short-circuits on the first hit; the original looped over the
    * whole id set and rebuilt `install.toSet` on every iteration.
    */
  def has(install: mutable.WrappedArray[Int]): Boolean =
    install != null && install.exists(packageIds.contains)

  /**
    * Reads `database`.`table` over JDBC from the app-tag MySQL instance.
    * NOTE(review): credentials are hard-coded in source — they should be moved
    * to a secrets store or job configuration.
    */
  private def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val properties = new Properties()
    properties.put("driver", "com.mysql.jdbc.Driver")
    properties.put("user", "apptag_rw")
    properties.put("password", "7gyLEVtkER3u8c9")
    properties.put("characterEncoding", "utf8")
    val url = s"jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/${database}"
    spark.read.jdbc(url = url, table = table, properties = properties)
  }

  /** Parses CLI args, builds the session, and runs the extraction pipeline. */
  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val commandLine = parser.parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val dict_output = commandLine.getOptionValue("dict_output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .appName(s"JoypacUserFeatureJob.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      // Package names from MySQL, lower-cased. iOS names stored as "id12345"
      // are kept without the "id" prefix to match dmp_app_map's convention.
      // (No cache needed: this RDD is collected exactly once.)
      val packageNames = jdbcConnection(spark, "dmp", "package_list").rdd.map(r => {
        val raw = r.getAs("package_name").toString.toLowerCase()
        if (raw.matches("^id[0-9]+$")) raw.substring(2) else raw
      }).collect().toSet

      // dmp_app_map rows for the tracked packages; reused twice (dictionary
      // dump + id set), hence the cache.
      val rdd = Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd
        .filter(r => packageNames.contains(r.getAs("app_package_name").toString.toLowerCase()))
        .cache()

      // Single FileSystem handle for both output deletions.
      val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration)

      fs.delete(new Path(dict_output), true)
      rdd.map(r => (r.getAs("app_package_name").toString, Integer.parseInt(r.getAs("id").toString)))
        .coalesce(1)
        .saveAsTextFile(dict_output)

      packageIds = rdd.map(r => Integer.parseInt(r.getAs("id").toString)).collect().toSet
      rdd.unpersist(false)

      println("packageIds.size ==>> " + packageIds.size)
      spark.udf.register("has", has _)

      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
      val sql = user_feature_sql.replace("@date", date)
        .replace("@update_date", update_date)

      fs.delete(new Path(output), true)
      spark.sql(sql)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      // getOrCreate never returns null, so stop unconditionally.
      spark.stop()
    }
  }
}

object JoypacUserFeatureJob {
  /** Entry point: builds a job instance and delegates to its run method. */
  def main(args: Array[String]): Unit = {
    val job = new JoypacUserFeatureJob()
    job.run(args)
  }
}