package mobvista.dmp.datasource.retargeting

import java.net.URL

import com.datastax.spark.connector._
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SparkSession

/**
  * @package: mobvista.dmp.datasource.retargeting
  * @author: wangjf
  * @date: 2019/5/23
  * @time: 4:41 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class RetargetingJob extends Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("region", true, "region")
    options
  }

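  /**
    * Builds device_id -> campaign-id-set pairs from the per-campaign device
    * files listed in MongoDB and writes them to the dmp_user_features table
    * in Cassandra.
    */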
  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val region = commandLine.getOptionValue("region")
    val spark = SparkSession
      .builder()
      .appName("RetargetingJob")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", region + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", "16")
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

    try {
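      // Read the campaign -> device-list URL mapping from the ad server's
      // MongoDB, dropping documents whose deviceIdUrl already ends in ".txt".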
      val mongoDB = spark.read.format("com.mongodb.spark.sql")
        .option("uri", "mongodb://internal-beijing-adServerMongo-virginia-1390640500.us-east-1.elb.amazonaws.com/new_adn.campaign_device_id")
        .option("collection", "campaign_device_id")
        .load()
        .rdd
        .filter(r => {
          val deviceIdUrl = r.getAs("deviceIdUrl").toString
          !deviceIdUrl.endsWith(".txt")
        })

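      // Each deviceIdUrl redirects to an S3-backed URL: follow the Location
      // header, drop the query string, and rebuild the path as an s3:// URI
      // (URL.getPath keeps its leading "/", so "s3:/%s" yields "s3://bucket/key").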
      val map = mongoDB.map(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        val campaignId = Integer.parseInt(r.getAs("campaignId").toString)
        val url = new URL(deviceIdUrl)
        val s3_path = String.format("s3:/%s", new URL(url.openConnection.getHeaderField("location").split("\\?")(0)).getPath)
        //  val status = r.getAs("status")
        (campaignId, (s3_path, 1))
      }).collectAsMap()

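      // For each campaign, read its device-id list from S3 and tag every line
      // with the campaign id. Note the status flag is hardcoded to 1 above, so
      // the else branch (tagging with 0) is currently unreachable.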
      val sc = spark.sparkContext
      var rdd = sc.emptyRDD[(String, Int)]
      map.foreach(m => {
        if (m._2._2 == 1) {
          rdd = rdd.union(sc.textFile(m._2._1).map(r => {
            (r, m._1)
          }))
        } else {
          rdd = rdd.union(sc.textFile(m._2._1).map(r => {
            (r, 0)
          }))
        }
      })

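      // Aggregate all campaign ids per device and write one Cassandra row per
      // device, overwriting the existing target_campaign_list collection.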
      val deviceTargets = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        // Map to a tuple rather than spark.sql.Row: tuples have an implicit
        // RowWriterFactory, which saveToCassandra needs to resolve its writer.
        (r._1.toLowerCase, r._2.toSet[Int])
      })
      deviceTargets.saveToCassandra("dmp_realtime_service", "dmp_user_features", SomeColumns("device_id", "target_campaign_list" overwrite))

    } finally {
      spark.stop()
    }
  }
}

object RetargetingJob {
  def main(args: Array[String]): Unit = {
    new RetargetingJob().run(args)
  }
}
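
// Illustrative launch; the jar name, master, and region value below are
// assumptions, not taken from this repo:
//   spark-submit --class mobvista.dmp.datasource.retargeting.RetargetingJob \
//     --master yarn dmp.jar -region virginia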