package mobvista.dmp.datasource.baichuan

import java.net.URI

import mobvista.dmp.datasource.baichuan.Constant.BaiChuan
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * Daily ETL job that extracts Taobao/Tmall device ids (idfa / imei / imeimd5)
 * from the `ods_user_info_daily` view and writes one deduplicated ORC dataset
 * per (app, os) combination under the given output root.
 *
 * @package: mobvista.dmp.datasource.baichuan
 * @author: wangjf
 * @date: 2019-08-29
 * @time: 20:52
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class BaiChuanEtlJob extends java.io.Serializable {

  /** Declares the two required CLI options: -date (partition day) and -output (S3 root dir). */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Entry point for the job body.
   *
   * Parses `-date` / `-output`, registers the `hasNo` UDF, caches the filtered
   * daily user-info view, then for each app target runs the ETL SQL and writes
   * the result. Android apps additionally produce an imei-md5 variant (osId 3).
   * The SparkSession is always stopped in `finally`.
   */
  protected def run(args: Array[String]): Unit = {
    val parser = new BasicParser()
    val commandLine = parser.parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"BaiChuanEtlJob.$date")
    try {
      spark.udf.register("hasNo", Constant.hasNo _)
      // Cached because every per-app query below reads this view.
      val df = spark.sql(Constant.filter_sql.replace("@date", date))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      df.createOrReplaceTempView("ods_user_info_daily")

      /**
       * 分别处理 天猫,淘宝不同OS 设备: 2 淘宝 1 天猫
       * (handle Tmall/Taobao per OS; appId: 2 Taobao, 1 Tmall)
       * appOsId: 1 android, 2 IOS, 3 android_md5
       *
       * Each row: (package, appId, appOs, devType, osId, outputSubDir, optional md5 variant).
       */
      val targets = Seq(
        ("387682726", "2", "ios", "idfa", "2", "2_2", None),
        ("518966501", "1", "ios", "idfa", "2", "1_2", None),
        ("com.taobao.taobao", "2", "android", "imei", "1", "2_1", Some(("imeimd5", "3", "2_3"))),
        ("com.tmall.wireless", "1", "android", "imei", "1", "1_1", Some(("imeimd5", "3", "1_3")))
      )

      targets.foreach { case (pkg, appId, appOs, devType, osId, subDir, md5Variant) =>
        // 从 install 获取的设备信息 (device info obtained from install)
        extractAndWrite(spark, buildSql(pkg, appId, appOs, devType, osId), output + "/" + subDir)
        // Android packages also get an imei-md5 dataset (appOsId 3).
        md5Variant.foreach { case (md5DevType, md5OsId, md5SubDir) =>
          extractAndWrite(spark, buildSql(pkg, appId, appOs, md5DevType, md5OsId), output + "/" + md5SubDir)
        }
      }

      // Release the cached view before shutting down.
      df.unpersist()
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
  }

  /** Fills the placeholders of `Constant.etl_sql` for one (package, device-type) combination. */
  private def buildSql(pkg: String, appId: String, appOs: String, devType: String, osId: String): String =
    Constant.etl_sql
      .replace("@package", pkg)
      .replace("@appId", appId)
      .replace("@appOs", appOs)
      .replace("@devType", devType)
      .replace("@osId", osId)

  /**
   * Runs `sql`, maps rows to [[BaiChuan]], deduplicates, and overwrites
   * `outputDir` as zlib-compressed ORC. The target dir is deleted first so
   * stale part files never mix with the fresh write.
   */
  private def extractAndWrite(spark: SparkSession, sql: String, outputDir: String): Unit = {
    import spark.implicits._
    val dff = spark.sql(sql).rdd.map(r => {
      BaiChuan(r.getAs("dev_id"),
        Integer.parseInt(r.getAs("app_id").toString),
        Integer.parseInt(r.getAs("app_os").toString))
    }).toDF.dropDuplicates
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(outputDir), true)
    dff.write
      .mode(SaveMode.Overwrite)
      .option("orc.compress", "zlib")
      .orc(outputDir)
  }
}

object BaiChuanEtlJob {
  def main(args: Array[String]): Unit = {
    new BaiChuanEtlJob().run(args)
  }
}