package mobvista.dmp.datasource.iqiyi

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.iqiyi
 * @author: wangjf
 * @date: 2020/4/29
 * @time: 11:52 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpIQiYiRequest extends CommonSparkJob with Serializable {
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("RTDmpIQiYiRequest")
    try {
      val sc = spark.sparkContext

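      //  Delete any existing output so reruns of the job are idempotent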
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

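      //  Normalize the partition date and derive a 180-day lookback window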
      val update_date = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))
      val start = DateUtil.getDayByString(update_date, "yyyy-MM-dd", -180)
      val end = update_date

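      //  Most recent update_date per iQiyi device (IMEI / MD5 IMEI) within the window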
      val sql =
        s"""
           |SELECT device_id, MAX(update_date) update_date
           | FROM dwh.dm_install_list_v2 WHERE dt = '$date'
           | AND update_date BETWEEN '$start' AND '$end'
           | AND package_name IN ('com.qiyi.video', 'com.qiyi.android')
           | AND device_type IN ('imei','imeimd5')
           | GROUP BY device_id
           |""".stripMargin

      val endTime = MobvistaConstant.sdf1.parse(end).getTime

      val rdd = spark.sql(sql)
        .rdd.map(row => {
        val deviceId = row.getAs[String]("device_id")
        val updateDate = row.getAs[String]("update_date").replace("\"", "")
        val startTime = if (StringUtils.isNotBlank(updateDate)) {
          MobvistaConstant.sdf1.parse(updateDate).getTime
        } else {
          endTime
        }
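        //  Recency bucket: number of full 30-day periods between the device's
        //  last update and the partition date (0 = within 30 days, capped at 180)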
        val days = (endTime - startTime) / (1000 * 3600 * 24) / 30
        val packageName = days match {
          case 0 => "com.qiyi_30_imei"
          case 1 => "com.qiyi_60_imei"
          case 2 => "com.qiyi_90_imei"
          case 3 => "com.qiyi_120_imei"
          case 4 => "com.qiyi_150_imei"
          case _ => "com.qiyi_180_imei"
        }
        (deviceId, packageName)
      })

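      //  The RDD is consumed twice (file write + distinct collect), so cache it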
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

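      //  Write each device id under its audience's sub-path; RDDMultipleOutputFormat
      //  splits the output into one directory per key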
      rdd.map(r => {
        (new Text(s"$output/${r._2}"), new Text(r._1))
      }).coalesce(100)
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]])

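      //  Distinct audience (package) names that actually occur in this run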
      val set = rdd.map(_._2)
        .distinct(200)
        .collect()
        .toSet[String]

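      //  New audiences are created via upload; known ones are updated in place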
      val uploadPkgSet = new mutable.HashSet[String]()
      val uploadJsonArray = new JSONArray()
      val updateJsonArray = new JSONArray()

      //  Audience ids configured to be stopped; these are skipped on update
      val stopAudience = PropertyUtil.getProperty("config.properties",
        "rtdmp.stop.audience").split(",", -1).toSet[String]
        .map(r => {
          Integer.valueOf(r)
        })

      //  Audience name -> id mapping; fetch it once rather than once per package
      val audienceMap = Logic.getAudienceInfo("iqiyi")

      set.foreach(packageName => {
        val jsonObject = new JSONObject()
        if (audienceMap.contains(packageName)) {
          if (!stopAudience.contains(audienceMap(packageName))) {
            jsonObject.put("id", audienceMap(packageName))
            jsonObject.put("s3_path", s"$output/$packageName")
            jsonObject.put("status", 1)
            jsonObject.put("audience_data_status", 1)
            updateJsonArray.add(jsonObject)
          }
        } else {
          jsonObject.put("s3_path", s"$output/$packageName")
          jsonObject.put("platform", 1)
          jsonObject.put("match_device_type", 1)
          jsonObject.put("audience_type", 2)
          jsonObject.put("data_update_method", 1)
          jsonObject.put("audience_name", packageName)
          jsonObject.put("status", 1)
          jsonObject.put("audience_gender", 3)
          jsonObject.put("audience_count", 1)
          jsonObject.put("is_sync_dmpserver", 1)
          jsonObject.put("audience_data_status", 1)
          uploadPkgSet.add(packageName)
          uploadJsonArray.add(jsonObject)
        }
      })

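      //  Register the audiences not yet known to RTDmp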
      val uploadJsonObject = ServerUtil.upload(uploadJsonArray)

      if (uploadJsonObject.getInteger("code") == 200) {
        println("RTDmp Upload OK,AudienceId -->> " + uploadJsonObject.getJSONArray("data"))
      } else {
        println("RTDmp Upload Failed,response -->> " + uploadJsonObject)
      }

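      //  Persist the audience info for the newly uploaded package names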
      Logic.writeAudienceInfo("iqiyi", Logic.getAudienceMap(uploadPkgSet))

      //  Update one audience at a time, retrying until the server acknowledges
      for (i <- 0 until updateJsonArray.size()) {
        val updateObject = new JSONArray()
        updateObject.add(updateJsonArray.getJSONObject(i))
        var flag = true
        while (flag) {
          val updateJsonObject = ServerUtil.update(updateObject)
          if (updateJsonObject.getInteger("code") == 200) {
            println("RTDmp Update OK,updateJson -->> " + updateObject)
            flag = false
          } else {
            //  Back off briefly before retrying so failures don't hammer the server
            Thread.sleep(1000)
          }
        }
      }
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpIQiYiRequest {
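  /**
   * Usage sketch (the date format and output path below are illustrative
   * assumptions, not verified against the deployment scripts):
   *   spark-submit --class mobvista.dmp.datasource.iqiyi.RTDmpIQiYiRequest dmp.jar \
   *     -date 20200429 -output s3://mob-emr-test/dataplatform/rtdmp/iqiyi
   */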
  def main(args: Array[String]): Unit = {
    new RTDmpIQiYiRequest().run(args)
  }
}