package mobvista.dmp.datasource.dm

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.rtdmp.RtdmpNormal.doGetJson
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat, concat_ws, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import java.util
import scala.collection.mutable.ArrayBuffer

/**
  * @author jiangfan
  * @date 2021/3/29 17:25
  */
class RtdmpNormal extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("rtdmp_normal_count_result", true, "[must] rtdmp_normal_count_result")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val coalesce = commandLine.getOptionValue("coalesce")
    val output01 = commandLine.getOptionValue("output01")
    val output02 = commandLine.getOptionValue("output02")
    val dt_today = commandLine.getOptionValue("dt_today")
    val rtdmp_normal_count_result = commandLine.getOptionValue("rtdmp_normal_count_result")

    val spark = SparkSession.builder()
      .appName("RtdmpNormal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    // Remove any previous output so the job can be re-run idempotently.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output01), true)
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output02), true)
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(rtdmp_normal_count_result), true)

    try {
      // Example of the upstream API call this job consumes:
      // curl -H 'key: installapp' -H 'token: 52c1f9c84a4dbe6d6e08e668f134f23f' 'http://portal-rtdmp.mintegral.com/rtdmp/audience/query?page=1&is_install_app=1'

      // Maps the numeric match_device_type returned by the API to DMP device-type names.
      val deviceTypeMap: util.Map[String, String] = new util.HashMap[String, String]
      deviceTypeMap.put("1", "imei")
      deviceTypeMap.put("2", "idfa")
      deviceTypeMap.put("3", "gaid")
      deviceTypeMap.put("4", "oaid")
      deviceTypeMap.put("5", "android_id")
      deviceTypeMap.put("6", "imeimd5")
      deviceTypeMap.put("7", "idfamd5")
      deviceTypeMap.put("8", "gaidmd5")
      deviceTypeMap.put("9", "oaidmd5")
      deviceTypeMap.put("10", "android_idmd5")

      // Maps the numeric platform code returned by the API to platform names.
      val platformMap: util.Map[String, String] = new util.HashMap[String, String]
      platformMap.put("1", "android")
      platformMap.put("2", "ios")

      val url = "http://portal-rtdmp.mintegral.com/rtdmp/audience/query?page=1&is_install_app=1"
      val jsonObject: JSONObject = doGetJson(url)
      println(jsonObject)

      val data: JSONArray = jsonObject.getJSONArray("data")
      // Collected audience entries: (s3_path, device_type, platform, package_name, country_code).
      val result = new ArrayBuffer[(String, String, String, String, String)]()
      for (i <- 0 until data.size) {
        val row: JSONObject = data.getJSONObject(i)
        println(row)
        val s3_path: String = row.getString("s3_path")
        val match_device_type: String = row.getString("match_device_type")
        val device_type: String = deviceTypeMap.get(match_device_type)
        var platform: String = row.getString("platform")
        val package_name = row.getString("package_name")
        val country_code = row.getString("country_code")
        platform = platformMap.get(platform)
        println(s3_path)
        println(device_type)
        println(platform)
        // Keep only complete entries; the null checks guard against device types or platforms
        // missing from the maps above. oaid audiences are excluded.
        if (s3_path != null && device_type != null && platform != null && package_name != null
          && !s3_path.equals("") && !device_type.equals("") && !platform.equals("") && !package_name.equals("")
          && !device_type.contains("oaid")) {
          result += Tuple5(s3_path, device_type, platform, package_name, country_code)
        }
      }
      println(result)

      val array: Array[(String, String, String, String, String)] = result.toArray
      var inputDataRdd: RDD[DmpDailyDataInformation] = spark.sparkContext.emptyRDD[DmpDailyDataInformation]
      for (index <- array.indices) {
        println(index)
        val inputPath: String = array(index)._1
        val device_type: String = array(index)._2
        // Pre-compute both the md5 and the plain variant of the device type; the actual
        // variant is chosen per record below based on the id length.
        var device_type_md5 = device_type
        var device_type_not_md5 = device_type
        if (device_type.contains("md5")) {
          device_type_not_md5 = device_type_not_md5.replace("md5", "")
        } else {
          device_type_md5 = device_type_md5 + "md5"
        }
        val platform: String = array(index)._3
        val package_name: String = array(index)._4
        val country_code: String = array(index)._5
        println(inputPath)
        val pathUri = new URI(inputPath)
        // Skip non-existent S3 paths such as
        // s3://mob-emr-test/dataplatform/rtdmp_request/2021/07/10/dsp_req/com.taobao.idlefish_bes/*/
        if (FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration)
          .exists(new Path(pathUri.toString.replace("*", "")))) {
          inputDataRdd = inputDataRdd.union(spark.sparkContext.textFile(inputPath).map(row => {
            // A 32-character id is treated as an md5 hash, anything else as a raw device id.
            if (row.length == 32) {
              DmpDailyDataInformation(row, device_type_md5, platform, package_name, country_code)
            } else {
              DmpDailyDataInformation(row, device_type_not_md5, platform, package_name, country_code)
            }
          }))
        } else {
          println(inputPath + " not existed!")
        }
      }

      val df: DataFrame = inputDataRdd.toDF().persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("tmp_normal_rtdmp_data")

      // Total record count, written out separately for monitoring.
      val sql1 = "select count(*) from tmp_normal_rtdmp_data"
      spark.sql(sql1).rdd.map(_.mkString("==")).saveAsTextFile(rtdmp_normal_count_result)

      // output01: device_id \t device_type \t platform \t ["package_name"]
      df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), df.col("platform"),
        concat(lit("[\""), df.col("package_name"), lit("\"]"))))
        .coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output01)
      // output02: device_id \t device_type \t platform \t country
      df.select(concat_ws("\t", df.col("device_id"), df.col("device_type"), df.col("platform"), df.col("country")))
        .coalesce(coalesce.toInt).write.format("text").mode("overwrite").save(output02)
    } finally {
      spark.stop()
    }
    0
  }
}

object RtdmpNormal {
  def main(args: Array[String]): Unit = {
    new RtdmpNormal().run(args)
  }
}

case class DmpDailyDataInformation(device_id: String, device_type: String, platform: String, package_name: String, country: String)