package mobvista.dmp.datasource.device

import java.net.URI
import java.text.SimpleDateFormat

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

/**
 * Spark job that builds one snapshot of device user info by merging the previous
 * snapshot with the latest daily increment.
 *
 * Inputs, relative to the `cur_day` option (format `yyyy-MM-dd`):
 *  - `dwh.ods_dmp_user_info_all_v2`, partition `dt` = two days before `cur_day`,
 *    restricted to rows whose `update_date` is later than 14 days before `cur_day`;
 *  - `dwh.ods_dmp_user_info_daily`, partition `dt` = one day before `cur_day`.
 *
 * The two sides are full-outer-joined on `dev_id`; for every column the daily value
 * wins when it is non-null. The result is de-duplicated, repartitioned to the
 * requested number of partitions and written as zlib-compressed ORC to `output`.
 */
class OdsDmpUserInfoAllV2 extends CommonSparkJob with Serializable {

  /**
   * Runs the merge job.
   *
   * All arguments are resolved and parsed before any side effect happens, so an
   * invalid invocation fails without deleting the existing output and without
   * starting a Spark session.
   *
   * @param args command line arguments; `cur_day`, `output` and `coalesce` are
   *             required, `input` is accepted but not used by the active code path
   * @return 0 on success
   * @throws IllegalArgumentException if a required option is missing
   * @throws NumberFormatException    if `coalesce` is not an integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)

    // getOptionValue yields null for an absent option; turn that into a clear error.
    def required(name: String): String =
      Option(commandLine.getOptionValue(name)).getOrElse(
        throw new IllegalArgumentException(s"Missing required option: -$name"))

    val scheduleTime = required("cur_day")
    val output = required("output")
    // Parsed up front so a bad value cannot fail the job after the old output is gone.
    val partitions = required("coalesce").toInt

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
    val sdf2 = new SimpleDateFormat("yyyyMMdd")

    // One day before the schedule day, as yyyy-MM-dd and as yyyyMMdd.
    val yesBef1Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -1)
    val yesBef1Str = sdf2.format(sdf1.parse(yesBef1Part))
    // Two days before the schedule day, as yyyyMMdd.
    val yesBef2Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -2)
    val yesBef2DayStr = sdf2.format(sdf1.parse(yesBef2Part))
    // 14 days before the schedule day, as yyyy-MM-dd.
    val yesBef14Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -14)

    val hql = mergeQuery(yesBef1Part, yesBef1Str, yesBef2DayStr, yesBef14Part)

    val spark = SparkSession
      .builder()
      .appName("OdsDmpUserInfoAllV2")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Remove any previous output. Done inside the try so that a failing delete
      // still stops the session.
      // NOTE(review): the file system is resolved from a hard-coded bucket; an
      // `output` in another bucket presumably will not work here - confirm.
      FileSystem
        .get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
        .delete(new Path(output), true)

      /* Disabled code carried over from the original source:
      def schema: StructType = {
        StructType(StructField("device_id", StringType) :: Nil)
      }
      val publishDF = spark.createDataFrame(sc.textFile(input).map(r => {
        Row(r.toLowerCase())
      }), schema)
      publishDF.dropDuplicates.createOrReplaceTempView("dev_publish")
      */

