package mobvista.dmp.datasource.appsflyer

import java.net.URI
import java.util.Properties

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}


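/**
  * Spark job that joins the daily AppsFlyer audience export
  * (dwh.etl_appsflyer_audience_org) with the appsflyer_audience dimension
  * table in MySQL, then writes two ORC outputs: a per-(device, advertiser)
  * total table and a per-device user-info table.
  */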
class AppsFlyerTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("dmpuserinfo", true, "[must] dmpuserinfo")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("update_date", true, "[must] update_date")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

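    // Device IDs must be canonical UUIDs (the GAID / IDFA format).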
    val didPtn = "^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
    val outputtotal = commandLine.getOptionValue("outputtotal")
    val dmpuserinfo = commandLine.getOptionValue("dmpuserinfo")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val update_date = commandLine.getOptionValue("update_date")

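    // Hive-enabled session with Kryo serialization and snappy shuffle compression.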
    val spark = SparkSession.builder()
      .appName("AppsFlyerTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

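    // Delete any previous output so reruns start from clean directories.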
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(outputtotal), true)
    fs.delete(new Path(dmpuserinfo), true)

    try {

      val sqlContext = spark.sqlContext
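      // Expose the MySQL appsflyer_audience dimension table as a temp view.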
      val properties = new Properties()
      properties.put("user", "adnro")
      properties.put("password", "YcM123glh")
      val url = "jdbc:mysql://adn-data-foronlinetest.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306/mob_adn"
      sqlContext.read.jdbc(url, "appsflyer_audience", properties)
        .select("id", "platform", "dmp_package")
        .createOrReplaceTempView("appsflyer_audience")

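      // One row per (device, advertiser): device type and package names come
      // from the broadcast appsflyer_audience dimension table.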
      val sqlAfOrgDaily =
        s"""
           |select /*+ mapjoin(t2) */
           |  t1.devid device_id,
           |  max(case when t2.platform = 'android' then 'gaid'
           |           when t2.platform = 'ios' then 'idfa' end) device_type,
           |  max(t2.platform) platform,
           |  max(t2.dmp_package) package_names,
           |  '' category,
           |  t1.advertiser_id advertiser,
           |  '${update_date}' update_date
           |from
           |  (select devid, container_id, advertiser_id
           |   from dwh.etl_appsflyer_audience_org
           |   where dt = '${today}' and devid rlike '${didPtn}') t1
           |  join appsflyer_audience t2 on (t1.container_id = t2.id)
           |group by t1.devid, t1.advertiser_id
      """.stripMargin

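      // Persist the per-(device, advertiser) totals as zlib-compressed ORC.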
      spark.sql(sqlAfOrgDaily)
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(outputtotal)

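      // One profile row per device, written to the dmpuserinfo path below.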
      val sqlUserInfo =
        s"""
           |select /*+ mapjoin(t2) */
           |  t1.devid device_id,
           |  max(case when t2.platform = 'android' then 'gaid'
           |           when t2.platform = 'ios' then 'idfa' end) device_type,
           |  max(t2.platform) platform,
           |  'UNKNOWN' country,
           |  '' age,
           |  '' gender,
           |  '' tags,
           |  '${update_date}' first_req_day,
           |  '${update_date}' last_req_day
           |from
           |  (select devid, container_id
           |   from dwh.etl_appsflyer_audience_org
           |   where dt = '${today}' and devid rlike '${didPtn}') t1
           |  join appsflyer_audience t2 on (t1.container_id = t2.id)
           |group by t1.devid
      """.stripMargin

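      // Persist the per-device profiles as zlib-compressed ORC.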
      spark.sql(sqlUserInfo)
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(dmpuserinfo)
    } finally {
      spark.stop()
    }
    0
  }
}

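/**
  * CLI entry point. A sketch of a typical submission; the jar name, output
  * paths, and dates below are illustrative assumptions, not values taken
  * from this repository:
  *
  * {{{
  * spark-submit --class mobvista.dmp.datasource.appsflyer.AppsFlyerTotal \
  *   dmp.jar \
  *   -outputtotal s3://mob-emr-test/path/to/appsflyer_total \
  *   -dmpuserinfo s3://mob-emr-test/path/to/dmp_user_info \
  *   -coalesce 100 \
  *   -today 2018-10-01 \
  *   -update_date 2018-10-01
  * }}}
  */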
object AppsFlyerTotal {
  def main(args: Array[String]): Unit = {
    new AppsFlyerTotal().run(args)
  }
}