package mobvista.dmp.datasource.facebook

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
  * @author: kehan
  * @date: 2019/06/26
  */
class FaceBookDaily extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options.addOption("yesterday", true, "[must] yesterday")
    options.addOption("last_sunday", true, "[must] last_sunday")
    options.addOption("unmatched", true, "[must] unmatched")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    // "input" is required by the CLI contract but is not read below;
    // the job sources its data from Hive tables instead.
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")
    val yesterday = commandLine.getOptionValue("yesterday")
    val last_sunday = commandLine.getOptionValue("last_sunday")
    val unmatched = commandLine.getOptionValue("unmatched")

    val spark = SparkSession.builder()
      .appName("FaceBookDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Clear both output locations so the Overwrite writes start from a clean state.
    val fs = FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
    fs.delete(new Path(output), true)
    fs.delete(new Path(unmatched), true)

    try {
      // Union today's raw Facebook records (device type and gender normalized)
      // with yesterday's still-unmatched history, keyed by the MD5 device id.
      val sql1 =
        s"""
           |select t.device_id_md5,t.device_type,t.platform,t.package_name,t.country,t.gender from
           |(select device_id as device_id_md5,
           |case when os='android' then 'gaid' when os='ios' then 'idfa' else '' end as device_type,
           |os as platform,
           |package_name,
           |user_country as country,
           |case when genders[0]=1 and genders[1] is null then 'male'
           |when genders[0]=2 and genders[1] is null then 'female'
           |else null end as gender
           |from dwh.etl_fb_org_daily where dt='${today}'
           |union all
           |select device_id_md5,device_type,platform,package_name,country,gender
           |from dwh.etl_fb_unmatched_history where dt='${yesterday}') t
        """.stripMargin
      spark.sql(sql1)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
        .createOrReplaceTempView("etl_fb_all_daily")

      // Distinct MD5 device ids seen in today's combined input.
      spark.sql("select device_id_md5 from etl_fb_all_daily group by device_id_md5")
        .createOrReplaceTempView("fb_device_md5")

      // Resolve MD5 ids to raw device ids via last Sunday's match table,
      // broadcasting the distinct-id side of the join.
      val sql2 =
        s"""
           |select /*+ MAPJOIN(a) */ b.device_id,a.device_id_md5
           |from fb_device_md5 a join
           |(select * from dwh.device_id_md5_match where dt='${last_sunday}') b
           |on a.device_id_md5=b.device_id_md5
        """.stripMargin
      spark.sql(sql2).createOrReplaceTempView("etl_fb_md5")

      // Attach the resolved raw device id; rows with no match keep a null device_id.
      val sql3 =
        """
          |select b.device_id,a.device_id_md5,a.device_type,a.platform,a.package_name,a.country,a.gender
          |from etl_fb_all_daily a left join etl_fb_md5 b on a.device_id_md5=b.device_id_md5
        """.stripMargin
      spark.sql(sql3)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
        .createOrReplaceTempView("etl_fb_md5_daily")

      // Collapse matched rows to one record per raw device id,
      // concatenating all package names with '#'.
      val sql4 =
        """
          |select device_id,
          |max(device_id_md5) device_id_md5,
          |max(device_type) device_type,
          |max(platform) platform,
          |concat_ws('#',collect_set(package_name)) package_name,
          |max(country) country,
          |max(gender) gender
          |from etl_fb_md5_daily where device_id is not null group by device_id
        """.stripMargin
      spark.sql(sql4).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)

      // Rows that still lack a raw device id are written to the unmatched path
      // so the next day's run can retry them via dwh.etl_fb_unmatched_history.
      spark.sql("select device_id_md5,device_type,platform,package_name,country,gender from etl_fb_md5_daily where device_id is null")
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(unmatched)
    } finally {
      spark.stop()
    }
    0
  }
}

object FaceBookDaily {
  def main(args: Array[String]): Unit = {
    new FaceBookDaily().run(args)
  }
}
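
// A minimal sketch of a driver invocation, for reference when wiring this job
// into a scheduler. All argument values below (dates, partition count, S3
// paths) are illustrative assumptions, not values fixed anywhere in this job.
object FaceBookDailyExample {
  def main(args: Array[String]): Unit = {
    FaceBookDaily.main(Array(
      "-input", "s3://mob-emr-test/fb/etl_fb_org_daily", // hypothetical path
      "-output", "s3://mob-emr-test/fb/daily",           // hypothetical path
      "-coalesce", "100",                                // illustrative partition count
      "-today", "2019-06-26",
      "-yesterday", "2019-06-25",
      "-last_sunday", "2019-06-23",
      "-unmatched", "s3://mob-emr-test/fb/unmatched"     // hypothetical path
    ))
  }
}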