package mobvista.dmp.datasource.retargeting

import java.net.URL

import com.datastax.spark.connector._
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * @package: mobvista.dmp.datasource.retargeting
  * @author: wangjf
  * @date: 2019/5/23
  * @time: 4:41 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class RetargetingJob extends Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("region", true, "region")
    options
  }

  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val region = commandLine.getOptionValue("region")
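    //  Cassandra connection settings are resolved per region: the host is read from
    //  ip.properties ("<region>_host") and the connection factory class is selected
    //  by the upper-cased region name.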
    val spark = SparkSession
      .builder()
      .appName("RetargetingJob")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", region + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", "16")
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

    try {
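      //  Load the campaign -> device-list mapping from MongoDB and drop entries
      //  whose deviceIdUrl points at a plain .txt file.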
      val mongoDB = spark.read.format("com.mongodb.spark.sql")
        .option("uri", "mongodb://internal-beijing-adServerMongo-virginia-1390640500.us-east-1.elb.amazonaws.com/new_adn.campaign_device_id")
        .option("collection", "campaign_device_id")
        .load()
        .rdd.filter(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        !deviceIdUrl.endsWith(".txt")
      })

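      //  Each deviceIdUrl redirects to S3: follow the redirect via the Location header
      //  and turn the URL path ("/bucket/key") into an "s3://bucket/key" path.
      //  The result is collected to the driver as campaignId -> (s3_path, 1).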
      val map = mongoDB.map(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        val campaignId = Integer.parseInt(r.getAs("campaignId").toString)
        val url = new URL(deviceIdUrl)
        val s3_path = String.format("s3:/%s", new URL(url.openConnection.getHeaderField("location").split("\\?")(0)).getPath)
        //  val status = r.getAs("status")
        (campaignId, (s3_path, 1))
      }).collectAsMap()

      val sc = spark.sparkContext
      var rdd = sc.emptyRDD[(String, Int)]
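      //  For every campaign, read its device-id file from S3 and tag each device id
      //  with the campaign id. The status flag is hard-coded to 1 above, so the else
      //  branch that tags devices with 0 is effectively unused here.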
      map.foreach(m => {
        if (m._2._2 == 1) {
          rdd = rdd.union(sc.textFile(m._2._1).map(r => {
            (r, m._1)
          }))
        } else {
          rdd = rdd.union(sc.textFile(m._2._1).map(r => {
            (r, 0)
          }))
        }
      })

      //  import spark.implicits._
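      //  Group all campaign ids per device id: combineByKey builds an Iterable[Int]
      //  per device, which is then deduplicated with toSet for the Cassandra row.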
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        Row(r._1.toLowerCase, r._2.toSet[Int])
      })
      //  df.createOrReplaceTempView("device_campaign")
      //  val sql = "SELECT * FROM device_campaign"
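      //  "target_campaign_list" overwrite asks the connector to replace the existing
      //  collection column for each device rather than appending to it.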
      df.saveToCassandra("dmp_realtime_service", "dmp_user_features", SomeColumns("device_id", "target_campaign_list" overwrite))

      /*
      val map = new mutable.HashMap[Int, (String, Int)]()
      val jsonArray = GsonUtil.String2JsonArray(json)
      jsonArray.foreach(json => {
        val jsonObject = json.getAsJsonObject
        map.put(jsonObject.get("campaign_id").getAsInt, (jsonObject.get("s3_path").getAsString, jsonObject.get("status").getAsInt))
      })

      var rdd = sc.emptyRDD[(String, Int)]
      map.foreach(m => {
        if (m._2._2 == 1) {
          rdd = rdd.union(sc.textFile(m._2._1).map(r => {
            (r, m._1)
          }))
        } else {
          rdd = rdd.union(sc.textFile(m._2._1).map(r => {
            (r, 0)
          }))
        }
      })
      //  import spark.implicits._
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        DeviceTarget(r._1.toLowerCase, r._2.toSet[Int])
      }).repartition(10)

      df.saveToCassandra("dmp_realtime_service", "dmp_user_features", SomeColumns("device_id", "target_campaign_list" overwrite))
      */
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object RetargetingJob {
  def main(args: Array[String]): Unit = {
    new RetargetingJob().run(args)
  }
}