package mobvista.dmp.datasource.retargeting

import java.net.{HttpURLConnection, URL}

import com.datastax.spark.connector._
import mobvista.dmp.util.PropertyUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

/**
  * @package: mobvista.dmp.datasource.retargeting
  * @author: wangjf
  * @date: 2019/5/23
  * @time: 4:41 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */

/**
  * Spark job that
  *
  *  1. reads `campaignId` / `deviceIdUrl` pairs from the MongoDB collection
  *     `campaign_device_id`,
  *  1. resolves every `deviceIdUrl` (via the `location` header of its HTTP response)
  *     to an `s3://` path,
  *  1. reads each of those files as text, one device id per line, and
  *  1. writes, per device id, the set of campaign ids it appears in to the Cassandra
  *     table `dmp_realtime_service.dmp_user_features`
  *     (columns `device_id`, `target_campaign_list`).
  *
  * The class is `Serializable` because Spark closures are created inside its methods.
  */
class RetargetingJob extends Serializable {

  /**
    * Command line options understood by the job.
    *
    * @return an [[org.apache.commons.cli.Options]] holding the single `region` option
    *         (which takes an argument)
    */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("region", true, "region")
    options
  }

  /**
    * Parses the arguments, runs the whole pipeline and always stops the Spark session.
    *
    * @param args raw command line arguments; must contain `-region <region>`
    * @throws IllegalArgumentException if the `region` option is missing or empty
    */
  private def run(args: Array[String]): Unit = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // getOptionValue returns null when the option is absent; fail with a clear message
    // instead of an NPE on `region.toUpperCase` / a lookup of the "null_host" property.
    val region = Option(commandLine.getOptionValue("region"))
      .filter(_.nonEmpty)
      .getOrElse(throw new IllegalArgumentException("Missing required option: -region <region>"))

    val spark = buildSession(region)
    try {
      val sc = spark.sparkContext
      val campaignPaths = loadCampaignPaths(spark)

      // One RDD of (deviceId, campaignId) per campaign file.
      // Every campaign is treated as active: the Mongo `status` field is not consulted.
      val perCampaign: Seq[RDD[(String, Int)]] = campaignPaths.toSeq.map {
        case (campaignId, s3Path) =>
          sc.textFile(s3Path)
            // Normalise BEFORE keying so that case variants of one device id are merged
            // into a single row instead of overwriting each other in Cassandra.
            .map(_.trim.toLowerCase)
            // Blank lines would otherwise become an empty device_id key.
            .filter(_.nonEmpty)
            .map(deviceId => (deviceId, campaignId))
      }

      // A single flat union keeps the lineage shallow; chaining rdd.union(...) once per
      // campaign nests one UnionRDD inside another for every file.
      val deviceCampaigns: RDD[(String, Int)] =
        if (perCampaign.isEmpty) sc.emptyRDD[(String, Int)] else sc.union(perCampaign)

      deviceCampaigns
        .aggregateByKey(Set.empty[Int])(_ + _, _ ++ _)
        .map { case (deviceId, campaignIds) => Row(deviceId, campaignIds) }
        .saveToCassandra(
          RetargetingJob.Keyspace,
          RetargetingJob.Table,
          SomeColumns("device_id", "target_campaign_list".overwrite))
    } finally {
      spark.stop()
    }
  }

  /**
    * Builds the Hive-enabled Spark session with the Cassandra connector settings for `region`.
    *
    * @param region region name; used as the `<region>_host` key in `ip.properties` and,
    *               upper-cased, as the prefix of the Cassandra connection factory class name
    */
  private def buildSession(region: String): SparkSession =
    SparkSession
      .builder()
      .appName("RetargetingJob")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", region + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", "16")
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

  /**
    * Loads the campaign documents from MongoDB and resolves each one to an S3 path.
    *
    * Documents whose `deviceIdUrl` is null or ends with `.txt` are skipped. When several
    * documents share a campaign id only one of them is kept (map semantics).
    *
    * @return campaign id -> `s3://` path of the file listing that campaign's device ids
    */
  private def loadCampaignPaths(spark: SparkSession): scala.collection.Map[Int, String] =
    spark.read
      .format("com.mongodb.spark.sql")
      .option("uri", RetargetingJob.MongoUri)
      .option("collection", RetargetingJob.MongoCollection)
      .load()
      .rdd
      .flatMap { row =>
        // The explicit type argument matters: a bare getAs("...") infers T = Nothing.
        Option(row.getAs[Any]("deviceIdUrl"))
          .map(_.toString)
          .filterNot(_.endsWith(".txt"))
          .map { deviceIdUrl =>
            val campaignId = Integer.parseInt(row.getAs[Any]("campaignId").toString)
            campaignId -> RetargetingJob.resolveS3Path(deviceIdUrl)
          }
      }
      .collectAsMap()
}

object RetargetingJob {

  private val MongoUri =
    "mongodb://internal-beijing-adServerMongo-virginia-1390640500.us-east-1.elb.amazonaws.com/new_adn.campaign_device_id"
  private val MongoCollection = "campaign_device_id"
  private val Keyspace = "dmp_realtime_service"
  private val Table = "dmp_user_features"

  /**
    * Requests `deviceIdUrl` and converts the URL found in the response's `location`
    * header into an `s3://` path.
    *
    * @param deviceIdUrl URL stored in the campaign document
    * @return `s3:/` followed by the path component of the `location` URL (query string dropped)
    * @throws IllegalStateException if the response carries no `location` header
    */
  private def resolveS3Path(deviceIdUrl: String): String = {
    val connection = new URL(deviceIdUrl).openConnection
    try {
      val location = Option(connection.getHeaderField("location")).getOrElse(
        throw new IllegalStateException(s"No 'location' header returned for device id URL: $deviceIdUrl"))
      toS3Path(location)
    } finally {
      // Release the underlying socket; only HTTP(S) connections expose disconnect().
      connection match {
        case http: HttpURLConnection => http.disconnect()
        case _ => ()
      }
    }
  }

  /**
    * Drops everything from the first `?` on and prefixes the URL's path with `s3:/`.
    * The path itself starts with `/`, so the result has the form `s3://...`.
    */
  private def toS3Path(location: String): String = {
    val withoutQuery = location.split("\\?")(0)
    String.format("s3:/%s", new URL(withoutQuery).getPath)
  }

  /** Entry point: `RetargetingJob -region <region>`. */
  def main(args: Array[String]): Unit = {
    new RetargetingJob().run(args)
  }
}