package mobvista.dmp.datasource.retargeting

import java.net.URL

import com.datastax.spark.connector._
import mobvista.dmp.util.{MD5Util, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer

class RetargetingCassandra extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("region", true, "region")
    options
  }

  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val region = commandLine.getOptionValue("region")

    val spark = SparkSession
      .builder()
      .appName(s"RetargetingCassandra.${region.toUpperCase}")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.cassandra.connection.host", PropertyUtil.getProperty("ip.properties", region + "_host"))
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.cassandra.connection.factory", s"mobvista.dmp.utils.cassandra.${region.toUpperCase}Factory")
      .config("spark.cassandra.connection.connections_per_executor_max", "16")
      .config("spark.cassandra.output.concurrent.writes", "2048")
      .config("spark.cassandra.concurrent.reads", "2048")
      .config("spark.cassandra.output.batch.grouping.buffer.size", "2048")
      .config("spark.cassandra.connection.keep_alive_ms", "600000")
      .enableHiveSupport()
      .getOrCreate()

    try {
      /* Disabled: earlier variant that pushed per-device features from Hive into Cassandra.
      val sdf = new SimpleDateFormat("yyyyMMdd")
      val update_date = DateUtil.format(sdf.parse(date), "yyyy-MM-dd")
      var sql = Constant.user_feature_sql.replace("@date", date).replace("@update_date", update_date)
      if (region.toUpperCase.equals("CN")) {
        sql = sql + " AND UPPER(country) = 'CN'"
      }
      val rdd = spark.sql(sql)
      val keyspace = "dmp_realtime_service"
      val table = "dmp_user_features"
      val columns = SomeColumns("device_id", "age", "gender", "interest", "install_apps", "frequency" overwrite)
      rdd.rdd.saveToCassandra(keyspace, table, columns)
      */

      // Load the campaign -> device-id-file mappings from MongoDB, skipping
      // documents whose deviceIdUrl points at a plain .txt file.
      val mongoDB = spark.read.format("com.mongodb.spark.sql")
        .option("uri", "mongodb://internal-beijing-adServerMongo-virginia-1390640500.us-east-1.elb.amazonaws.com/new_adn.campaign_device_id")
        .option("collection", "campaign_device_id")
        .load()
        .rdd
        .filter(r => {
          val deviceIdUrl = r.getAs("deviceIdUrl").toString
          !deviceIdUrl.endsWith(".txt")
        })

      // Resolve each deviceIdUrl to the underlying S3 object: follow the HTTP
      // redirect, strip the query string from the "location" header, and prefix
      // the URL path (which already starts with "/") with "s3:/" so the result
      // reads "s3://bucket/key".
      val map = mongoDB.map(r => {
        val deviceIdUrl = r.getAs("deviceIdUrl").toString
        val campaignId = Integer.parseInt(r.getAs("campaignId").toString)
        val url = new URL(deviceIdUrl)
        val s3_path = String.format("s3:/%s", new URL(url.openConnection.getHeaderField("location").split("\\?")(0)).getPath)
        // val status = r.getAs("status")
        (campaignId, (s3_path, 1))
      }).collectAsMap()

      // Union every campaign's device-id file into one (device_id, campaignId)
      // RDD; a flag of 1 tags each line with its campaign id, anything else
      // falls back to campaign id 0.
      val sc = spark.sparkContext
      var rdd = sc.emptyRDD[(String, Int)]
      map.foreach(m => {
        rdd = if (m._2._2 == 1) {
          rdd.union(sc.textFile(m._2._1).map(r => (r, m._1)))
        } else {
          rdd.union(sc.textFile(m._2._1).map(r => (r, 0)))
        }
        /* Disabled variant: hash raw ids to MD5 while reading each file.
        rdd = if (m._2._2 == 1) {
          rdd.union(sc.textFile(m._2._1).map(r => {
            val array = new ArrayBuffer[(String, Int)]()
            if (StringUtils.isNotBlank(r) && (r.length == 32 || r.length == 31 || r.length == 30)) {
              array += ((r, m._1))
            } else {
              array += ((MD5Util.getMD5Str(r.toLowerCase()), m._1))
              array += ((MD5Util.getMD5Str(r.toUpperCase()), m._1))
            }
            array
          }).flatMap(l => l))
        } else {
          rdd.union(sc.textFile(m._2._1).map(r => {
            val array = new ArrayBuffer[(String, Int)]()
            if (StringUtils.isNotBlank(r) && (r.length == 32 || r.length == 31 || r.length == 30)) {
              array += ((r, 0))
            } else {
              array += ((MD5Util.getMD5Str(r.toLowerCase()), 0))
              array += ((MD5Util.getMD5Str(r.toUpperCase()), 0))
            }
            array
          }).flatMap(l => l))
        }
        */
      })

      // import spark.implicits._

      // Group campaign ids per device. Ids that already look like an MD5 digest
      // (30-32 characters) are kept as-is; raw ids are hashed in both lower and
      // upper case so either normalisation can be matched downstream.
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        val array = new ArrayBuffer[DeviceTarget]()
        if (r._1.length == 32 || r._1.length == 31 || r._1.length == 30) {
          array += DeviceTarget(r._1, r._2.toSet[Int])
        } else {
          array += DeviceTarget(MD5Util.getMD5Str(r._1.toLowerCase()), r._2.toSet[Int])
          array += DeviceTarget(MD5Util.getMD5Str(r._1.toUpperCase()), r._2.toSet[Int])
        }
        array
        /* Row(r._1, r._2.toSet[Int]) */
      }).flatMap(l => l)
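
      // A minimal sketch of the target table (an assumption for illustration;
      // the real schema is managed outside this file). The "overwrite" modifier
      // below makes the connector replace the stored set instead of appending
      // to it:
      //
      //   CREATE TABLE dmp_realtime_service.dmp_user_retarget (
      //     device_id            text PRIMARY KEY,
      //     target_campaign_list set<int>
      //   );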
      df.saveToCassandra("dmp_realtime_service", "dmp_user_retarget",
        SomeColumns("device_id", "target_campaign_list" overwrite))

      /* Disabled variant: the same write routed through a temporary Spark SQL view.
      val df = rdd.combineByKey(
        (v: Int) => Iterable(v),
        (c: Iterable[Int], v: Int) => c ++ Seq(v),
        (c1: Iterable[Int], c2: Iterable[Int]) => c1 ++ c2
      ).map(r => {
        DeviceTarget(r._1.toLowerCase, r._2.toSet[Int])
      }).toDF
      df.createOrReplaceTempView("device_campaign")
      val sql = "SELECT * FROM device_campaign"
      spark.sql(sql).rdd.saveToCassandra("dmp_realtime_service", "dmp_user_retarget",
        SomeColumns("device_id", "target_campaign_list" overwrite))
      */
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
}

object RetargetingCassandra {
  def main(args: Array[String]): Unit = {
    new RetargetingCassandra().run(args)
  }
}
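
// "DeviceTarget" is referenced above but defined elsewhere in this package.
// A minimal sketch matching the columns written to Cassandra (an assumption,
// left commented out so it cannot clash with the real definition):
//
//   case class DeviceTarget(device_id: String, target_campaign_list: Set[Int])
//
// Illustrative launch command (master settings, jar name, and region value are
// placeholders, not taken from this repo):
//
//   spark-submit \
//     --class mobvista.dmp.datasource.retargeting.RetargetingCassandra \
//     --master yarn --deploy-mode cluster \
//     dmp.jar -region us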