// FaceBookTotal.scala
package mobvista.dmp.datasource.facebook

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.age_gender.{Constant, Logic}
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel


/**
  * Daily rollup job: merges today's partition of `dwh.etl_facebook_daily`
  * into yesterday's `dwh.etl_facebook_total` snapshot (full outer join on
  * device_id / device_type / platform), then writes two ORC outputs:
  *  - the new accumulated total (device, platform, package list, country, gender)
  *  - an age/gender feed row per device (`Constant.schema_age_gender`).
  *
  * Required options: outputtotal, outputgender, coalesce, today, yesterday.
  */
class FaceBookTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("outputgender", true, "[must] outputgender")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesterday", true, "[must] yesterday")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val outputtotal = commandLine.getOptionValue("outputtotal")
    val outputgender = commandLine.getOptionValue("outputgender")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val yesterday = commandLine.getOptionValue("yesterday")

    val spark = SparkSession.builder()
      .appName("FaceBookTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear both targets up front so a previous run's partial output never survives.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputtotal), true)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(outputgender), true)

    try {

      spark.udf.register("check_deviceId", Logic.check_deviceId _)
    //  spark.udf.register("check_gender", Logic.check_gender _)

      // Full outer join of today's daily rows with yesterday's total snapshot.
      // package_name becomes "daily#total" when both sides are present;
      // gender is normalised to 'm'/'f' on both sides before merging.
      val sql1 =
        s"""
          |SELECT COALESCE(tbl_daily.device_id,tbl_total.device_id) device_id,
          |       COALESCE(tbl_daily.device_type,tbl_total.device_type) device_type,
          |       COALESCE(tbl_daily.platform,tbl_total.platform) platform,
          |       CASE WHEN  tbl_daily.package_name is null and tbl_total.package_name is not null  then tbl_total.package_name
          |       when tbl_total.package_name is null and tbl_daily.package_name is not null  then tbl_daily.package_name
          |       when tbl_total.package_name is not null and tbl_daily.package_name is not null  then  concat(tbl_daily.package_name,'#',tbl_total.package_name) end as package_name,
          |       COALESCE(tbl_daily.country,tbl_total.country) country,
          |       COALESCE(tbl_daily.gender,tbl_total.gender) gender
          |from ( SELECT device_id,device_type,platform,max(package_name) package_name,max(country) country,max(case when gender = 'male' then 'm'  when gender = 'female' then 'f' else gender end)  gender
          |from dwh.etl_facebook_daily where dt ='${today}'
          |group by  device_id,device_type,platform) tbl_daily
          |FULL JOIN
          |( SELECT device_id,device_type,platform,max(package_names) package_name,max(country) country,max(case when gender = 'male' then 'm'  when gender = 'female' then 'f' else gender  end)  gender
          |from dwh.etl_facebook_total where dt ='${yesterday}'
          | group by device_id,device_type,platform ) tbl_total
          |ON (tbl_daily.device_id = tbl_total.device_id and tbl_daily.device_type = tbl_total.device_type and tbl_daily.platform = tbl_total.platform)
        """.stripMargin
      // BUGFIX: the join previously compared tbl_daily.device_type with itself
      // (a tautology), so device_type was ignored by the join and devices with
      // different types could be merged into one row.
      spark.sql(sql1).createOrReplaceTempView("etl_facebook_pre_deal")

      // Explode the '#'-joined package list and re-collect it as a distinct set,
      // de-duplicating packages accumulated across days.
      val sql2 =
        s"""
           |select device_id,device_type,platform,concat_ws('#',collect_set(package_name)) package_names,country,gender from
           |(select device_id,device_type,platform,explode(split(package_name,'#')) package_name,country,gender
           |from etl_facebook_pre_deal ) X
           |group by  device_id,device_type,platform,country,gender
        """.stripMargin
      spark.sql(sql2).createOrReplaceTempView("etl_facebook_total")

      // Collapse to one row per (device_id, device_type); min() picks an
      // arbitrary-but-deterministic value when several remain.
      val sql3 =
        s"""
           |select device_id,device_type,min(platform) platform,min(package_names) package_names,min(country) country,min(gender) gender from
           |etl_facebook_total
           |group by device_id,device_type
        """.stripMargin

      // Cached because it feeds both the total output and the gender output.
      val etl_fb_total = spark.sql(sql3).persist(StorageLevel.MEMORY_AND_DISK)

      try {
        etl_fb_total.coalesce(coalesce.toInt)
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputtotal)

        // Project into the shared age/gender schema: "A" = age unknown,
        // "fb" = source tag. NOTE(review): assumes Constant.schema_age_gender
        // column order is (device_id, age, gender, source, device_type) — confirm.
        val gender_rdd = etl_fb_total.rdd.map(line => {
          val device_id = line.getAs[String]("device_id")
          val device_type = line.getAs[String]("device_type")
          val gender = line.getAs[String]("gender")
          Row(device_id, "A", gender, "fb", device_type)
        })

        spark.createDataFrame(gender_rdd, Constant.schema_age_gender)
          .write.mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputgender)
      } finally {
        // Release the cache explicitly rather than waiting for spark.stop().
        etl_fb_total.unpersist()
      }

    } finally {
      spark.stop()
    }
    0
  }
}

/** Command-line entry point: delegates straight to [[FaceBookTotal.run]]. */
object FaceBookTotal {
  def main(args: Array[String]): Unit = {
    val job = new FaceBookTotal
    // NOTE(review): run's Int status code is discarded here — confirm whether
    // the scheduler relies on the process exit code before propagating it.
    job.run(args)
  }
}