package mobvista.dmp.datasource.age_gender

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * Spark batch job that consolidates third-party gender data for one day.
 *
 * Reads `dwh.etl_gender_thirdparty_data_total` for partition `dt = yesterday`,
 * normalizes gender values (`male` -> `m`, `female` -> `f`), de-duplicates by
 * (device_id, device_type), and writes two zlib-compressed ORC outputs:
 *   - `outputtotal`:  the de-duplicated total snapshot
 *   - `outputgender`: rows re-shaped to `Constant.schema_age_gender`
 *     (device_id, "A", gender, "tp", device_type)
 *
 * Required CLI options: outputtotal, outputgender, coalesce, yesterday.
 */
class ThirdPartySourceTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("outputgender", true, "[must] outputgender")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("yesterday", true, "[must] yesterday")
    options
  }

  /**
   * Runs the job.
   *
   * @param args command-line arguments parsed against [[buildOptions]]
   * @return 0 on success, -1 when a required option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val outputtotal = commandLine.getOptionValue("outputtotal")
    val outputgender = commandLine.getOptionValue("outputgender")
    val coalesce = commandLine.getOptionValue("coalesce")
    val yesterday = commandLine.getOptionValue("yesterday")

    val spark = SparkSession.builder()
      .appName("ThirdPartySourceTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // FIX: build the FileSystem handle once instead of twice for the same URI,
    // then clear both output paths so Overwrite writes start clean.
    val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputtotal), true)
    fs.delete(new Path(outputgender), true)

    try {
      // Every line now carries the stripMargin prefix (the original mixed
      // prefixed and unprefixed lines); the SQL text is semantically unchanged.
      val sql1 =
        s"""
           |SELECT device_id,device_type,min(platform) platform,
           |       min(case when gender = 'male' then 'm'
           |                when gender = 'female' then 'f'
           |                else gender end) as gender
           |from dwh.etl_gender_thirdparty_data_total
           |where dt = '${yesterday}'
           |group by device_id,device_type
         """.stripMargin

      // Cached because the result feeds two independent writes below.
      val etl_gender_tp_data = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK)

      try {
        // Output 1: the de-duplicated daily total snapshot.
        etl_gender_tp_data
          .coalesce(coalesce.toInt)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputtotal)

        // Output 2: rows reshaped to the shared age/gender schema.
        // "A" = age placeholder, "tp" = third-party source tag
        // (NOTE(review): inferred from the column positions in
        // Constant.schema_age_gender — confirm against that definition).
        val gender_rdd = etl_gender_tp_data
          .rdd
          .map(line => {
            val device_id = line.getAs[String]("device_id")
            val device_type = line.getAs[String]("device_type")
            val gender = line.getAs[String]("gender")
            Row(device_id, "A", gender, "tp", device_type)
          })

        spark.createDataFrame(gender_rdd, Constant.schema_age_gender)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputgender)
      } finally {
        // FIX: release the cached blocks; the original persisted but never
        // unpersisted (harmless only because spark.stop() follows, but this
        // makes the lifetime explicit and safe under future refactors).
        etl_gender_tp_data.unpersist()
      }
    } finally {
      spark.stop()
    }
    0
  }
}

object ThirdPartySourceTotal {
  def main(args: Array[String]): Unit = {
    // FIX: propagate the job's status; the original discarded run()'s return
    // value, so a -1 (missing option) still exited the JVM with status 0.
    System.exit(new ThirdPartySourceTotal().run(args))
  }
}