package mobvista.dmp.datasource.datatory

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

/**
  * @package: mobvista.dmp.datasource.datatory
  * @author: wangjf
  * @date: 2019/4/3
  * @time: 2:03 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class DatatoryJob extends CommonSparkJob with java.io.Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("coalesce", true, "coalesce")
    options.addOption("json", true, "json")
    options.addOption("tag", true, "tag")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val json = commandLine.getOptionValue("json")
    val tag = commandLine.getOptionValue("tag")

    val spark = SparkSession
      .builder()
      .appName("DatatoryJob")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    try {
      // Broadcast the tag_id -> (new_first_id, new_second_id) mapping produced by Constant.ods2new_sql.
      val code_sql = Constant.ods2new_sql
      val bMap = sc.broadcast(spark.sql(code_sql).rdd.map(r => {
        (r.getAs("tag_id").toString, (r.getAs("new_first_id").toString, r.getAs("new_second_id").toString))
      }).collectAsMap())

      // Use the latest partition of dwh.ods_dmp_user_info_all as the query date.
      val sql =
        """
          |SHOW PARTITIONS dwh.ods_dmp_user_info_all
        """.stripMargin
      val partDF = spark.sql(sql)
      val date = partDF.orderBy(partDF("partition").desc).take(1)(0).getString(0).split("=")(1)

      /*
      spark.udf.register("filterByPackage", Constant.filterByPackage _)
      spark.udf.register("filterByCountry", Constant.filterByCountry _)

      val packageFilterEntity: PackageFilterEntity = Constant.parseJsonString(json)
      val jobId = packageFilterEntity.jobId

      val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
      val sdf2 = new SimpleDateFormat("yyyyMMdd")
      val updateDate = sdf1.format(sdf2.parse(packageFilterEntity.start))

      var sql = Constant.filter_sql.replace("@date", packageFilterEntity.end).replace("@update_date", updateDate)
      if (StringUtils.isNotBlank(packageFilterEntity.countryCode)) {
        sql = sql + s" AND filterByCountry(country,'${packageFilterEntity.countryCode}')"
      }
      if (StringUtils.isNotBlank(packageFilterEntity.packageList)) {
        sql = sql + s" AND filterByPackage(install,'${packageFilterEntity.packageList}')"
      }

      val baseDF = spark.sql(sql).rdd
        .map(r => {
          PackageUserInfoEntity(r.getAs("dev_id"), r.getAs("install"), r.getAs("interest"),
            r.getAs("country"), r.getAs("age"), r.getAs("gender"))
        }).persist(StorageLevel.MEMORY_AND_DISK_SER)
      */

      // Run the tag query; "&@" is used as a space placeholder in the json argument and is restored here.
      val base = Constant.processQuery(date, tag, json.replaceAll("&@", " "), spark)

      /*
      val top: Integer = if (packageFilterEntity.top == null) {
        20
      } else {
        packageFilterEntity.top
      }
      val all = baseDF.count()
      val allDF = sc.parallelize(Seq(Result(jobId, "all", "all", all.toInt)))

      val packageDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "package"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortBy(_._2, false)
        .map(r => {
          Result(jobId, "package", r._1, r._2)
        })
      val packageRDD = sc.parallelize(packageDF.take(top))

      val interestDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "interest"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortByKey()
        .map(r => {
          Result(jobId, "interest", r._1, r._2)
        })

      val countryDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "country"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortBy(_._2, false)
        .map(r => {
          Result(jobId, "country", r._1, r._2)
        })
      val countryRDD = sc.parallelize(countryDF.take(top))

      val ageDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "age"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortByKey()
        .map(r => {
          Result(jobId, "age", r._1, r._2)
        })

      val genderDF = baseDF.mapPartitions(Constant.commonPartition(_, bMap, "gender"))
        .reduceByKey(_ + _)
        .repartition(coalesce.toInt)
        .sortByKey()
        .map(r => {
          Result(jobId, "gender", r._1, r._2)
        })

      import spark.implicits._

      val df = allDF.union(packageRDD).union(interestDF).union(countryRDD).union(ageDF).union(genderDF)
        .toDF
        .repartition(1)
      */

      val df: Dataset[Row] = Constant.processBase(base._1, base._2, base._3, base._4, spark, bMap)

      /*
      val prop = new java.util.Properties
      prop.setProperty("driver", "com.mysql.jdbc.Driver")
      prop.setProperty("user", "apptag_rw")
      prop.setProperty("password", "7gyLEVtkER3u8c9")
      prop.setProperty("characterEncoding", "utf8")

      df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/datatory", "result", prop)
      */

      // Append the aggregated result to the MySQL `result` table.
      Constant.writeMySQL(df, "result", SaveMode.Append)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object DatatoryJob {
  def main(args: Array[String]): Unit = {
    new DatatoryJob().run(args)
  }
}