package mobvista.dmp.datasource.adn

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI

/**
  * Hourly ETL over adn tracking "request" logs.
  *
  * Reads one hour of `dwh.ods_adn_trackingnew_request` plus the matching
  * header-bidding partition of `dwh.ods_adn_trackingnew_request_tmp_hb_request`,
  * validates/normalizes the device identifiers per platform, and writes the
  * surviving rows as gzip-compressed TSV-joined text to `output`.
  *
  * CLI arguments (parsed in [[run]]): -datetime yyyyMMddHH, -region, -output, -coalesce.
  */
class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {

  val TAB_DELIMITER = "\t"

  /** Options actually consumed by [[run]]. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("datetime", true, "datetime")
    options.addOption("region", true, "region")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  /**
    * Job body. Returns 0 on success; any failure propagates after the
    * SparkSession is stopped in the `finally` block.
    */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val coalesce = commandLine.getOptionValue("coalesce")
    val datetime = commandLine.getOptionValue("datetime")
    val region = commandLine.getOptionValue("region")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"AdnOrgLogEtlHours.$datetime.$region")

    // Ingest header-bidding (hb) request data (s3://mob-ad/adn/hb-v1/request) into DMP.
    // That path is also mounted as dwh.ods_adn_trackingnew_hb_request, but its partition
    // ADD statement does not run at the same time as the one for
    // dwh.ods_adn_trackingnew_request. The shell script driving this job triggers on the
    // _SUCCESS marker, so dwh.ods_adn_trackingnew_request can already be mounted while
    // dwh.ods_adn_trackingnew_hb_request for the same hour is not yet visible.
    // We therefore read dwh.ods_adn_trackingnew_request_tmp_hb_request instead, whose
    // partitions are mounted in the same step as dwh.ods_adn_trackingnew_request
    // (script: https://gitlab.mobvista.com/fan.jiang/ods_adn_trackingnew_click_merge/blob/master/job/ods_adn_trackingnew_request_merge.sh).
    // Azkaban flow: https://dataplatform.mobvista.com:8443/manager?project=ods_adn_trackingnew_click_merge&flow=ods_adn_trackingnew_merge#executions
    val sql =
      s"""
         |SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
         | language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
         | getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
         | FROM dwh.ods_adn_trackingnew_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '$region'
         | UNION
         | SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
         | language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
         | getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
         | FROM dwh.ods_adn_trackingnew_request_tmp_hb_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '${region}_hb_request'
         |""".stripMargin

    try {
      spark.udf.register("getDevId", AdnConstant.getDevId _)
      spark.udf.register("getRuid", AdnConstant.getRuid _)

      val rdd = spark.sql(sql).dropDuplicates().rdd.map(r => {
        val date = r.getAs[String]("date")
        val time = r.getAs[String]("time")
        val timestamp = r.getAs[String]("timestamp")
        val appId = r.getAs[String]("app_id")
        val platform = r.getAs[String]("platform")
        val osVersion = r.getAs[String]("os_version")
        val sdkVersion = r.getAs[String]("sdk_version")
        val deviceModel = r.getAs[String]("device_model")
        val screenSize = r.getAs[String]("screen_size")
        val countryCode = r.getAs[String]("country_code")
        val language = r.getAs[String]("language")
        val strategy = r.getAs[String]("strategy")
        val ip = r.getAs[String]("ip")
        val imei = r.getAs[String]("imei")
        val mac = r.getAs[String]("mac")
        val androidId = r.getAs[String]("android_id")
        val gaid = r.getAs[String]("gaid")
        val idfa = r.getAs[String]("idfa")
        val deviceBrand = r.getAs[String]("device_brand")
        val idfv = r.getAs[String]("idfv")
        val packageName = r.getAs[String]("package_name")
        val sysId = r.getAs[String]("sysid")
        val oaid = r.getAs[String]("oaid")
        val ruid = r.getAs[String]("ruid")

        // Keep a row only when at least one device identifier is valid for its
        // platform; otherwise drop it (null → filtered below). iOS identifiers
        // are checked first, so a row with both iOS and Android ids is emitted
        // as iOS. Android-only fields are blanked in the iOS record and vice versa.
        if ((StringUtils.isNotBlank(idfa) && idfa.matches(MobvistaConstant.didPtn))
          || (StringUtils.isNotBlank(idfv) && idfv.matches(MobvistaConstant.didPtn))
          || (StringUtils.isNotBlank(oaid) && oaid.matches(MobvistaConstant.oaidPtb))
          || (StringUtils.isNotBlank(sysId) && sysId.matches(MobvistaConstant.didPtn))
          || (StringUtils.isNotBlank(ruid) && ruid.length > 16)) {
          // Default a missing platform to "ios" on this branch.
          val plt = if (StringUtils.isNotBlank(platform)) {
            platform
          } else {
            "ios"
          }
          MRUtils.JOINER.join(date, time, timestamp, appId, plt, osVersion, sdkVersion, deviceModel, screenSize, countryCode,
            language, ip, "", mac, "", "", idfa, deviceBrand, sysId, packageName, strategy, oaid, idfv, ruid)
        } else if ((StringUtils.isNotBlank(gaid) && gaid.matches(MobvistaConstant.didPtn))
          || (StringUtils.isNotBlank(imei) && imei.matches(MobvistaConstant.imeiPtn))
          || (StringUtils.isNotBlank(androidId) && androidId.matches(MobvistaConstant.andriodIdPtn))
          || (StringUtils.isNotBlank(oaid) && oaid.matches(MobvistaConstant.oaidPtb))
          || (StringUtils.isNotBlank(sysId) && sysId.matches(MobvistaConstant.didPtn))) {
          // Default a missing platform to "android" on this branch.
          val plt = if (StringUtils.isNotBlank(platform)) {
            platform
          } else {
            "android"
          }
          MRUtils.JOINER.join(date, time, timestamp, appId, plt, osVersion, sdkVersion, deviceModel, screenSize, countryCode,
            language, ip, imei, mac, androidId, gaid, "", deviceBrand, sysId, packageName, strategy, oaid, "", "")
        } else {
          null
        }
      }).filter(l => {
        StringUtils.isNotBlank(l)
      })

      // Make the write idempotent: remove any previous output for this hour first.
      FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      rdd.repartition(coalesce.toInt)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }

  /**
    * Framework-facing option declaration. Kept consistent with
    * [[commandOptions]]: `datetime` and `region` were missing here even though
    * [[run]] requires them; `input` is retained for backward compatibility
    * although this job never reads it.
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("datetime", true, "[must] datetime")
    options.addOption("region", true, "[must] region")
    options.addOption("input", true, "input (unused, kept for compatibility)")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

/** CLI entry point. */
object AdnOrgLogEtlHours {
  def main(args: Array[String]): Unit = {
    new AdnOrgLogEtlHours().run(args)
  }
}