package mobvista.dmp.datasource.retargeting

import java.net.URL

import com.datastax.spark.connector._
import mobvista.dmp.util.{MD5Util, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer

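/**
 * Retargeting export job: reads the campaign -> device-id-file mapping from
 * MongoDB, pulls the referenced device-id lists from S3, aggregates the
 * campaign ids per device, and overwrites the result into the Cassandra
 * table dmp_realtime_service.dmp_user_retarget.
 */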
class RetargetingCassandra extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("region", true, "region")
    options
  }

  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val region = commandLine.getOptionValue("region")

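    // The "<region>_host" key in ip.properties must resolve to the regional
    // Cassandra contact point; the remaining settings tune connector
    // concurrency (writes/reads), batch buffering, and connection keep-alive.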
    val spark = SparkSession
      .builder()
      .appName(s"RetargetingCassandra.${region.toUpperCase}")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", region + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", "16")
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

    try {

      /*
      val sdf = new SimpleDateFormat("yyyyMMdd")

      val update_date = DateUtil.format(sdf.parse(date), "yyyy-MM-dd")
      var sql = Constant.user_feature_sql.replace("@date", date).replace("@update_date", update_date)

      if (region.toUpperCase.equals("CN")) {
        sql = sql + " AND UPPER(country) = 'CN'"
      }

      val rdd = spark.sql(sql)

      val keyspace = "dmp_realtime_service"
      val table = "dmp_user_features"
      val columns = SomeColumns("device_id", "age", "gender", "interest", "install_apps", "frequency" overwrite)
      rdd.rdd.saveToCassandra(keyspace, table, columns)
      */

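      // Load the campaign_device_id collection and drop rows whose
      // deviceIdUrl ends with ".txt" (presumably plain-text placeholders
      // rather than downloadable device-id lists).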
      val mongoDB = spark.read.format("com.mongodb.spark.sql")
        .option("uri", "mongodb://internal-beijing-adServerMongo-virginia-1390640500.us-east-1.elb.amazonaws.com/new_adn.campaign_device_id")
        .option("collection", "campaign_device_id")
        .load()
        .rdd
        .filter(r => {
          val deviceIdUrl = r.getAs("deviceIdUrl").toString
          !deviceIdUrl.endsWith(".txt")
        })

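      // Resolve each deviceIdUrl on the executors: the first GET is expected
      // to answer with a redirect whose Location header points at the S3
      // object. Stripping the query string and prefixing "s3:/" to getPath
      // (which starts with "/") yields an "s3://bucket/key" path. The
      // literal 1 is a flag consumed by the union loop below; the small map
      // is collected to the driver so textFile() can be called per entry.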
      val map = mongoDB.map(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        val campaignId = Integer.parseInt(r.getAs("campaignId").toString)
        val url = new URL(deviceIdUrl)
        val location = url.openConnection.getHeaderField("location")
        val s3_path = String.format("s3:/%s", new URL(location.split("\\?")(0)).getPath)
        //  val status = r.getAs("status")
        (campaignId, (s3_path, 1))
      }).collectAsMap()

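      // Union one textFile RDD per campaign: each device id read from S3 is
      // paired with its campaign id when the flag is 1 (as written above it
      // always is), otherwise with the placeholder campaign 0.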
      val sc = spark.sparkContext
      var rdd = sc.emptyRDD[(String, Int)]
      map.foreach(m => {
        rdd = if (m._2._2 == 1) {
          rdd.union(sc.textFile(m._2._1).map(r => {
            (r, m._1)
          }))
        } else {
          rdd.union(sc.textFile(m._2._1).map(r => {
            (r, 0)
          }))
        }

        /*
        rdd = if (m._2._2 == 1) {
          rdd.union(sc.textFile(m._2._1).map(r => {
            val array = new ArrayBuffer[(String, Int)]()
            if (StringUtils.isNotBlank(r) && (r.length == 32 || r.length == 31 || r.length == 30)) {
              array += ((r, m._1))
            } else {
              array += ((MD5Util.getMD5Str(r.toLowerCase()), m._1))
              array += ((MD5Util.getMD5Str(r.toUpperCase()), m._1))
            }
            array
          }).flatMap(l => l))
        } else {
          rdd.union(sc.textFile(m._2._1).map(r => {
            val array = new ArrayBuffer[(String, Int)]()
            if (StringUtils.isNotBlank(r) && (r.length == 32 || r.length == 31 || r.length == 30)) {
              array += ((r, 0))
            } else {
              array += ((MD5Util.getMD5Str(r.toLowerCase()), 0))
              array += ((MD5Util.getMD5Str(r.toUpperCase()), 0))
            }
            array
          }).flatMap(l => l))
        }
        */
      })

      //  import spark.implicits._
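      // Group all campaign ids per device id, then emit DeviceTarget rows.
      // Ids that already look like MD5 hashes (30-32 characters) are kept
      // as-is; anything else is emitted twice, MD5-hashed in both lower and
      // upper case, so lookups by either casing hit the table.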
      val targets = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        val array = new ArrayBuffer[DeviceTarget]()
        if (r._1.length == 32 || r._1.length == 31 || r._1.length == 30) {
          array += DeviceTarget(r._1, r._2.toSet[Int])
        } else {
          array += DeviceTarget(MD5Util.getMD5Str(r._1.toLowerCase()), r._2.toSet[Int])
          array += DeviceTarget(MD5Util.getMD5Str(r._1.toUpperCase()), r._2.toSet[Int])
        }
        array
        /*
        Row(r._1, r._2.toSet[Int])
        */
      }).flatMap(l => l)

      targets.saveToCassandra("dmp_realtime_service", "dmp_user_retarget", SomeColumns("device_id", "target_campaign_list" overwrite))
      /*
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        DeviceTarget(r._1.toLowerCase, r._2.toSet[Int])
      }).toDF
      df.createOrReplaceTempView("device_campaign")
      val sql = "SELECT * FROM device_campaign"
      spark.sql(sql).rdd.saveToCassandra("dmp_realtime_service", "dmp_user_retarget", SomeColumns("device_id", "target_campaign_list" overwrite))
      */
    } finally {
      // getOrCreate() never returns null, so stop unconditionally.
      spark.stop()
    }
  }
}

object RetargetingCassandra {
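  // Illustrative submission (jar path and cluster settings are assumptions);
  // the -region option is required, since its value is read unconditionally:
  //   spark-submit --class mobvista.dmp.datasource.retargeting.RetargetingCassandra \
  //     dmp.jar -region cn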
  def main(args: Array[String]): Unit = {
    new RetargetingCassandra().run(args)
  }
}