package mobvista.dmp.datasource.taobao

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.spark.storage.StorageLevel
import java.net.URI
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("dt_today", true, "[must] dt_today")
    options.addOption("dt_oneday_ago", true, "[must] dt_oneday_ago")
    options.addOption("update", true, "[must] update")
    options.addOption("output", true, "[must] output")

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      return -1
    } else printOptions(commandLine)

    val dt_today = commandLine.getOptionValue("dt_today")
    val dt_oneday_ago = commandLine.getOptionValue("dt_oneday_ago")
    val update = commandLine.getOptionValue("update")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("UCOtherDataToDmp")

    val sc = spark.sparkContext

    FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

    try {
      val conf = spark.sparkContext.hadoopConfiguration
      conf.set("mapreduce.output.compress", "true")
      conf.set("mapreduce.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.setClass("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec], classOf[CompressionCodec])

      val sql =
           |SELECT device_id, COLLECT_SET(package_name) install_list
           | FROM
           |  (
           |  SELECT device_id, package_name
           |    FROM dwh.dm_install_list_v2
           |    WHERE dt = '${dt_today}' AND business = 'uc_activation' AND device_type = 'imeimd5'
           |    AND package_name IN ('com.uc.foractivation.4b5a58','com.uc.foractivation.d3f521')
           |  UNION
           |  SELECT device_id, package_name
           |    FROM dwh.dm_install_list_v2
           |    WHERE dt = '${dt_oneday_ago}' AND business = 'dsp_req' AND device_type = 'imeimd5'
           |    AND package_name IN ('com.UCMobile_bes','com.ucmobile_oppo')
           |  ) t
           | GROUP BY device_id

      val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)

      val rdd = df.rdd.map(r => {
        val arrayBuffer = new ArrayBuffer[(Text, Text)]()
        val deviceId = r.getAs[String]("device_id")
        val deviceType = "imeimd5"
        val platform = "android"
        val installList = r.getAs[mutable.WrappedArray[String]]("install_list")
        if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.UCMobile_bes")) {
          arrayBuffer += ((new Text(s"$output/4b5a58_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucbes", update))))
        if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.UCMobile_bes")) {
          arrayBuffer += ((new Text(s"$output/d3f521_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucbes", update))))
        if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.ucmobile_oppo")) {
          arrayBuffer += ((new Text(s"$output/4b5a58_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucoppo", update))))
        if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.ucmobile_oppo")) {
          arrayBuffer += ((new Text(s"$output/d3f521_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucoppo", update))))
      }).flatMap(l => {
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)

    } finally {

object UCOtherDataToDmpV2 {
  def main(args: Array[String]): Unit = {
    new UCOtherDataToDmpV2().run(args)