package mobvista.dmp.datasource.taobao

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_oneday_ago", true, "[must] dt_oneday_ago")
    options.addOption("update", true, "[must] update")
    options.addOption("output", true, "[must] output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val dt_today = commandLine.getOptionValue("dt_today")
    val dt_oneday_ago = commandLine.getOptionValue("dt_oneday_ago")
    val update = commandLine.getOptionValue("update")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("UCOtherDataToDmp")
    val sc = spark.sparkContext

    // Delete any previous output so reruns start from a clean directory.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

    try {
      // Gzip-compress the files written by saveAsNewAPIHadoopFile below.
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      // Per device, collect today's UC activation packages together with the
      // UCMobile packages seen in DSP requests one day earlier.
      val sql =
        s"""
           |SELECT device_id, COLLECT_SET(package_name) install_list
           |  FROM
           |  (
           |    SELECT device_id, package_name
           |    FROM dwh.dm_install_list_v2
           |    WHERE dt = '${dt_today}' AND business = 'uc_activation' AND device_type = 'imeimd5'
           |      AND package_name IN ('com.uc.foractivation.4b5a58','com.uc.foractivation.d3f521')
           |    UNION
           |    SELECT device_id, package_name
           |    FROM dwh.dm_install_list_v2
           |    WHERE dt = '${dt_oneday_ago}' AND business = 'dsp_req' AND device_type = 'imeimd5'
           |      AND package_name IN ('com.UCMobile_bes','com.ucmobile_oppo')
           |  ) t
           |  GROUP BY device_id
           |""".stripMargin

      val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      // For every (activation package, UCMobile channel) pair present in a
      // device's install list, emit one record keyed by the output
      // subdirectory that RDDMultipleOutputFormat should write it to.
      val rdd = df.rdd.flatMap(r => {
        val arrayBuffer = new ArrayBuffer[(Text, Text)]()
        val deviceId = r.getAs[String]("device_id")
        val deviceType = "imeimd5"
        val platform = "android"
        val installList = r.getAs[mutable.WrappedArray[String]]("install_list")
        if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.UCMobile_bes")) {
          arrayBuffer += ((new Text(s"$output/4b5a58_ucbes"),
            new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucbes", update))))
        }
        if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.UCMobile_bes")) {
          arrayBuffer += ((new Text(s"$output/d3f521_ucbes"),
            new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucbes", update))))
        }
        if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.ucmobile_oppo")) {
          arrayBuffer += ((new Text(s"$output/4b5a58_ucoppo"),
            new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucoppo", update))))
        }
        if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.ucmobile_oppo")) {
          arrayBuffer += ((new Text(s"$output/d3f521_ucoppo"),
            new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucoppo", update))))
        }
        arrayBuffer
      })

      rdd.coalesce(50)
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
    } finally {
      spark.stop()
    }
    0
  }
}

object UCOtherDataToDmpV2 {
  def main(args: Array[String]): Unit = {
    new UCOtherDataToDmpV2().run(args)
  }
}
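
// Usage sketch. The jar name, example dates, and output subpath below are
// assumptions for illustration only; the option names and the
// s3://mob-emr-test bucket come from the source above.
//
//   spark-submit \
//     --class mobvista.dmp.datasource.taobao.UCOtherDataToDmpV2 \
//     dmp.jar \
//     -dt_today 2024-01-02 \
//     -dt_oneday_ago 2024-01-01 \
//     -update 2024-01-02 \
//     -output s3://mob-emr-test/uc_other_data_v2
//
// RDDMultipleOutputFormat routes each record to the subdirectory named in
// its Text key, so the four package pairs land in separate directories
// (4b5a58_ucbes, d3f521_ucbes, 4b5a58_ucoppo, d3f521_ucoppo) under -output.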