package mobvista.dmp.datasource.reyun

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

import java.net.URI

/**
 * Spark job that exports MD5-hashed iOS IDFA device ids as text files.
 *
 * It queries `dwh.ods_dmp_user_info_daily` for `dt_today` and the two preceding days, keeps rows
 * with platform `ios`, dev_type `idfa`, country `cn` and an osversion starting with 14, 15 or 16,
 * de-duplicates by `dev_id`, and writes `md5(upper(dev_id))` (capped at 120,000,000 rows) under
 * the `output` path.
 *
 * @author jiangfan
 * @date 2021/7/1 14:59
 */

class ReyunLabelTest extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options understood by this job: `coalesce` (number of output
   * partitions), `output` (destination path) and `dt_today` (reference day, `yyyyMMdd`).
   *
   * @return the option set
   */
  override protected def buildOptions(): Options = {
    // Named `opts` so it does not shadow the inherited `options` member used in `run`.
    val opts = new Options
    opts.addOption("coalesce", true, "[must] coalesce")
    opts.addOption("output", true, "[must] output")
    opts.addOption("dt_today", true, "[must] dt_today")
    opts
  }

  /**
   * Parses the arguments and runs the export.
   *
   * @param args raw command-line arguments
   * @return 0 on success, -1 when the option check fails
   * @throws NumberFormatException    if `coalesce` is not an integer
   * @throws IllegalArgumentException if `coalesce` is not positive
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      // Validate before any side effect: the export deletes the existing output first, so a
      // bad `coalesce` must fail here rather than after the previous result is already gone.
      val partitions = commandLine.getOptionValue("coalesce").toInt
      require(partitions > 0, s"coalesce must be a positive integer, got $partitions")

      exportDeviceIds(
        partitions,
        commandLine.getOptionValue("output"),
        commandLine.getOptionValue("dt_today"))
    }
  }

  /**
   * Creates the Spark session, replaces `output` with the query result and always stops the
   * session afterwards.
   *
   * @param partitions number of partitions (and therefore output files) to coalesce to
   * @param output     destination path; any existing content is deleted recursively
   * @param dtToday    reference day in `yyyyMMdd` format
   * @return 0 when the export completed
   */
  private def exportDeviceIds(partitions: Int, output: String, dtToday: String): Int = {
    val spark = SparkSession.builder()
      .appName("ReyunLabelTest")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Everything after session creation lives inside try/finally so the session is stopped
    // even when the delete or the date computation fails.
    try {
      // Compute the dates before touching the output so a failure here leaves it intact.
      val oneDayAgo = DateUtil.getDayByString(dtToday, "yyyyMMdd", -1)
      val twoDaysAgo = DateUtil.getDayByString(dtToday, "yyyyMMdd", -2)

      // saveAsTextFile refuses to write into an existing directory, so clear it first.
      // NOTE(review): the file system is resolved from a hard-coded bucket; presumably `output`
      // always lives in `mob-emr-test` — confirm with callers.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      val sql1 =
        s"""
           |select  md5(upper(dev_id))  from
           |(select  dev_id  from dwh.ods_dmp_user_info_daily
           |where dt in ('${dtToday}','${oneDayAgo}','${twoDaysAgo}')
           |and platform='ios'  and dev_type='idfa'  and lower(country)='cn'
           |and  (osversion like '14%' or osversion like '15%' or osversion like '16%')
           |group by dev_id ) t
           |limit 120000000
        """.stripMargin

      println("sql==============" + sql1)

      // Single-column rows, so mkString yields just the hashed id on each line.
      spark.sql(sql1).rdd.map(_.mkString).coalesce(partitions).saveAsTextFile(output)
      0
    } finally {
      spark.stop()
    }
  }
}




/** Command-line entry point for the `ReyunLabelTest` job. */
object ReyunLabelTest {

  /**
   * Runs the job and turns a non-zero result into the process exit status, so that a failed
   * option check is visible to the scheduler instead of being reported as success.
   *
   * @param args raw command-line arguments, forwarded unchanged to the job
   */
  def main(args: Array[String]): Unit = {
    val exitCode = new ReyunLabelTest().run(args)
    // Only exit explicitly on failure; a successful run returns normally as before.
    if (exitCode != 0) sys.exit(exitCode)
  }
}