package mobvista.dmp.datasource.joypac

import java.net.URI

import com.google.gson.JsonObject
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.dm.Constant.{allZero, andriodIdPtn, didPtn, imeiPtn}
import mobvista.dmp.util.DateUtil
import mobvista.dmp.utils.clickhouse.ClickHouseConnectionFactory
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.Options
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
import ru.yandex.clickhouse.ClickHouseDataSource

/**
  * @package: mobvista.dmp.datasource.joypac
  * @author: wangjf
  * @date: 2019-12-18
  * @time: 14:10:50
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class JoypacResultEtl extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("date", true, "[must] date")
    options.addOption("output", true, "[must] output")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("host", true, "host")
    options.addOption("cluster", true, "cluster")
    options.addOption("database", true, "database")
    options.addOption("table", true, "table")
    options
  }

  //  Returns the field `key` as a non-blank string, or "" when the field is
  //  absent, an explicit JSON null, or blank.
  private def getString(json: JsonObject, key: String): String = {
    val elem = json.get(key)
    if (elem != null && !elem.isJsonNull && StringUtils.isNotBlank(elem.getAsString)) {
      elem.getAsString
    } else {
      ""
    }
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")
    val cluster = commandLine.getOptionValue("cluster")
    val host = commandLine.getOptionValue("host")
    val database = commandLine.getOptionValue("database")
    val table = commandLine.getOptionValue("table")

    val spark = SparkSession.builder()
      .appName("JoypacResultEtl")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    val updateDate = DateUtil.format(DateUtil.parse(date, "yyyyMMdd"), "yyyy-MM-dd")

    //  Delete the output path up front so reruns of the same day are idempotent.
    FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
      .delete(new Path(output), true)

    try {
      import spark.implicits._

      //  Parse each input line (one JSON object per line) into a JoypacEntity,
      //  dropping records without a usable device id.
      val df = sc.textFile(input).map(r => {
        val json = GsonUtil.String2JsonObject(r)
        val idfa = getString(json, "idfa")
        val idfv = getString(json, "idfv")
        val app_version = getString(json, "app_version")
        val package_name = getString(json, "package_name")
        val platform = getString(json, "platform")

        //  apps_info arrives as a stringified JSON array. Rewrite it into a
        //  brace-delimited value ([..] -> {..}, inner braces stripped,
        //  backslashes escaped); fall back to an empty JSON object otherwise.
        val appsInfo = json.get("apps_info")
        val apps_info = if (appsInfo != null && !appsInfo.isJsonNull &&
          StringUtils.isNotBlank(appsInfo.getAsString) &&
          appsInfo.getAsString.startsWith("[") && appsInfo.getAsString.endsWith("]")) {
          appsInfo.getAsString
            .replace("\\", "\\\\")
            .replace("{", "").replace("}", "")
            .replace("[", "{").replace("]", "}")
        } else {
          (new JsonObject).toString
        }

        //  Prefer the IDFA when it passes the device-id checks; otherwise fall
        //  back to the IDFV.
        val deviceId = if (JoypacResultEtl.check_deviceId(idfa)) {
          idfa
        } else {
          idfv
        }
        if (StringUtils.isNotBlank(deviceId)) {
          JoypacEntity(deviceId, platform, app_version, package_name, apps_info, updateDate)
        } else {
          null
        }
      }).filter(_ != null)

      //  Persist the deduplicated result to S3 as zlib-compressed ORC.
      df.toDF
        .dropDuplicates()
        .coalesce(coalesce.toInt)
        .write
        .mode(SaveMode.Overwrite)
        .option("orc.compress", "zlib")
        .orc(output)

      import mobvista.dmp.utils.clickhouse.ClickHouseSparkExt._

      //  Option(...) yields None when the optional -cluster flag is absent.
      val clusterName: Option[String] = Option(cluster)
      implicit val clickhouseDataSource: ClickHouseDataSource = ClickHouseConnectionFactory.get(host)

      //  Drop the day's partition first so the ClickHouse load is idempotent,
      //  then (re)create the database/table and load the upper-cased,
      //  deduplicated device ids.
      val tdf = spark.emptyDataFrame
      tdf.dropPartition(database, table, date, clusterName)

      val df_ck = df.map(r => DeviceId(r.device_id.toUpperCase))
        .toDF
        .dropDuplicates()
      df_ck.createClickHouseDb(database, clusterName)
      df_ck.createClickHouseTable(database, table, Seq("dt"), JoypacResultEtl.indexColumn, Seq(), clusterName)
      df_ck.saveToClickHouse(database, table, Seq(updateDate), Seq("dt"), clusterName)
    } finally {
      //  spark.stop() also stops the underlying SparkContext.
      spark.stop()
    }
    0
  }
}

object JoypacResultEtl {

  val indexColumn: Seq[String] = Seq("device_id")

  //  A device id is accepted when it matches the IDFA/GAID pattern (and is not
  //  the all-zero placeholder), the IMEI pattern, or the Android-id pattern.
  def check_deviceId(device_id: String): Boolean = {
    StringUtils.isNotBlank(device_id) &&
      (device_id.matches(didPtn) && !device_id.equals(allZero)
        || device_id.matches(imeiPtn)
        || device_id.matches(andriodIdPtn))
  }

  def main(args: Array[String]): Unit = {
    new JoypacResultEtl().run(args)
  }
}