package mobvista.dmp.datasource.toutiao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
  * @package: mobvista.dmp.datasource.toutiao
  * @author: wangjf
  * @create: 2018-12-05 15:35:44
  **/

/**
  * Spark job that matches devices from `dwh.dm_device_tag_statistics` against the Toutiao
  * tag-mapping file, keeps only iOS IDFA devices in CN whose `last_req_day` falls in
  * [startDate, endDate], then:
  *  - writes per-(tag_type, tag_id) distinct device counts to `tag_output` (single file), and
  *  - writes the per-device payload assembled by `Constant.mapPart` / `mapPartCore` /
  *    `mapProtoPart` to `output`.
  *
  * Device ids on both sides are compared as `MD5(UPPER(device_id))`.
  */
class GameDeviceToutiao extends CommonSparkJob with Serializable {

  /**
    * Runs the job.
    *
    * Required options (see [[buildOptions]]): `date`, `startDate`, `endDate`, `output`,
    * `tag_output`, `file`. Existing `output` and `tag_output` directories are deleted first.
    *
    * @param args raw command-line arguments
    * @return 0 on success, 1 when a required option is missing
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return 1
    } else {
      printOptions(commandLine)
    }

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")
    val tag_output = commandLine.getOptionValue("tag_output")
    val startDate = commandLine.getOptionValue("startDate")
    val endDate = commandLine.getOptionValue("endDate")
    val file = commandLine.getOptionValue("file")

    val spark = SparkSession.builder()
      .appName("GameDeviceToutiao")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    try {
      // Remove previous outputs. The FileSystem is resolved from each path itself so the
      // delete targets the same store saveAsTextFile writes to (any bucket / scheme), and
      // it runs inside try so the session is still stopped if the delete fails.
      Seq(output, tag_output).foreach { dir =>
        val path = new Path(dir)
        path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
      }

      // Tag-mapping file: comma-separated; columns 1-3 are joined with "-" and columns 0 and 8
      // are kept. NOTE(review): field order presumably matches Constant.schema as
      // (tag_code, tag_type, tag_id) — confirm against Constant.schema.
      val tagRdd = spark.sparkContext.textFile(file).map(_.split(",")).map { r =>
        Row(r(1) + "-" + r(2) + "-" + r(3), r(0), r(8))
      }
      spark.createDataFrame(tagRdd, Constant.schema).createOrReplaceTempView("toutiao")

      // iOS IDFA devices in CN whose last request day lies in [startDate, endDate].
      val cn_sql =
        s"""
           |SELECT MD5(UPPER(device_id)) device_id
           |FROM dwh.ods_dmp_user_info
           |WHERE dt = '${date}' AND platform = 'ios' AND device_type = 'idfa' AND country = 'CN'
           |  AND last_req_day BETWEEN '${startDate}' AND '${endDate}'
        """.stripMargin
      val cn_df = spark.sql(cn_sql)

      // Devices carrying a Toutiao-mapped tag; active_id buckets the tag count:
      // cnt >= 4 -> '01', cnt >= 2 -> '02', otherwise '03'.
      val tag_sql =
        s"""
           |SELECT /*+ mapjoin(t)*/ MD5(UPPER(m.device_id)) device_id,t.tag_type tag_type,t.tag_id tag_id,
           | CASE WHEN m.cnt >= 4 THEN '01' WHEN m.cnt >= 2 THEN '02' ELSE '03' END AS active_id
           | FROM toutiao t JOIN
           | (SELECT device_id,tag_type,first_tag,second_tag,cnt FROM dwh.dm_device_tag_statistics WHERE dt = '${date}') m
           | ON CONCAT(m.tag_type,"-",m.first_tag,"-",m.second_tag) = t.tag_code
        """.stripMargin
      val tag_df = spark.sql(tag_sql)

      cn_df.createOrReplaceTempView("cn_device")
      tag_df.createOrReplaceTempView("tag_device")

      // Keep only tagged devices that are also in the CN iOS device set.
      val baseSql =
        """
          |SELECT tag.device_id device_id,tag_type,tag_id,active_id
          | FROM tag_device tag,cn_device cn
          | WHERE tag.device_id = cn.device_id
        """.stripMargin
      val base_df = spark.sql(baseSql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      base_df.createOrReplaceTempView("base_device")

      val tagCountSql =
        """
          |SELECT tag_type,tag_id,COUNT(DISTINCT device_id)
          |FROM base_device
          |GROUP BY tag_type,tag_id
          |ORDER BY tag_type,tag_id
        """.stripMargin
      // coalesce(1) merges the sorted partitions in order; repartition(1) would reshuffle
      // them and lose the ORDER BY ordering in the single output file.
      spark.sql(tagCountSql).coalesce(1).rdd.saveAsTextFile(tag_output)

      val categoryDF = base_df.where(conditionExpr = "tag_type = '01'")
      val styleDF = base_df.where(conditionExpr = "tag_type = '02'")
      val ruleDF = base_df.where(conditionExpr = "tag_type = '03'")

      // groupByKey yields the same RDD[(K, Iterable[V])] as the former combineByKey that
      // appended with `c ++ Seq(v)`, without re-copying the buffer on every value (O(n^2)).
      val all_rdd = categoryDF.union(styleDF).union(ruleDF)
        .select(col = "device_id", cols = "tag_type", "tag_id", "active_id")
        .rdd
        .mapPartitions(Constant.mapPart)
        .groupByKey()

      all_rdd
        .mapPartitions(Constant.mapPartCore)
        .groupByKey()
        .mapPartitions(Constant.mapProtoPart)
        // Left uncompressed: Toutiao only accepts zip archives for upload/parsing.
        .saveAsTextFile(output)
    } finally {
      spark.stop()
    }
    0
  }

  /**
    * Declares the command-line options accepted by [[run]]; all are required.
    *
    * @return the option set: date, startDate, endDate, output, tag_output, file
    */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] today date")
    options.addOption("startDate", true, "[must] startDate ")
    options.addOption("endDate", true, "[must] endDate ")
    options.addOption("output", true, "[must] output path ")
    options.addOption("tag_output", true, "[must] tag_output path ")
    options.addOption("file", true, "[must] file")
    options
  }
}

object GameDeviceToutiao {

  /**
    * Entry point. A non-zero result from [[GameDeviceToutiao#run]] (e.g. missing options)
    * is propagated as the process exit status; a successful run returns normally.
    */
  def main(args: Array[String]): Unit = {
    val exitCode = new GameDeviceToutiao().run(args)
    if (exitCode != 0) sys.exit(exitCode)
  }
}