package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * Spark job that tags devices by how many "age" packages they have installed.
 *
 * Reads a plain-text list of package names (one per line) from `Age_Package_Names`,
 * counts — per device — how many of those packages appear in
 * `dwh.dm_install_list_v2` for the given partition (`dt_today`) with
 * `update_date >= update`, and writes two tab-separated text outputs:
 *
 *  - output01: devices with >=2 / >=3 matching installs, tagged with the
 *    platform-specific marker package ids (e.g. `["com.age.2"]`, `["id2021032203"]`).
 *  - output02: devices with >=2 matching installs, country column fixed to `UNKNOWN`.
 */
class AgePackageNames extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("update", true, "[must] update")
    options.addOption("Age_Package_Names", true, "[must] Age_Package_Names")
    options
  }

  /**
   * Selects rows for one platform/threshold and formats them as a single
   * tab-joined column: device_id, device_type, platform, tag.
   * (The `installnum >= min` filter re-checks what the SQL already enforced
   * for min == 2; it is kept so each call site states its own threshold.)
   */
  private def taggedRows(df: DataFrame, minInstalls: Int, platform: String, tag: String): DataFrame = {
    df.filter(df.col("installnum") >= minInstalls && df.col("platform").equalTo(platform))
      .select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit(platform), lit(tag)))
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val dt_today = commandLine.getOptionValue("dt_today")
    val update = commandLine.getOptionValue("update")
    val Age_Package_Names = commandLine.getOptionValue("Age_Package_Names")

    val spark = SparkSession.builder()
      .appName("AgePackageNames")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    try {
      // Clear previous outputs. Done inside `try` so a failure here still
      // stops the SparkSession. One FileSystem handle is enough — both
      // outputs live under the same s3 bucket.
      val fs = FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration)
      fs.delete(new Path(output01), true)
      fs.delete(new Path(output02), true)

      // Package whitelist: one package name per line -> single-column view.
      val age_package_names_rdd = sc.textFile(Age_Package_Names)
      val age_package_names_df: DataFrame = age_package_names_rdd.toDF("pkg")
      age_package_names_df.createOrReplaceTempView("agg_package_names_tmp")

      // Per-device count of whitelisted installs in the target partition.
      // NOTE(review): dt_today/update are interpolated straight into HiveQL;
      // they come from job arguments (trusted operator input), not end users.
      val sql1 =
        s"""
           |select
           |  device_id,device_type,platform,installnum
           |from
           |(select
           |  device_id,device_type,platform,count(1) as installnum
           |from
           |  dwh.dm_install_list_v2
           |where
           |  dt = "${dt_today}" and update_date >= "${update}"
           |and
           |  package_name in (select pkg from agg_package_names_tmp)
           |group by device_id,device_type,platform) a
           |where installnum>=2
        """.stripMargin

      // Persisted because it feeds six downstream selections / two writes.
      val df01: DataFrame = spark.sql(sql1).persist(StorageLevel.MEMORY_AND_DISK_SER)
      try {
        val android_2_df = taggedRows(df01, 2, "android", "[\"com.age.2\"]")
        val android_3_df = taggedRows(df01, 3, "android", "[\"com.age.3\"]")
        val android_df_with_country = taggedRows(df01, 2, "android", "UNKNOWN")
        val ios_2_df = taggedRows(df01, 2, "ios", "[\"id2021032202\"]")
        val ios_3_df = taggedRows(df01, 3, "ios", "[\"id2021032203\"]")
        val ios_df_with_country = taggedRows(df01, 2, "ios", "UNKNOWN")

        android_2_df.union(android_3_df).union(ios_2_df).union(ios_3_df)
          .coalesce(coalesce.toInt)
          .write.format("text").mode("overwrite").save(output01)

        android_df_with_country.union(ios_df_with_country)
          .coalesce(coalesce.toInt)
          .write.format("text").mode("overwrite").save(output02)
      } finally {
        // Release cached partitions once both writes have materialized.
        df01.unpersist()
      }
    } finally {
      spark.stop()
    }
    0
  }
}

object AgePackageNames {
  def main(args: Array[String]): Unit = {
    new AgePackageNames().run(args)
  }
}