package mobvista.dmp.datasource.appsflyer

import java.util.Properties

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

class AppsFlyerTotal extends CommonSparkJob {

  /**
    * Declares the command-line options understood by this job.
    *
    * Every option takes an argument (second parameter `true`) and is described
    * as mandatory ("[must]") in its help text:
    * `outputtotal`, `dmpuserinfo`, `coalesce`, `today` and `update_date`.
    *
    * @return the populated [[org.apache.commons.cli.Options]] instance
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("dmpuserinfo", true, "[must] dmpuserinfo")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("update_date", true, "[must] update_date")
    // The last expression is the method's result; without it the declared
    // Options return type is not satisfied.
    options
  }

  /**
    * Job body: parses the command line, prepares the Spark session, clears the
    * two target paths and defines the SQL used to aggregate AppsFlyer audience rows.
    *
    * Returns -1 when `checkMustOption` rejects the parsed command line; otherwise
    * the parsed options are printed and processing continues.
    *
    * NOTE(review): several statements in this method look truncated in this view
    * (string delimiters around the SQL text, the end of the SparkSession builder
    * chain, the calls that own the `.option("orc.compress", ...)` lines, and the
    * closing braces). Confirm against the complete file before relying on it.
    *
    * @param args raw command-line arguments
    * @return -1 on missing mandatory options (other return values are not visible here)
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      return -1
    } else printOptions(commandLine)

    // Pattern for an 8-4-4-4-12 hexadecimal, dash-separated identifier;
    // it is interpolated into the `rlike` filters of the SQL below.
    val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
    val outputtotal = commandLine.getOptionValue("outputtotal")
    val dmpuserinfo = commandLine.getOptionValue("dmpuserinfo")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val update_date = commandLine.getOptionValue("update_date")

    // NOTE(review): no terminating call (e.g. getOrCreate) is visible on this
    // builder chain, yet `spark.sparkContext` is used below — confirm.
    val spark = SparkSession.builder()
      .config("spark.rdd.compress", "true")
      // NOTE(review): the config key is an empty string while the value is
      // "snappy" — the intended key appears to be missing; confirm.
      .config("", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Recursively delete whatever currently exists at the two paths supplied
    // via `outputtotal` and `dmpuserinfo`.
    // NOTE(review): `URI` is not among the imports visible in this file view.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputtotal), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(dmpuserinfo), true)

    try {

      val sqlContext = spark.sqlContext
      // JDBC connection properties.
      // NOTE(review): credentials are hard-coded in source; consider supplying
      // them through external configuration.
      val properties = new Properties()
      properties.put("user", "adnro")
      properties.put("password", "YcM123glh")
      // NOTE(review): the JDBC URL has no host/database part in this view — confirm.
      val url = "jdbc:mysql://"
      // SQL text grouped by (devid, advertiser_id): derives device_type from
      // platform (android -> gaid, ios -> idfa), takes max(platform) and
      // max(dmp_package), and stamps each row with the `update_date` argument.
      // Source rows come from dwh.etl_appsflyer_audience_org for dt = `today`
      // whose devid matches `didPtn`, joined to appsflyer_audience.
      // NOTE(review): the string delimiters, the FROM keyword and the right-hand
      // side of the join condition are not visible here — confirm.
      val sqlAfOrgDaily=
           |select t1.devid device_id,max(case when t2.platform='android' then 'gaid'
           |  when  t2.platform='ios' then 'idfa' end )   device_type,
           | max(t2.platform) platform,
           | max(t2.dmp_package)  package_names,
           | '' category,
           | t1.advertiser_id  advertiser,
           | '${update_date}' update_date
           |( select /*+ mapjoin(t2)*/  devid, container_id,advertiser_id from dwh.etl_appsflyer_audience_org where dt ='${today}' and devid rlike '${didPtn}' )
           |t1 join appsflyer_audience t2 on (t1.container_id =
           |group by t1.devid,t1.advertiser_id

        .option("orc.compress", "zlib")

      // SQL text grouped by devid only: same device_type/platform derivation,
      // with constant 'UNKNOWN' country, empty age/gender/tags, and the
      // `update_date` argument used for both first_req_day and last_req_day.
      // NOTE(review): truncated in the same way as the statement above — confirm.
      val sqlUserInfo=
           |select t1.devid device_id,max(case when t2.platform='android' then 'gaid'
           | when  t2.platform='ios' then 'idfa'  end )   device_type,
           | max(t2.platform) platform,
           |'UNKNOWN' country,
           |'' age,
           |'' gender,
           |'' tags,
           |'${update_date}' first_req_day,
           |'${update_date}' last_req_day
           |( select /*+ mapjoin(t2)*/ devid, container_id from dwh.etl_appsflyer_audience_org where dt ='${today}' and devid rlike '${didPtn}' )
           |t1 join appsflyer_audience t2 on (t1.container_id =
           |group by t1.devid

        .option("orc.compress", "zlib")

    } finally {

/** Companion object holding the JVM entry point for the job. */
object AppsFlyerTotal {
  /**
    * Creates a new `AppsFlyerTotal` and hands it the raw command-line arguments.
    * The `Int` returned by `run` is discarded, since `main` is declared as `Unit`.
    *
    * @param args raw command-line arguments, passed through unchanged
    */
  def main(args: Array[String]): Unit = {
    new AppsFlyerTotal().run(args)