package mobvista.dmp.datasource.device

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions

import java.net.URI
import java.text.SimpleDateFormat
import scala.util.Try

/**
 * Spark job that rewrites the `dev_id` of one day of CN rows from
 * `dwh.ods_dmp_user_info_daily` using a device-id mapping stored as ORC.
 *
 * Every user-info row for the requested `dt` whose upper-cased country is `CN` is kept
 * (right join). When the mapping has a row whose `lower_device_id` equals the user-info
 * `dev_id`, the mapping's `device_id` replaces it; otherwise the original `dev_id` is kept.
 * The MD5 of the resulting id is emitted as `dev_id_md5`.
 *
 * NOTE(review): the column names suggest the mapping goes from a lower-cased device id
 * back to its original form -- confirm against the job that produces `input`.
 *
 * Command-line options (all required):
 *  - `-date`     value compared against the `dt` partition column
 *  - `-input`    path of the ORC device-id mapping
 *  - `-coalesce` number of output partitions (positive integer)
 *  - `-output`   destination path; any existing content is deleted before writing
 */
class OdsDmpUserInfoDailyMapping extends CommonSparkJob with Serializable {

  /**
   * Parses the arguments, runs the join and writes the result as zlib-compressed ORC.
   *
   * @param args raw command-line arguments, see the class documentation
   * @return `0` when the job finishes without throwing
   * @throws IllegalArgumentException if a required option is missing or blank, or if
   *                                  `-coalesce` is not a positive integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // commons-cli returns null for an option that was not supplied; fail fast instead of
    // letting a null flow into the SQL text or into path construction.
    def required(name: String): String =
      Option(commandLine.getOptionValue(name))
        .filter(_.trim.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"Missing required option: -$name"))

    val date = required("date")
    val input = required("input")
    val output = required("output")
    val coalesce = required("coalesce")

    // Validate the partition count BEFORE the old output is deleted, so that a bad
    // argument can no longer wipe the previous result and then crash.
    val numPartitions = Try(coalesce.toInt).toOption
      .filter(_ > 0)
      .getOrElse(throw new IllegalArgumentException(
        s"Option -coalesce must be a positive integer, got: $coalesce"))

    val spark = MobvistaConstant.createSparkSession(s"OdsDmpUserInfoDailyMapping.${date}")

    try {
      val sc = spark.sparkContext

      // Resolve the file system from the output path itself rather than from a fixed
      // bucket, so the directory that is cleared is the one Spark writes to below.
      val outputPath = new Path(output)
      val outputUri: URI = outputPath.toUri
      FileSystem.get(outputUri, sc.hadoopConfiguration).delete(outputPath, true)

      val mappingDF = spark.read.orc(input)

      val sql =
        s"""
           |SELECT * FROM dwh.ods_dmp_user_info_daily
           | WHERE dt = '${date}' AND UPPER(country) = 'CN'
           |""".stripMargin
      val userInfoDailyDF = spark.sql(sql)

      // Mapped id when the join matched, original id otherwise.
      val resolvedDevId = functions.coalesce(mappingDF("device_id"), userInfoDailyDF("dev_id"))

      mappingDF
        // Right join: rows of userInfoDailyDF without a mapping entry are preserved.
        .join(userInfoDailyDF, mappingDF("lower_device_id") === userInfoDailyDF("dev_id"), "right")
        .select(
          resolvedDevId.alias("dev_id"),
          functions.md5(resolvedDevId).alias("dev_id_md5"),
          userInfoDailyDF("dev_type"),
          userInfoDailyDF("platform"),
          userInfoDailyDF("install"),
          userInfoDailyDF("interest"),
          userInfoDailyDF("model"),
          userInfoDailyDF("country"),
          userInfoDailyDF("osversion"),
          userInfoDailyDF("age"),
          userInfoDailyDF("gender"),
          userInfoDailyDF("behavior")
        )
        .repartition(numPartitions)
        .write
        .option("orc.compress", "zlib")
        .orc(output)

      0
    } finally {
      // Always release the session, including when the job fails part-way through.
      Option(spark).foreach(_.stop())
    }
  }

  /**
   * Builds the command-line option set understood by this job.
   *
   * @return options `date`, `input`, `coalesce` and `output`, each taking one argument
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("input", true, "input")
    options.addOption("coalesce", true, "coalesce")
    options.addOption("output", true, "output")
    options
  }
}

/** Entry point used when the job is submitted to Spark. */
object OdsDmpUserInfoDailyMapping {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoDailyMapping().run(args)
  }
}