package mobvista.dmp.datasource.bigmedia_domestic

import java.net.URI

import com.google.gson.JsonObject
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.age_gender.Constant
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
  * author andy.liu on 2019/11/4
  */
class BigMediaDomestic extends CommonSparkJob {

  /**
    * @return the command-line options this job accepts
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("bigmediainput", true, "[must] bigmediainput")
    options.addOption("outputdaily", true, "[must] outputdaily")
    options.addOption("outputgender", true, "[must] outputgender")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("last_sunday", true, "[must] last_sunday")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val bigmediainput = commandLine.getOptionValue("bigmediainput")
    val outputdaily = commandLine.getOptionValue("outputdaily")
    val outputgender = commandLine.getOptionValue("outputgender")
    val coalesce = commandLine.getOptionValue("coalesce")
    val last_sunday = commandLine.getOptionValue("last_sunday")

    val spark = SparkSession.builder()
      .appName("BigMediaDomestic")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove stale output paths up front so the overwrite starts clean.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(outputdaily), true)
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(outputgender), true)

    try {
      // Parse each JSON line into a Row matching Constant.schema_bigmedia_domestic.
      val inputsRDD = spark.sparkContext.textFile(bigmediainput).map(line => {
        val jsonObjLine = GsonUtil.String2JsonObject(line)

        // Return the field's string value, or "" when the key is absent or JSON null.
        // (The original only checked isJsonNull, which NPEs on a missing key.)
        def genValbyName(name: String, jo: JsonObject): String = {
          if (jo.has(name) && !jo.get(name).isJsonNull) {
            jo.get(name).getAsString
          } else {
            ""
          }
        }

        var device_id = ""
        var network = ""
        var uuid = ""
        var event_name = ""
        var event_value = ""
        var timestamp_date = ""
        var package_name = ""
        var gender: String = "NONE"
        var age_min = ""
        var age_max = ""
        var device_type: String = "idfa"
        var platform = "ios"
        try {
          device_id = genValbyName("device_id", jsonObjLine)
          network = genValbyName("network", jsonObjLine)
          uuid = genValbyName("uuid", jsonObjLine)
          event_name = genValbyName("event_name", jsonObjLine)
          event_value = genValbyName("event_value", jsonObjLine)
          timestamp_date = genValbyName("timestamp_date", jsonObjLine)
          package_name = genValbyName("package_name", jsonObjLine)

          // Normalize the upstream gender enum to the single-letter codes used downstream.
          val genders = genValbyName("genders", jsonObjLine)
          if (StringUtils.isNotBlank(genders) && genders.equalsIgnoreCase("GENDER_MALE")) {
            gender = "m"
          } else if (StringUtils.isNotBlank(genders) && genders.equalsIgnoreCase("GENDER_FEMALE")) {
            gender = "f"
          }

          age_min = genValbyName("age_min", jsonObjLine)
          age_max = genValbyName("age_max", jsonObjLine)

          // "os" is a JSON array; take its first entry as the platform when present.
          // Check the element before calling getAsJsonArray, which would otherwise throw.
          val osElem = jsonObjLine.get("os")
          if (osElem != null && osElem.isJsonArray && osElem.getAsJsonArray.size() > 0) {
            platform = osElem.getAsJsonArray.get(0).getAsString.toLowerCase()
          }
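          // Map the platform to its device-id type: IDFA on iOS, IMEI on Android
          // (IMEI rather than GAID, consistent with this job handling domestic traffic).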
          if (platform.equalsIgnoreCase("ios")) {
            device_type = "idfa"
          } else if (platform.equalsIgnoreCase("android")) {
            device_type = "imei"
          }
        } catch {
          case e: Exception => e.printStackTrace()
        }
        Row(device_id, device_type, platform, network, uuid, event_name, event_value,
          timestamp_date, package_name, gender, age_min, age_max)
      })

      spark.createDataFrame(inputsRDD, Constant.schema_bigmedia_domestic)
        .createOrReplaceTempView("ods_bigmedia_domestic_tmp")

      // Join the raw events against the weekly MD5 match table to recover plain
      // device ids, then aggregate one row per (device_id, device_type, platform).
      val sql =
        s"""select /*+ mapjoin(t1) */
           |t2.device_id device_id,
           |device_type,
           |platform,
           |max(network) network,
           |max(uuid) uuid,
           |max(event_name) event_name,
           |max(event_value) event_value,
           |max(timestamp_date) timestamp_date,
           |max(package_name) package_name,
           |max(genders) genders,
           |min(age_min) age_min,
           |max(age_max) age_max,
           |'A' tag,
           |max(genders) label,
           |'bm' business
           |from ods_bigmedia_domestic_tmp t1
           |join (select device_id, device_id_md5 from dwh.device_id_md5_match where dt = '$last_sunday') t2
           |on (t1.device_id = t2.device_id_md5)
           |group by
           |t2.device_id,
           |device_type,
           |platform
         """.stripMargin

      // Both outputs project the same aggregation, so compute it once and cache it.
      val df = spark.sql(sql).coalesce(coalesce.toInt).persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Daily output: the full event/profile columns.
      df.select(
        col("device_id"),
        col("device_type"),
        col("platform"),
        col("network"),
        col("uuid"),
        col("event_name"),
        col("event_value"),
        col("timestamp_date"),
        col("package_name"),
        col("genders"),
        col("age_min"),
        col("age_max")
      ).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputdaily)

      // Gender output: the tag/label/business columns consumed by the age_gender pipeline.
      df.select(
        col("device_id"),
        col("device_type"),
        col("tag"),
        col("label"),
        col("business")
      ).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputgender)
    } finally {
      spark.stop()
    }
    0
  }
}

object BigMediaDomestic {
  def main(args: Array[String]): Unit = {
    new BigMediaDomestic().run(args)
  }
}
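// A minimal submit sketch. The class name and option names come from this file;
// the jar name, bucket paths, date, and parallelism below are illustrative
// assumptions, not values taken from the repository:
//
//   spark-submit \
//     --class mobvista.dmp.datasource.bigmedia_domestic.BigMediaDomestic \
//     dmp.jar \
//     -bigmediainput s3://mob-emr-test/bigmedia/2019/11/04 \
//     -outputdaily s3://mob-emr-test/bigmedia_domestic/daily/2019/11/04 \
//     -outputgender s3://mob-emr-test/bigmedia_domestic/gender/2019/11/04 \
//     -coalesce 20 \
//     -last_sunday 2019-11-03
//
// last_sunday must match a dt partition of dwh.device_id_md5_match, since the
// join that recovers plain device ids reads only that partition.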