package mobvista.dmp.datasource.dm

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.net.URI
import scala.collection.mutable

/**
 * Spark job that adds IDFV-keyed copies of iOS install-list rows.
 *
 * The job reads an ORC mapping (columns `old_id` / `new_id` are referenced by the query) from
 * `input`, joins it case-insensitively against the iOS rows of `dwh.dmp_install_list` for the
 * given `dt` / `business`, re-keys the matching rows to `new_id` with device type `idfv`, unions
 * them with every original row of that `dt` / `business`, and writes the result as
 * zlib-compressed ORC to `output`.
 *
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020-12-14 17:50:36
 * @time: 6:42 PM
 * @email: jinfeng.wang@mobvista.com
 */
class FixInstallListIdfv extends CommonSparkJob with Serializable {

  /**
   * Parses the command line and runs the job.
   *
   * @param args command-line arguments; see [[buildOptions]] for the accepted options
   * @return 1 when a mandatory option is missing (usage is printed), 0 after a successful write
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      1
    } else {
      printOptions(commandLine)

      val date = commandLine.getOptionValue("date")
      val business = commandLine.getOptionValue("business")
      val input = commandLine.getOptionValue("input")
      val output = commandLine.getOptionValue("output")
      val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

      val spark = MobvistaConstant.createSparkSession(s"FixInstallListIdfv.$date.$business")
      try {
        val sc = spark.sparkContext

        // Only the legacy ("old dmp") query below uses this UDF; the registration is kept so
        // that query can be re-enabled without further changes.
        //
        //   SELECT mp.new_id device_id, 'idfv' device_type, 'ios' platform,
        //          getInstallList(COLLECT_SET(CONCAT(device_type, '\001', install_list))) install_list
        //     FROM dwh.dm_install_list dmp INNER JOIN mapping mp
        //       ON UPPER(dmp.device_id) = UPPER(mp.old_id)
        //    WHERE CONCAT(year, month, day) = '$date' AND business = '$business' AND platform = 'ios'
        //    GROUP BY mp.new_id
        spark.udf.register("getInstallList", getInstallList _)

        // Deduplicated mapping, exposed to SQL as the `mapping` view.
        spark.read.orc(input)
          .dropDuplicates()
          .createOrReplaceTempView("mapping")

        // NOTE: `date` and `business` are interpolated directly into the SQL text; they are
        // expected to come from trusted job configuration, not from end users.

        // new dmp: iOS rows re-keyed from old_id to new_id with device type 'idfv'.
        val ruidSql =
          s"""
             |SELECT mp.new_id device_id, 'idfv' device_type, 'ios' platform, country, install_list, ext_data, update_date
             |  FROM dwh.dmp_install_list dmp INNER JOIN mapping mp
             |  ON UPPER(dmp.device_id) = UPPER(mp.old_id)
             |  WHERE dt = '$date' AND business = '$business' AND platform = 'ios'
             |""".stripMargin
        val ruid = spark.sql(ruidSql)

        // All original rows of the same partition, passed through unchanged.
        val otherSql =
          s"""
             |SELECT device_id, device_type, platform, country, install_list, ext_data, update_date
             |  FROM dwh.dmp_install_list
             |  WHERE dt = '$date' AND business = '$business'
             |""".stripMargin
        val other = spark.sql(otherSql)

        // Resolve the file system from the output path itself rather than from a hard-coded
        // bucket, so the cleanup targets the same location the write below uses.
        val outputPath = new Path(output)
        outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

        ruid.union(other)
          .repartition(coalesce)
          .write
          .option("orc.compress", "zlib")
          .mode(SaveMode.Overwrite)
          .orc(output)

        0
      } finally {
        // Stopping the session also stops its underlying SparkContext.
        spark.stop()
      }
    }
  }

  /**
   * Picks one install list out of a collection of `deviceType<U+0001>installList` strings.
   *
   * The install list of the first entry whose device type is `idfa` (case-insensitive) wins;
   * when there is no such entry, the install list of the last well-formed entry is returned.
   * Null entries and entries without the separator are skipped instead of raising an exception.
   *
   * @param installs entries of the form `deviceType<U+0001>installList`; may be null or empty
   * @return the selected install list, or an empty string when no usable entry exists
   */
  def getInstallList(installs: mutable.WrappedArray[String]): String = {
    val parsed: Vector[(String, String)] = Option(installs)
      .map(_.iterator
        .filter(_ != null)
        .map(_.split("\u0001", -1))
        // Require both fields; extra fields after the second are ignored, as before.
        .collect { case Array(deviceType, installList, _*) => (deviceType, installList) }
        .toVector)
      .getOrElse(Vector.empty)

    parsed
      .collectFirst { case (deviceType, installList) if deviceType.equalsIgnoreCase("idfa") => installList }
      .orElse(parsed.lastOption.map(_._2))
      .getOrElse("")
  }

  /**
   * Declares the command-line options of the job; every option takes a value and is mandatory.
   *
   * @return the option set: `date`, `business`, `input`, `output`, `coalesce`
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("business", true, "[must] business")
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object FixInstallListIdfv {

  /**
   * Entry point. A non-zero result of `run` is propagated as the process exit code so that
   * schedulers can detect a failed invocation (for example, missing mandatory options).
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new FixInstallListIdfv().run(args)
    if (exitCode != 0) {
      sys.exit(exitCode)
    }
  }
}