package mobvista.dmp.datasource.age_gender

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

/**
  * Spark job that unions the gender data sets (dsp, ga, fb, tp), exposes them as the
  * temporary view `t_gender`, builds the temporary view `t_install` from
  * `Constant.dmp_install_list_sql_14days`, and writes the result of
  * `Constant.dmp_install_list_join_gender_sql` to `gender_output` as zlib-compressed ORC.
  *
  * @package: mobvista.dmp.datasource.age_gender
  * @author: wangjf
  * @create: 2018-09-10 16:46
  */
class MergeInstallGender extends CommonSparkJob with Serializable {

  /**
    * Parses the command line and runs the merge.
    *
    * @param args raw command-line arguments (see [[buildOptions]])
    * @return `0` on success, `-1` when the mandatory-option check fails
    * @throws NumberFormatException when `parallelism` is not a valid integer; this is raised
    *                               before any output is deleted or any Spark session is created
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val dsp_gender_path = commandLine.getOptionValue("dsp_gender_path")
      val ga_gender_path = commandLine.getOptionValue("ga_gender_path")
      val fb_gender_path = commandLine.getOptionValue("fb_gender_path")
      val tp_gender_path = commandLine.getOptionValue("tp_gender_path")
      val gender_output = commandLine.getOptionValue("gender_output")
      val date = commandLine.getOptionValue("date")
      // val ga_date = commandLine.getOptionValue("ga_date")

      // Parse eagerly: a malformed value must fail before the previous output is deleted.
      val parallelism = commandLine.getOptionValue("parallelism").toInt

      mergeGender(
        Seq(dsp_gender_path, ga_gender_path, fb_gender_path, tp_gender_path),
        gender_output,
        date,
        parallelism
      )
    }
  }

  /**
    * Deletes any previous output, then unions the gender inputs, joins them with the install
    * list and writes the result.
    *
    * @param genderPaths  ORC input paths, unioned in the given order; must be non-empty
    * @param genderOutput destination path of the ORC output
    * @param date         value substituted for the `@date` placeholder of the install-list SQL
    * @param parallelism  number of partitions of the written data set
    * @return always `0`; failures surface as exceptions
    */
  private def mergeGender(genderPaths: Seq[String],
                          genderOutput: String,
                          date: String,
                          parallelism: Int): Int = {
    val spark = SparkSession.builder()
      .appName("MergeInstallGender")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      val hadoopConf = spark.sparkContext.hadoopConfiguration
      val outputPath = new Path(genderOutput)
      // Resolve the file system from the output path itself so that outputs outside the
      // default bucket can be deleted; scheme-less paths keep resolving against the legacy
      // bucket, as before.
      val outputFs = Option(outputPath.toUri.getScheme) match {
        case Some(_) => outputPath.getFileSystem(hadoopConf)
        case None => FileSystem.get(new URI("s3://mob-emr-test"), hadoopConf)
      }
      // Performed inside the try block so the session is stopped even when the delete fails.
      outputFs.delete(outputPath, true)

      genderPaths
        .map(path => spark.read.schema(Constant.schema_age_gender).orc(path))
        .reduce(_ union _)
        .createOrReplaceTempView("t_gender")

      spark.udf.register("pkg_keys", Logic.pkg_keys _)
      // spark.udf.register("split_keys", Logic.split_keys _)

      // val update_date = DateUtil.format(DateUtil.getDay(date, "yyyyMMdd", -91), "yyyy-MM-dd")
      val sql = Constant.dmp_install_list_sql_14days.replace("@date", date)
      // .replace("@ga_date", ga_date)
      // .replace("@update_date", update_date)

      spark.sql(sql).createOrReplaceTempView("t_install")

      spark.sql(Constant.dmp_install_list_join_gender_sql)
        .repartition(parallelism)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(genderOutput)

      /*
      Legacy RDD-based implementation, kept for reference:

      val install = spark.sql(sql)
        .rdd
        .mapPartitions(buildInstall)
        .combineByKey(
          (v: String) => Iterable(v),
          (c: Iterable[String], v: String) => c ++ Seq(v),
          (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
        ).mapPartitions(Logic.mergeInstallPart)

      val age_df = install.union(dsp_gender).union(ga_gender).union(fb_gender).union(tp_gender)
        .combineByKey(
          (v: String) => Iterable(v),
          (c: Iterable[String], v: String) => c ++ Seq(v),
          (c1: Iterable[String], c2: Iterable[String]) => c1 ++ c2
        ) mapPartitions (Logic.mergeGenderPart)

      spark.createDataFrame(age_df.coalesce(numPartitions = parallelism.toInt, shuffle = true), Constant.merge_schema)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(gender_output)
      */
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext.
      spark.stop()
    }
    0
  }

  /**
    * Maps each row to a pair of joined strings: (`device_id`, `device_type`) as the key and
    * (`tag`, `label`, `business`) as the value, both joined with `MRUtils.JOINER`.
    * Only referenced by the legacy RDD pipeline kept in comments inside the job body.
    *
    * @param rows rows exposing the columns `device_id`, `device_type`, `tag`, `label`, `business`
    * @return one key/value pair per input row
    */
  def buildAgeGender(rows: Iterator[Row]): Iterator[(String, String)] = {
    rows.map { row =>
      val key = MRUtils.JOINER.join(row.getAs[AnyRef]("device_id"), row.getAs[AnyRef]("device_type"))
      val value = MRUtils.JOINER.join(
        row.getAs[AnyRef]("tag"),
        row.getAs[AnyRef]("label"),
        row.getAs[AnyRef]("business")
      )
      (key, value)
    }
  }

  /**
    * Maps each row to a pair of joined strings: (`device_id`, `device_type`) as the key and
    * (`package_names`, `new_date`) as the value, both joined with `MRUtils.JOINER`.
    * Only referenced by the legacy RDD pipeline kept in comments inside the job body.
    *
    * @param rows rows exposing the columns `device_id`, `device_type`, `package_names`, `new_date`
    * @return one key/value pair per input row
    */
  def buildInstall(rows: Iterator[Row]): Iterator[(String, String)] = {
    rows.map { row =>
      val key = MRUtils.JOINER.join(row.getAs[AnyRef]("device_id"), row.getAs[AnyRef]("device_type"))
      val value = MRUtils.JOINER.join(row.getAs[AnyRef]("package_names"), row.getAs[AnyRef]("new_date"))
      (key, value)
    }
  }

  /**
    * Declares the command-line options understood by this job; every option takes a value.
    *
    * @return the option set used to parse the arguments passed to `run`
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dsp_gender_path", true, "[must] dsp_gender_path")
    options.addOption("ga_gender_path", true, "[must] ga_gender_path")
    options.addOption("fb_gender_path", true, "[must] fb_gender_path")
    options.addOption("tp_gender_path", true, "[must] tp_gender_path")
    options.addOption("gender_output", true, "[must] gender_output")
    options.addOption("date", true, "[must] date")
    // options.addOption("ga_date", true, "[must] ga_date")
    options.addOption("parallelism", true, "[must] parallelism")
    options
  }
}

object MergeInstallGender {

  /**
    * Job entry point. A non-zero status returned by `run` is propagated as the process exit
    * code so that schedulers can detect the failure.
    *
    * @param args raw command-line arguments
    */
  def main(args: Array[String]): Unit = {
    val exitCode = new MergeInstallGender().run(args)
    if (exitCode != 0) sys.exit(exitCode)
  }
}