package mobvista.dmp.datasource.joypac

import java.net.URI
import java.util

import mobvista.dmp.common.CommonSparkJob
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConversions._

/**
  * @package: mobvista.dmp.datasource.joypac
  * @author: wangjf
  * @date: 2019/3/4
  * @time: 1:44 PM
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class JoypacDaily extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("file", true, "[must] file")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val file = commandLine.getOptionValue("file")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val spark = SparkSession.builder()
      .appName("JoypacDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    // Remove any previous output so the Overwrite write starts clean.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
      .delete(new Path(output), true)

    try {
      // Load the package_name -> bundle_id mapping file ("name;bundle" per line),
      // trimming a trailing "://" from the package name when present.
      val packageMap = sc.textFile(file).map(_.split(";")).map(r =>
        if (r(0).endsWith("://")) {
          (r(0).substring(0, r(0).length - 3), r(1))
        } else {
          (r(0), r(1))
        }
      ).collectAsMap()
      val bPackageMap = sc.broadcast(packageMap)

      // Keep only rows whose package_name appears in the mapping.
      val rdd = spark.read.format("orc").load(input).rdd
        .filter(r => bPackageMap.value.contains(r.getAs("package_name").toString))
        .map(r => {
          val device_id = r.getAs("idfa").toString
          val device_type = "idfa"
          val platform_id = r.getAs("platform").toString
          val platform = if (platform_id.equals("1")) {
            "android"
          } else if (platform_id.equals("2")) {
            "ios"
          } else {
            "other"
          }
          // Collect the bundle ids of every app in apps_info flagged "1"
          // that also appears in the mapping.
          val set = new util.HashSet[String]()
          val jsonArray = GsonUtil.String2JsonArray(r.getAs("apps_info").toString)
          jsonArray.foreach(element => {
            val ir = element.getAsJsonObject.entrySet().iterator()
            while (ir.hasNext) {
              val entry = ir.next()
              val pkg = entry.getKey
              val code = entry.getValue.getAsString
              if (code.equals("1") && bPackageMap.value.contains(pkg)) {
                set.add(bPackageMap.value(pkg))
              }
            }
          })
          val bundle_id = bPackageMap.value(r.getAs("package_name").toString)
          set.add(bundle_id)
          // Join the collected bundle ids with ";"; the set is never empty
          // because it always holds at least bundle_id.
          val package_name = set.iterator().mkString(";")
          val country = "CN"
          Joypac(device_id, device_type, platform, package_name, bundle_id, country)
        })

      import spark.implicits._
      rdd.toDF().coalesce(coalesce.toInt).write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      sc.stop()
      spark.stop()
    }
    0
  }
}

object JoypacDaily {
  def main(args: Array[String]): Unit = {
    new JoypacDaily().run(args)
  }
}
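
// `Joypac` is not defined in this file and is assumed to live elsewhere in the
// project. A minimal sketch matching the constructor call in `run` (field names
// inferred from the local variables passed positionally) would be:
//
// case class Joypac(device_id: String,
//                   device_type: String,
//                   platform: String,
//                   package_name: String,
//                   bundle_id: String,
//                   country: String)
//
// Example invocation (jar name and paths are hypothetical; option names follow
// the commons-cli definitions in buildOptions):
//
// spark-submit --class mobvista.dmp.datasource.joypac.JoypacDaily dmp.jar \
//   -input s3://.../joypac/daily_input \
//   -file s3://.../joypac/package_bundle_map.txt \
//   -output s3://.../joypac/daily_output \
//   -coalesce 10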