package mobvista.dmp.datasource.dm

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.{concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI

/**
 * Maps Reyun tool-related tag codes to synthetic package names and writes
 * two tab-separated device lists (one with package name, one with country).
 *
 * @author jiangfan
 * @date 2021/10/20 17:45
 */
class ComReyunPracticaltool extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")

    val spark = SparkSession.builder()
      .appName("ComReyunPracticaltool")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Delete any previous output so reruns start from a clean directory.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output01), true)
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output02), true)

    try {
      // Map each tool-related tag code to a JSON-array-style package name,
      // e.g. '0414' -> ["com.reyun_practicaltool"]. The \\" escapes survive
      // the s-interpolator as \" so Hive sees an escaped double quote.
      val sql01 =
        s"""
           |select
           |  device_id,
           |  id_type as device_type,
           |  case when tag_code = '0414'   then concat("[\\"", "com.reyun_practicaltool", "\\"]")
           |       when tag_code = '041404' then concat("[\\"", "com.reyun_clean", "\\"]")
           |       when tag_code = '040507' then concat("[\\"", "com.reyun_smalltool", "\\"]")
           |       when tag_code = '041403' then concat("[\\"", "com.reyun_wifi", "\\"]")
           |       when tag_code = '041406' then concat("[\\"", "com.reyun_security", "\\"]")
           |  end as package_name
           |from dwh.device_tag_weight_event_all_weekly
           |where dt = '20210922'
           |  and id_type in ('oaid', 'imei')
           |  and tag_code in ('0414', '041404', '040507', '041403', '041406')
           |group by device_id, id_type, tag_code
        """.stripMargin
      println("sql01==" + sql01)

      // Persist once: the same result feeds both output files.
      val df: DataFrame = spark.sql(sql01).persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Flatten each row into a tab-separated line for the text writer.
      val dfWithPackageName = df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit("android"), df.col("package_name")))
      val dfWithCountry = df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), lit("android"), lit("CN")))

      dfWithPackageName.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      dfWithCountry.coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
    } finally {
      spark.stop()
    }
    0
  }
}

object ComReyunPracticaltool {
  def main(args: Array[String]): Unit = {
    new ComReyunPracticaltool().run(args)
  }
}
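
// Example submission, sketched from the commons-cli options above. The jar
// name and S3 output paths are illustrative assumptions, not from the source;
// only the class name and the -coalesce/-output01/-output02 flags are given.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.dm.ComReyunPracticaltool \
//     dmp.jar \
//     -coalesce 20 \
//     -output01 s3://mob-emr-test/dataplatform/reyun_practicaltool/package_name \
//     -output02 s3://mob-emr-test/dataplatform/reyun_practicaltool/country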