package mobvista.dmp.datasource.dm

import java.net.URI
import java.util.regex.Pattern

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.{DateUtil, MD5Util}
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

import scala.collection.mutable

/**
 * Builds a device-id lookup keyed by MD5 from `dwh.ods_dmp_user_info`.
 *
 * For the `dt = date` partition, rows whose `last_req_day` is not older than the day
 * computed by `DateUtil.getDayByString(..., -30)` are grouped by the MD5 of their
 * device id. Each group yields one row: the MD5 key, a representative device id
 * (preferring a non-MD5 one) and a device type (preferring one not ending in "md5").
 * The result is written to `output` as zlib-compressed ORC.
 *
 * @package: mobvista.dmp.datasource.dm
 * @author: wangjf
 * @date: 2020-12-14 17:50:36
 * @time: 6:42 PM
 * @email: jinfeng.wang@mobvista.com
 */
class DmpDeviceIdMd5 extends CommonSparkJob with Serializable {

  /**
   * Entry point invoked by [[CommonSparkJob]].
   *
   * Required options: `date` (partition value of `dt`), `output` (ORC output path)
   * and `coalesce` (number of output partitions).
   *
   * @return 1 if required options are missing (usage is printed), 0 after the job runs
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      printOptions(commandLine)
      1
    } else {
      printOptions(commandLine)
      val date = commandLine.getOptionValue("date")
      val output = commandLine.getOptionValue("output")
      val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
      writeMd5Mapping(date, output, coalesce)
      0
    }
  }

  /** Runs the aggregation for one `date` partition and writes it to `output`. */
  private def writeMd5Mapping(date: String, output: String, coalesce: Int): Unit = {
    val spark = MobvistaConstant.createSparkSession(s"DmpDeviceIdMd5.${date}")
    try {
      // Resolve the FileSystem from the output path itself (same as Path.getFileSystem),
      // so the delete targets the same filesystem/bucket that `.orc(output)` writes to.
      val outputPath = new Path(output)
      val outputUri: URI = outputPath.toUri
      FileSystem.get(outputUri, spark.sparkContext.hadoopConfiguration).delete(outputPath, true)

      spark.udf.register("getMd5", getMd5 _)
      spark.udf.register("getDeviceId", getDeviceId _)
      spark.udf.register("getDeviceType", getDeviceType _)

      // NOTE(review): assumes sdf2 parses the `date` option format and sdf1 produces the
      // "yyyy-MM-dd" format expected by getDayByString / last_req_day — confirm.
      val lastReqDay = DateUtil.getDayByString(
        MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date)), "yyyy-MM-dd", -30)

      // Null device ids are excluded: they cannot be hashed and would otherwise collapse
      // into a single meaningless group.
      val sql =
        s"""
           |SELECT getMd5(device_id) device_id_md5,
           |  getDeviceId(COLLECT_SET(device_id)) device_id,
           |  getDeviceType(COLLECT_SET(device_type)) device_type
           |  FROM dwh.ods_dmp_user_info
           |  WHERE dt = '$date' AND last_req_day >= '$lastReqDay' AND device_id IS NOT NULL
           |GROUP BY getMd5(device_id)
           |""".stripMargin

      spark.sql(sql)
        .coalesce(coalesce)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext.
      spark.stop()
    }
  }

  /** True when `id` matches `MobvistaConstant.md5Ptn` (whole-string match). */
  private def isMd5(id: String): Boolean =
    DmpDeviceIdMd5.Md5Pattern.matcher(id).matches()

  /**
   * Returns the first element of `values` for which `skip` is false. If every element
   * is skipped, returns the last element; returns "" when `values` is empty.
   */
  private def preferFirstNotMatching(values: scala.collection.Seq[String])(skip: String => Boolean): String =
    values.find(v => !skip(v)).orElse(values.lastOption).getOrElse("")

  /**
   * MD5 key for a device id: ids that already match the MD5 pattern are returned
   * unchanged, others are hashed with `MD5Util.getMD5Str`.
   *
   * @return the MD5 key, or null when `device_id` is null (SQL NULL in, NULL out)
   */
  def getMd5(device_id: String): String =
    Option(device_id).map(id => if (isMd5(id)) id else MD5Util.getMD5Str(id)).orNull

  /**
   * Picks a representative device id from a group: the first id that is not an MD5
   * string, otherwise the last id, or "" for an empty group.
   */
  def getDeviceId(deviceIds: mutable.WrappedArray[String]): String =
    preferFirstNotMatching(deviceIds)(isMd5)

  /**
   * Picks a device type from a group: the first type not ending in "md5", otherwise
   * the last type, or "" for an empty group. Underscores are stripped from the result.
   */
  def getDeviceType(deviceTypes: mutable.WrappedArray[String]): String =
    preferFirstNotMatching(deviceTypes)(_.endsWith("md5")).replace("_", "")

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}

object DmpDeviceIdMd5 {

  /**
   * MD5 regex compiled once per JVM (driver or executor). `String.matches` would
   * recompile it on every call, i.e. for every row processed by the UDFs.
   */
  private val Md5Pattern: Pattern = Pattern.compile(MobvistaConstant.md5Ptn)

  def main(args: Array[String]): Unit = {
    // NOTE(review): the return code of run (1 when required options are missing) is
    // discarded, so the process still exits normally in that case — confirm this is intended.
    new DmpDeviceIdMd5().run(args)
  }
}