package mobvista.dmp.datasource.joypac

import java.net.URI
import java.util.Properties

import mobvista.dmp.datasource.retargeting.Constant
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable

class JoypacUserFeatureJob extends Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "job date, yyyyMMdd")
    options.addOption("output", true, "ORC output path for the user features")
    options.addOption("dict_output", true, "output path for the (package_name, id) dictionary")
    options.addOption("coalesce", true, "number of output partitions")
    options
  }

  // Ids of the tracked packages from dmp_app_map; populated in run() before the `has` UDF is evaluated.
  var packageIds: Set[Int] = _

  // @date and @update_date are placeholders substituted in run(); `has` is the UDF registered there.
  val user_feature_sql: String =
    """SELECT LOWER(device_id) device_id, age, gender, interest, concat_ws(',', install) install_apps, frequency
      |  FROM dwh.dm_user_info
      |  WHERE dt = '@date' AND update_date = '@update_date'
      |    AND ((country != '' AND country IS NOT NULL) OR (age != '' AND age IS NOT NULL)
      |      OR (gender != '' AND gender IS NOT NULL) OR (size(interest) != 0 AND interest IS NOT NULL)
      |      OR (size(install) != 0 AND install IS NOT NULL))
      |    AND has(install)
      |""".stripMargin

  private def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val dict_output = commandLine.getOptionValue("dict_output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession
      .builder()
      .appName(s"JoypacUserFeatureJob.$date")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      /*
      val bMap = spark.sparkContext.broadcast(spark.sql(Constant.id_old2new_sql).rdd.map(r => {
        (r.getAs("tag_code").toString, r.getAs("tag_id").toString)
      }).collectAsMap())
      */

      // Package names from the MySQL dictionary; iOS-style "id<digits>" names are normalized to the bare digits.
      val packageNames = jdbcConnection(spark, "dmp", "package_list").rdd.map(r => {
        var packageName = r.getAs("package_name").toString.toLowerCase()
        if (packageName.matches("^id[0-9]+$")) {
          packageName = packageName.replace("id", "")
        }
        packageName
      }).collect().toSet

      // Restrict dmp_app_map to the tracked packages; cached because it is consumed twice below.
      val rdd = Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd
        .filter(r => {
          packageNames.contains(r.getAs("app_package_name").toString.toLowerCase())
        }).cache()

      // Rewrite the (package_name, id) dictionary as a single text file.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(dict_output), true)
      rdd.map(r => {
        (r.getAs("app_package_name").toString, Integer.parseInt(r.getAs("id").toString))
      }).coalesce(1).saveAsTextFile(dict_output)

      packageIds = rdd.map(r => {
        Integer.parseInt(r.getAs("id").toString)
      }).collect().toSet
      println("packageIds.size ==>> " + packageIds.size)

      spark.udf.register("has", has _)

      val update_date = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")
      val sql = user_feature_sql
        .replace("@date", date)
        .replace("@update_date", update_date)

      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
      spark.sql(sql)
        .repartition(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "snappy")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }
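
  /*
   * Illustration only (hypothetical date, not part of the job): for a run with
   * -date 20240115, update_date becomes "2024-01-15" and run() executes
   * user_feature_sql with its placeholders substituted, i.e.
   *
   *   ... WHERE dt = '20240115' AND update_date = '2024-01-15' AND (...) AND has(install)
   *
   * has(install) keeps a row only when the device's install id array
   * intersects packageIds (see the UDF below).
   */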
  // JDBC connection to the dataplatform-app-tag MySQL instance.
  def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val properties = new Properties()
    properties.put("driver", "com.mysql.jdbc.Driver")
    properties.put("user", "apptag_rw")
    properties.put("password", "7gyLEVtkER3u8c9")
    properties.put("characterEncoding", "utf8")
    val url = s"jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/$database"
    spark.read.jdbc(url = url, table = table, properties = properties)
  }

  // UDF body: true when the install list shares at least one id with packageIds.
  // The null guard is needed because Spark passes null through for reference-typed UDF arguments.
  def has(install: mutable.WrappedArray[Int]): Boolean = {
    install != null && install.exists(packageIds.contains)
  }

  /*
  def mapFun(row: Row): DeviceTag = {
    val interest = row.getAs("interest").asInstanceOf[mutable.WrappedArray[String]]
    val interest_set = new mutable.HashSet[Int]()
    interest.foreach(r => {
      if (bMap.value.keySet.contains(r) && StringUtils.isNotBlank(bMap.value(r))) {
        interest_set.add(bMap.value(r).toInt)
      }
    })
    val frequencySet = new mutable.HashSet[struct]()
    val frequency = row.getAs("frequency").toString
    import scala.collection.JavaConversions._
    val json = GsonUtil.String2JsonObject(frequency)
    json.entrySet().foreach(j => {
      if (StringUtils.isNotBlank(j.getKey) && bMap.value.keySet.contains(j.getKey)) {
        frequencySet.add(struct(bMap.value(j.getKey), j.getValue.getAsInt))
        interest_set.add(bMap.value(j.getKey).toInt)
      }
    })
    DeviceTag(row.getAs("device_id"), row.getAs("age"), row.getAs("gender"), row.getAs("install_apps"),
      interest_set.mkString(","), mutable.WrappedArray.make(frequencySet.toArray))
  }
  */
}

object JoypacUserFeatureJob {
  def main(args: Array[String]): Unit = {
    new JoypacUserFeatureJob().run(args)
  }
}
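
/*
 * Usage sketch (illustration only; the jar name, S3 paths and date below are assumptions):
 *
 *   spark-submit \
 *     --class mobvista.dmp.datasource.joypac.JoypacUserFeatureJob \
 *     dmp.jar \
 *     -date 20240115 \
 *     -output s3://mob-emr-test/joypac/user_feature/20240115 \
 *     -dict_output s3://mob-emr-test/joypac/dict/20240115 \
 *     -coalesce 100
 *
 * commons-cli's BasicParser accepts the single-dash options declared in commandOptions().
 */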