package mobvista.dmp.datasource.toutiao

import mobvista.dmp.common.CommonSparkJob
import org.apache.spark.sql.SparkSession

/**
  * Daily ETL for Toutiao logs: extracts (device_id, campaign_id) pairs,
  * filters out malformed device ids, and writes the distinct pairs as ORC.
  */
class EtlToutiaoDaily extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    // Command-line helpers (commParser, options, checkMustOption, printUsage,
    // printOptions) are inherited from CommonSparkJob.
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    } else {
      printOptions(commandLine)
    }

    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")

    val spark = SparkSession.builder()
      .appName("dmp_DmToutiaoTotal_fengliang")
      .config("spark.rdd.compress", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._

    val sc = spark.sparkContext

    try {
      sc.textFile(input)
        .map(splitFun(_))
        .filter(_.length >= 6)
        .map(array => {
          // The campaign id normally sits in the second column, but when the
          // first column holds three space-separated parts, its last part is
          // used as the campaign id instead.
          val first = array(0)
          var campaignId = array(1)
          if (first.contains(" ")) {
            val splits = splitFun(first, " ")
            if (splits.length == 3) {
              campaignId = splits(2)
            }
          }
          val deviceId = array(5)
          ToutiaoDailyVO(deviceId, campaignId)
        })
        // Keep only well-formed, non-all-zero device ids; didPtn and allZero
        // come from CommonSparkJob.
        .filter(vo => {
          val deviceId = vo.device_id
          deviceId.matches(didPtn) && !deviceId.equals(allZero)
        })
        .toDF()
        .createOrReplaceTempView("t_toutiao_daily")

      // GROUP BY is used purely to deduplicate (device_id, campaign_id) pairs.
      val sql =
        """
          |select t.device_id, t.campaign_id
          |from t_toutiao_daily t
          |group by t.device_id, t.campaign_id
        """.stripMargin

      spark.sql(sql)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object EtlToutiaoDaily {
  def main(args: Array[String]): Unit = {
    new EtlToutiaoDaily().run(args)
  }
}

case class ToutiaoDailyVO(device_id: String, campaign_id: String)