package mobvista.dmp.datasource.reyun

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

import java.net.URI

/**
 * Spark job that collects the distinct device ids seen on `dt_today` and writes them,
 * one id per line, as text files under `output`.
 *
 * Device ids come from four sources that are unioned and de-duplicated:
 *  - md5(imei) and md5(ext_oaid) from `dwh.ods_adn_trackingnew_request` and
 *    `dwh.ods_adn_trackingnew_hb_request` (country CN, platform android), restricted to
 *    app ids listed in `dwh.reyun_label_baijiu_filter_appid`;
 *  - imei / oaid values (pre-hashed value preferred, otherwise md5 of the raw value) taken
 *    from the `ext5` column of `adn_dsp.log_adn_dsp_request_orc_hour` for exchange
 *    `xunfei`, hours 00 to 05.
 *
 * @author jiangfan
 * @date 2021/7/23 18:00
 */
class ReyunLabelBaijiu extends CommonSparkJob with Serializable {

  /** Declares the command line options understood by this job. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options.addOption("dt_today", true, "[must] dt_today")
    options
  }

  /**
   * Parses the arguments and runs the export.
   *
   * @param args raw command line arguments
   * @return 0 on success, -1 when the mandatory-option check fails
   * @throws NumberFormatException if the `coalesce` option is not an integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)
      // Parse eagerly so that a malformed value fails before a Spark session is started
      // and before the existing output is deleted.
      val partitions = commandLine.getOptionValue("coalesce").toInt
      val output = commandLine.getOptionValue("output")
      val dtToday = commandLine.getOptionValue("dt_today")
      exportDeviceIds(partitions, output, dtToday)
    }
  }

  /**
   * Starts the Spark session, clears `output`, runs the query and saves the result.
   *
   * @param partitions number of partitions the result is coalesced to before writing
   * @param output     destination directory; any existing content is deleted first
   * @param dtToday    day to query, interpolated into the SQL partition filters
   * @return always 0; failures surface as exceptions
   */
  private def exportDeviceIds(partitions: Int, output: String, dtToday: String): Int = {
    val spark = SparkSession.builder()
      .appName("ReyunLabelBaijiu")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Everything after session creation runs inside the try so the session is always stopped.
    try {
      // Resolve the file system from the output path itself rather than from a fixed bucket,
      // so the directory removed here is the same one saveAsTextFile writes to.
      val outputPath = new Path(output)
      val fs: FileSystem = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
      fs.delete(outputPath, true) // true = recursive

      val sql = buildSql(dtToday)
      println("sql==============" + sql)

      spark.sql(sql)
        .rdd
        .map(_.mkString)
        .coalesce(partitions)
        .saveAsTextFile(output)
      0
    } finally {
      spark.stop()
    }
  }

  /**
   * Builds the device id query for one day.
   *
   * NOTE(review): in the two `adn_dsp` branches a row whose raw id and pre-hashed id are both
   * null produces a null `device_id`, which `Row.mkString` would render as the text "null" in
   * the output - confirm whether such rows should be filtered out.
   *
   * @param dtToday day used in the `concat(yyyy,mm,dd)` / `concat(yr,mt,dt)` partition filters
   * @return the SQL text passed to `spark.sql`
   */
  private def buildSql(dtToday: String): String =
    s"""
       |select distinct device_id
       |from
       |(select md5(imei) as device_id
       |from
       |(select app_id,
       |(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
       |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
       |from
       |dwh.ods_adn_trackingnew_request
       |where concat(yyyy,mm,dd) in ('${dtToday}')
       |and country_code = 'CN'
       |and platform = 'android'
       |union all
       |select app_id,(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
       |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
       |from dwh.ods_adn_trackingnew_hb_request
       |where concat(yyyy,mm,dd) in ('${dtToday}')
       |and country_code = 'CN'
       |and platform = 'android'
       |) as tmp
       |where tmp.imei is not null and tmp.app_id in (select appid from dwh.reyun_label_baijiu_filter_appid)
       |union all
       |select md5(oaid) as device_id
       |from
       |(select app_id,
       |(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
       |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
       |from
       |dwh.ods_adn_trackingnew_request
       |where concat(yyyy,mm,dd) in ('${dtToday}')
       |and country_code = 'CN'
       |and platform = 'android'
       |union all
       |select app_id,(case when imei not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then imei else null end) as imei,
       |(case when ext_oaid not in ('00000000-0000-0000-0000-000000000000','0000000000000000','0','',' ') then ext_oaid else null end ) as oaid
       |from dwh.ods_adn_trackingnew_hb_request
       |where concat(yyyy,mm,dd) in ('${dtToday}')
       |and country_code = 'CN'
       |and platform = 'android'
       |) as tmp
       |where tmp.oaid is not null and tmp.app_id in (select appid from dwh.reyun_label_baijiu_filter_appid)
       |union all
       |select if(imeimd5 is null ,md5(imei),imeimd5) as device_id
       |from
       |(select split(ext5,',')[4] as imei,split(ext5,',')[5] as imeimd5
       |from adn_dsp.log_adn_dsp_request_orc_hour
       |where concat(yr,mt,dt) in ('${dtToday}') and hh >='00' and hh<'06'
       |and countrycode = 'CN'
       |and os = 'android'
       |and exchanges='xunfei'
       |) adx
       |union all
       |select if(oaidmd5 is null ,md5(oaid),oaidmd5) as device_id
       |from
       |(select split(ext5,',')[12] as oaid ,split(ext5,',')[13] as oaidmd5
       |from adn_dsp.log_adn_dsp_request_orc_hour
       |where concat(yr,mt,dt) in ('${dtToday}') and hh >='00' and hh<'06'
       |and countrycode = 'CN'
       |and os = 'android'
       |and exchanges='xunfei'
       |) adx
       |) tmp
       |""".stripMargin
}

/** Command line entry point for [[ReyunLabelBaijiu]]. */
object ReyunLabelBaijiu {

  /**
   * Runs the job with the given arguments.
   * The status code returned by `run` is not propagated to the process exit code.
   */
  def main(args: Array[String]): Unit = {
    new ReyunLabelBaijiu().run(args)
  }
}