      spark.sql(hql)
        .dropDuplicates("dev_id", "dev_id_md5", "dev_type", "platform")
        .repartition(partitions)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
   * Builds the HiveQL statement that merges the previous snapshot (alias `a`) with
   * the daily increment (alias `b`).
   *
   * Daily values take precedence; `age` falls back to `'10'` and `gender` to `10`
   * when both sides are null; `publish_date` falls back to `yesBef14Part` when
   * neither side has a non-empty value. Country codes are upper-cased and `GB` is
   * rewritten to `UK` on both sides.
   *
   * @param yesBef1Part   one day before the schedule day, `yyyy-MM-dd`
   * @param yesBef1Str    one day before the schedule day, `yyyyMMdd`
   * @param yesBef2DayStr two days before the schedule day, `yyyyMMdd`
   * @param yesBef14Part  14 days before the schedule day, `yyyy-MM-dd`
   * @return the query text to pass to `spark.sql`
   */
  private def mergeQuery(yesBef1Part: String,
                         yesBef1Str: String,
                         yesBef2DayStr: String,
                         yesBef14Part: String): String = {
    /* Disabled query variant carried over from the original source:
    hql =
      s"""
         |select lower(dev_id) dev_id,
         |max(dev_id_md5) as dev_id_md5,
         |max(dev_type) dev_type,
         |max(platform) platform,
         |max(install) as install,
         |max(interest) as interest,
         |max(model) as model,
         |max(country) as country,
         |max(osversion) as osversion,
         |min(age) as age,
         |min(gender) as gender,
         |max(behavior) as behavior,
         |max(update_date) as update_date,
         |max(publish_date) as publish_date
         |from dwh.ods_dmp_user_info_all where dt = '${yesBef1Str}' and update_date > '${yesBef14Part}'
         |group by lower(dev_id)
      """.stripMargin
    */

    /* Disabled query variant carried over from the original source:
    val hql =
      s"""
         |select coalesce(b.dev_id,a.dev_id) as dev_id,
         |coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
         |coalesce(b.dev_type,a.dev_type) as dev_type,
         |coalesce(b.platform,a.platform) as platform,
         |coalesce(b.install,a.install) as install,
         |coalesce(b.interest,a.interest) as interest,
         |coalesce(b.model,a.model) as model,
         |coalesce(b.country,a.country) as country,
         |coalesce(b.osversion,a.osversion) as osversion,
         |coalesce(b.age,a.age) as age,
         |coalesce(b.gender,a.gender) as gender,
         |coalesce(b.behavior,a.behavior) as behavior,
         |coalesce(b.update_date,a.update_date) as update_date,
         |coalesce(${yesBef1Part},a.publish_date) as publish_date
         |from
         | (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
         | from dwh.ods_dmp_user_info_all where dt = '${yesBef2DayStr}') a
         | full outer join
         | (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion,age,gender,behavior,update_date
         | from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}') b
         | on a.dev_id = b.dev_id
         | left semi join dev_publish c
         | on a.dev_id = c.device_id
      """.stripMargin
    */

    s"""
       |select coalesce(b.dev_id,a.dev_id) as dev_id,
       |       coalesce(b.dev_id_md5,a.dev_id_md5) as dev_id_md5,
       |       coalesce(b.dev_type,a.dev_type) as dev_type,
       |       coalesce(b.platform,a.platform) as platform,
       |       coalesce(b.install,a.install) as install,
       |       coalesce(b.interest,a.interest) as interest,
       |       coalesce(b.model,a.model) as model,
       |       coalesce(b.country,a.country) as country,
       |       coalesce(b.osversion,a.osversion) as osversion,
       |       coalesce(b.age,a.age,'10') as age,
       |       coalesce(b.gender,a.gender,10) as gender,
       |       coalesce(b.behavior,a.behavior) as behavior,
       |       coalesce(b.update_date,a.update_date) as update_date,
       |       case when b.publish_date is not null and b.publish_date != '' then b.publish_date
       |            when a.publish_date is not null and a.publish_date != '' then a.publish_date
       |       else '${yesBef14Part}' end as publish_date
       |from
       |  (select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,publish_date
       |   from dwh.ods_dmp_user_info_all_v2 where dt = '${yesBef2DayStr}' and update_date > '${yesBef14Part}') a
       |   full outer join
       |  (
       |    select dev_id,dev_id_md5,dev_type,platform,install,interest,model,case when upper(country) = 'GB' then 'UK' else upper(country) end as country ,osversion, age, gender,behavior,update_date,
       |    '${yesBef1Part}' as publish_date
       |    from dwh.ods_dmp_user_info_daily where dt='${yesBef1Str}'
       |  ) b
       |  on a.dev_id = b.dev_id
      """.stripMargin
  }

  /**
   * Declares the command line options understood by this job. Every option takes
   * an argument.
   *
   * @return options `cur_day`, `input`, `output` and `coalesce`
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_day", true, "schedule Time")
    options.addOption("input", true, "input")
    options.addOption("output", true, "output")
    options.addOption("coalesce", true, "coalesce")
    options
  }
}

/** Command line entry point for [[OdsDmpUserInfoAllV2]]. */
object OdsDmpUserInfoAllV2 {

  /**
   * Instantiates the job and runs it with the given arguments.
   *
   * @param args raw command line arguments, forwarded to the job's `run`
   */
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoAllV2().run(args)
  }
}