RTDmpIQiYiRequest.scala 5.63 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
package mobvista.dmp.datasource.iqiyi

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable

/**
 * Spark job that buckets iQiYi installers by install recency and registers each bucket as an
 * RTDmp audience.
 *
 * Steps:
 *  1. Select `device_id` / `MAX(update_date)` from `dwh.dm_install_list_v2` (partition `dt = date`)
 *     for `com.qiyi.video` / `com.qiyi.android` with `device_type` in (imei, imeimd5) and an
 *     `update_date` within the 180 days ending at `date`.
 *  2. Assign every device to a 30-day bucket (`com.qiyi_30_imei` ... `com.qiyi_180_imei`) and write
 *     the device ids under `output/<bucket>`.
 *  3. Buckets without a known audience id are sent through `ServerUtil.upload`. Buckets with a
 *     known id that is not listed in `rtdmp.stop.audience` are sent through `ServerUtil.update`,
 *     one at a time.
 *
 * @package: mobvista.dmp.datasource.iqiyi
 * @author: wangjf
 * @date: 2020/4/29
 * @time: 11:52 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpIQiYiRequest extends CommonSparkJob with Serializable {

  /** Max number of attempts for a single RTDmp update call before it is reported as failed. */
  private val MaxUpdateAttempts = 10

  /** Base back-off between update attempts, in milliseconds; multiplied by the attempt number. */
  private val UpdateRetryBackoffMs = 3000L

  /** Command-line options: `-date` (partition date of the source table) and `-output` (output root). */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Runs the job.
   *
   * @param args command-line arguments, see [[commandOptions]]
   * @return 0 on success
   * @throws IllegalStateException if any RTDmp update still fails after all retry attempts
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("RTDmpIQiYiRequest")
    try {
      val sc = spark.sparkContext

      // Resolve the FileSystem from the output path itself; a FileSystem bound to one fixed bucket
      // rejects paths in any other bucket ("Wrong FS").
      val outputPath = new Path(output)
      outputPath.getFileSystem(sc.hadoopConfiguration).delete(outputPath, true)

      // `date` is converted from the sdf2 format to the sdf1 format; the window is the 180 days
      // ending on that day.
      val end = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))
      val start = DateUtil.getDayByString(end, "yyyy-MM-dd", -180)

      val sql =
        s"""
           |SELECT device_id, MAX(update_date) update_date
           | FROM dwh.dm_install_list_v2 WHERE dt = '$date'
           | AND update_date BETWEEN '$start' AND '$end'
           | AND package_name IN ('com.qiyi.video', 'com.qiyi.android')
           | AND device_type IN ('imei','imeimd5')
           | GROUP BY device_id
           |""".stripMargin

      val endTime = MobvistaConstant.sdf1.parse(end).getTime

      // (device_id, bucket name). The closure references only local vals and MobvistaConstant.
      // NOTE(review): MobvistaConstant.sdf1 is shared by concurrent tasks in an executor JVM; if it is
      // a java.text.SimpleDateFormat that is not thread-safe — confirm and consider a per-partition copy.
      val rdd = spark.sql(sql)
        .rdd.map(row => {
          val deviceId = row.getAs[String]("device_id")
          // Strip stray double quotes; a null update_date falls back to endTime (bucket 0).
          val updateDate = Option(row.getAs[String]("update_date")).map(_.replace("\"", "")).getOrElse("")
          val startTime = if (StringUtils.isNotBlank(updateDate)) {
            MobvistaConstant.sdf1.parse(updateDate).getTime
          } else {
            endTime
          }
          // Elapsed milliseconds -> whole days -> 30-day bucket index.
          val days = (endTime - startTime) / (1000 * 3600 * 24) / 30
          val packageName = days match {
            case 0 => "com.qiyi_30_imei"
            case 1 => "com.qiyi_60_imei"
            case 2 => "com.qiyi_90_imei"
            case 3 => "com.qiyi_120_imei"
            case 4 => "com.qiyi_150_imei"
            case _ => "com.qiyi_180_imei"
          }
          (deviceId, packageName)
        })

      // Reused by the write and by the distinct-bucket collection below.
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Key = "output/<bucket>", value = device id; presumably RDDMultipleOutputFormat routes each
      // record to the directory named by its key — confirm against that class.
      rdd.map(r => {
        (new Text(s"$output/${r._2}"), new Text(r._1))
      }).coalesce(100)
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]])

      val bucketNames = rdd.map(_._2)
        .distinct(200)
        .collect()
        .toSet[String]

      rdd.unpersist()

      val uploadPkgSet = new mutable.HashSet[String]()
      val uploadJsonArray = new JSONArray()
      val updateJsonArray = new JSONArray()

      // Audience ids that must not be updated (comma-separated `rtdmp.stop.audience` property).
      val stopAudience = parseStopAudience(
        PropertyUtil.getProperty("config.properties", s"rtdmp.stop.audience"))

      // Existing iqiyi audiences. Nothing in the loop below changes them, so fetch them once.
      val audienceInfo = Logic.getAudienceInfo("iqiyi")

      bucketNames.foreach(packageName => {
        val jsonObject = new JSONObject()
        if (audienceInfo.contains(packageName)) {
          // Known audience: point it at the fresh data unless it is on the stop list.
          if (!stopAudience.contains(audienceInfo(packageName))) {
            jsonObject.put("id", audienceInfo(packageName))
            jsonObject.put("s3_path", s"$output/$packageName")
            jsonObject.put("status", 1)
            jsonObject.put("audience_data_status", 1)
            updateJsonArray.add(jsonObject)
          }
        } else {
          // Unknown bucket: create a new audience for it.
          jsonObject.put("s3_path", s"$output/$packageName")
          jsonObject.put("platform", 1)
          jsonObject.put("match_device_type", 1)
          jsonObject.put("audience_type", 2)
          jsonObject.put("data_update_method", 1)
          jsonObject.put("audience_name", packageName)
          jsonObject.put("status", 1)
          jsonObject.put("audience_gender", 3)
          jsonObject.put("audience_count", 1)
          jsonObject.put("is_sync_dmpserver", 1)
          jsonObject.put("audience_data_status", 1)
          uploadPkgSet.add(packageName)
          uploadJsonArray.add(jsonObject)
        }
      })

      val uploadJsonObject = ServerUtil.upload(uploadJsonArray)

      if (uploadJsonObject.getInteger("code") == 200) {
        println("RTDmp Upload OK,AudienceId -->> " + uploadJsonObject.getJSONArray("data"))
      } else {
        println("RTDmp Upload Failed,response -->> " + uploadJsonObject)
      }

      Logic.writeAudienceInfo("iqiyi", Logic.getAudienceMap(uploadPkgSet))

      // Update audiences one at a time, each with bounded retries. The remaining updates are still
      // attempted after a failure, and the job fails at the end if any update did not go through.
      val failedUpdates = (0 until updateJsonArray.size()).map { i =>
        val updateObject = new JSONArray()
        updateObject.add(updateJsonArray.getJSONObject(i))
        (updateObject, updateWithRetry(updateObject))
      }.collect { case (updateObject, false) => updateObject }

      if (failedUpdates.nonEmpty) {
        throw new IllegalStateException(
          s"RTDmp Update failed for ${failedUpdates.size} audience(s): ${failedUpdates.mkString(", ")}")
      }
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
   * Parses the comma-separated `rtdmp.stop.audience` property into audience ids.
   *
   * Entries are trimmed and blank entries (e.g. an empty property value) are skipped instead of
   * failing in `Integer.valueOf`. A null property yields an empty set.
   *
   * @param raw raw property value, possibly null
   * @return the audience ids listed in the property
   */
  private def parseStopAudience(raw: String): Set[Integer] =
    Option(raw).fold(Set.empty[Integer]) { value =>
      value.split(",", -1).map(_.trim).filter(_.nonEmpty).map(id => Integer.valueOf(id)).toSet
    }

  /**
   * Sends one RTDmp update request.
   *
   * Retries with linearly growing back-off until the server answers with code 200 or
   * `MaxUpdateAttempts` attempts have been made.
   *
   * @param updateObject JSON array holding the single audience to update
   * @param attempt      1-based attempt number; callers use the default
   * @return true if the server accepted the update, false once all attempts are exhausted
   */
  @scala.annotation.tailrec
  private def updateWithRetry(updateObject: JSONArray, attempt: Int = 1): Boolean = {
    val updateJsonObject = ServerUtil.update(updateObject)
    if (updateJsonObject.getInteger("code") == 200) {
      println("RTDmp Update OK,updateJson -->> " + updateObject)
      true
    } else if (attempt >= MaxUpdateAttempts) {
      println(s"RTDmp Update Failed after $attempt attempts,updateJson -->> $updateObject,response -->> $updateJsonObject")
      false
    } else {
      Thread.sleep(UpdateRetryBackoffMs * attempt)
      updateWithRetry(updateObject, attempt + 1)
    }
  }
}

object RTDmpIQiYiRequest {

  /**
   * Command-line entry point.
   *
   * Creates a fresh job instance and runs it with the raw arguments (`-date`, `-output`).
   * The Int returned by `run` is discarded.
   */
  def main(args: Array[String]): Unit = {
    val job = new RTDmpIQiYiRequest()
    job.run(args)
  }
}