package mobvista.dmp.datasource.rtdmp

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.JavaConverters.asScalaSetConverter
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * @package: mobvista.dmp.datasource.rtdmp
 * @author: wangjf
 * @date: 2020/7/13
 * @time: 11:25 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpRequestDaily extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("hh", true, "hh")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"RTDmpRequestDaily.$date")
    val sc = spark.sparkContext
    try {
      //  Gzip-compress all job output at the block level.
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      spark.udf.register("changeDeviceId", Logic.changeDeviceId _)
      spark.udf.register("changeDeviceType", Logic.changeDeviceType _)

      val sql =
        s"""
           |SELECT changeDeviceId(device_type, device_id) device_id,
           |       changeDeviceType(device_type) device_type, install_list
           |  FROM dwh.dmp_install_list
           | WHERE dt = '$date' AND business = '14days'
           |""".stripMargin

      //  Query the audiences updated within the given day. The *_utime bounds are
      //  epoch seconds shifted back 8 hours (local UTC+8 time to UTC).
      val update_time_start = DateUtil.format(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss")
      val update_time_end = DateUtil.format(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss")
      val audience_date_utime_start = DateUtil.parse(date + " 00:00:00", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800
      val audience_date_utime_end = DateUtil.parse(date + " 23:59:59", "yyyy-MM-dd HH:mm:ss").getTime / 1000 - 28800

      val json: JSONObject = RTDmpServer.query(update_time_start, update_time_end, 0, 2,
        audience_date_utime_start, audience_date_utime_end, "6")

      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      //  audience id -> package names the audience is defined on
      val map = new mutable.HashMap[Integer, Array[String]]()
      for (id <- json.keySet().toArray()) {
        val pkgSet = json.getJSONObject(String.valueOf(id)).getString("installed_package_name").split(",", -1)
        map.put(Integer.parseInt(String.valueOf(id)), pkgSet)
      }

      val rdd = spark.sql(sql).rdd.map(row => {
        val deviceId = row.getAs[String]("device_id")
        val deviceType = row.getAs[String]("device_type")
        val install_list = MobvistaConstant.String2JSONObject(row.getAs[String]("install_list")).keySet().asScala
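        //  Emit one (device_id, device_type, audience_id) record for every audience
        //  whose package list overlaps this device's install list.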
        val array = new ArrayBuffer[(String, String, Integer)]()
        for (key <- map.keySet) {
          if (map(key).toSet.intersect(install_list).nonEmpty) {
            array += ((deviceId, deviceType, key))
          }
        }
        array
      }).flatMap(l => l)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      //  Write each device id under $output/<audience_id>/<device_type> via the
      //  multiple-output format, one directory per (audience, device type) pair.
      rdd.repartition(coalesce).map(t => {
        (new Text(s"$output/${t._3}/${t._2}"), new Text(t._1))
      }).saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

      val mRdd = rdd.map(r => {
        (r._3, r._2)
      }).distinct(10)
        .cache()

      //  For every audience, report back the S3 path it was written to. If an
      //  audience spans several device types, register the wildcard path instead.
      val pkgTypeCount = mRdd.countByKey()
      val portalMap = mRdd.collectAsMap()

      portalMap.foreach(m => {
        val updateJsonArray = new JSONArray()
        val jsonObject = new JSONObject()
        jsonObject.put("id", m._1)
        if (pkgTypeCount(m._1) == 1) {
          jsonObject.put("s3_path", s"$output/${m._1}/${m._2}/")
        } else {
          jsonObject.put("s3_path", s"$output/${m._1}/*/")
        }
        updateJsonArray.add(jsonObject)
        ServerUtil.update(updateJsonArray)
      })
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpRequestDaily {
  def main(args: Array[String]): Unit = {
    new RTDmpRequestDaily().run(args)
  }
}
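//  A sketch of how this job might be submitted. The jar name, master, queue-free
//  settings, and option values below are assumptions for illustration, not taken
//  from this repo; only the class name and option names come from the code above.
//
//    spark-submit --master yarn --deploy-mode cluster \
//      --class mobvista.dmp.datasource.rtdmp.RTDmpRequestDaily \
//      dmp.jar -date 2020-07-13 -output s3://mob-emr-test/rtdmp/request_daily -coalesce 40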