package mobvista.dmp.datasource.TO

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * Daily extraction of active device ids from the uparpu (TopOn) device-active
 * Hive log. Builds one row per (device_id, device_type, platform, package_name,
 * country) by UNIONing four legs — imei / oaid / gaid (android) and idfa (ios) —
 * each joined against `uparpu_main.uparpu_app` to resolve the store package
 * name, and writes the result as zlib-compressed ORC to the given S3 path.
 *
 * Required CLI options: -coalesce <n>  -output <s3 path>  -dt_dash_today <yyyy-MM-dd>
 */
class TODaily extends CommonSparkJob with Serializable {

  /** Declares the CLI options this job accepts; all three are mandatory. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options.addOption("dt_dash_today", true, "[must] dt_dash_today")
    options
  }

  /**
   * Job entry point invoked by the CommonSparkJob harness.
   *
   * @param args raw CLI arguments
   * @return 0 on success, -1 when a mandatory option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")
    val dt_dash_today = commandLine.getOptionValue("dt_dash_today")

    val spark = SparkSession.builder()
      .appName("TODaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Rejects all-zero imeis (15-17 zeros). The positive shape check uses
    // `imeiPtn`, which is not defined in this file — presumably inherited from
    // CommonSparkJob; NOTE(review): confirm.
    val imeifilterPtn = "^([0]{15,17})$"

    // Overwrite semantics: drop any previous output tree before writing.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(output), true)

    try {
      // Four UNIONed legs, one per device-id type. The three android legs join
      // on the app uuid and keep only 'com%'-prefixed package ids; the ios leg
      // joins on the raw app id and uses the App Store id as the package name.
      // Zero/blank/'null' placeholder ids are filtered out in every leg.
      val sql1 =
        s"""
           |select t1.imei device_id,'imei' device_type,'android' platform,t2.platform_app_id package_name,country_code country from
           |(select imei,app_id,country_code
           |from uparpu_main.uparpu_device_active where dt='${dt_dash_today}' and os_platform='1' and imei rlike '${imeiPtn}' and imei not rlike '${imeifilterPtn}' and imei!='null' ) t1
           |join
           |(select uuid,platform_app_id from uparpu_main.uparpu_app where platform_app_id like 'com%') t2
           |on t1.app_id=t2.uuid
           |UNION
           |select t1.oaid device_id,'oaid' device_type,'android' platform,t2.platform_app_id package_name,country_code country from
           |(select oaid,app_id,country_code
           |from uparpu_main.uparpu_device_active where dt='${dt_dash_today}' and os_platform='1' and oaid!='' and oaid!='00000000-0000-0000-0000-000000000000' and oaid!='null') t1
           |join
           |(select uuid,platform_app_id from uparpu_main.uparpu_app where platform_app_id like 'com%') t2
           |on t1.app_id=t2.uuid
           |UNION
           |select t1.gaid device_id,'gaid' device_type,'android' platform,t2.platform_app_id package_name,country_code country from
           | (select gaid,app_id,country_code
           | from uparpu_main.uparpu_device_active where dt='${dt_dash_today}' and os_platform='1' and gaid!='' and gaid!='00000000-0000-0000-0000-000000000000' and gaid!='null') t1
           | join
           | (select uuid,platform_app_id from uparpu_main.uparpu_app where platform_app_id like 'com%') t2
           | on t1.app_id=t2.uuid
           | UNION
           | select t1.idfa device_id,'idfa' device_type,'ios' platform,t2.app_store_id package_name,country_code country from
           | (select idfa,app_raw_id,country_code
           | from uparpu_main.uparpu_device_active where dt='${dt_dash_today}' and os_platform='2' and idfa!='' and idfa!='00000000-0000-0000-0000-000000000000' and idfa!='null') t1
           | join
           | (select id,app_store_id from uparpu_main.uparpu_app where app_store_id!='') t2
           | on t1.app_raw_id=t2.id
         """.stripMargin

      spark.sql(sql1).coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
        .orc(output)
    } finally {
      // Always release the session, even if the query or write fails.
      spark.stop()
    }
    0
  }
}

object TODaily {
  def main(args: Array[String]): Unit = {
    // Propagate run()'s status as the process exit code; previously the -1
    // returned on bad arguments was silently discarded and the JVM exited 0,
    // making schedulers treat the failed job as a success.
    System.exit(new TODaily().run(args))
  }
}