package mobvista.dmp.datasource.taobao

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class EtlUCActivitionRequestDaily extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("imeioutput", true, "[must] imeioutput")
    options.addOption("today", true, "[must] today")
    options.addOption("last_req_day", true, "[must] last_req_day")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else {
      printOptions(commandLine)
    }

    val imeioutput = commandLine.getOptionValue("imeioutput")
    val today = commandLine.getOptionValue("today")
    val last_req_day = commandLine.getOptionValue("last_req_day")

    val spark = SparkSession.builder()
      .appName("EtlUCActivitionRequestDaily")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Remove any previous output so saveAsTextFile does not fail on an existing path.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(imeioutput), true)

    try {
      // Select recently active CN devices, normalizing plain imei values to their
      // MD5 form and deduplicating by md5 (preferring 'imei' over 'imeimd5' via
      // the row_number rank over device_type).
      val sql1 =
        s"""
           |select XX.device_id_md5, XX.device_id, XX.device_type
           |from (select X.device_id_md5, X.device_id, X.device_type,
           |             row_number() over (partition by X.device_id_md5 order by X.device_type asc) rk
           |      from (select device_id, device_type,
           |                   case when device_type = 'imei' then MD5(device_id)
           |                        when device_type = 'imeimd5' then device_id
           |                   end as device_id_md5
           |            from dwh.ods_dmp_user_info
           |            where dt = '${today}'
           |              and device_type in ('imei', 'imeimd5')
           |              and last_req_day >= '${last_req_day}'
           |              and upper(country) = 'CN') X) XX
           |where XX.rk = 1
           |limit 100000
        """.stripMargin

      // Write the md5 ids out as comma-separated batches of up to five per line.
      spark.sql(sql1).repartition(100).rdd.mapPartitions(rs => {
        val array = new ArrayBuffer[String]()
        var devidSet = new mutable.HashSet[String]()
        while (rs.hasNext) {
          devidSet.add(rs.next().getAs[String]("device_id_md5"))
          if (devidSet.size == 5) {
            // A full batch: emit it and start a new one.
            array += devidSet.mkString(",")
            devidSet = new mutable.HashSet[String]()
          } else if (devidSet.size < 5 && !rs.hasNext) {
            // Partition exhausted: emit the remaining partial batch.
            array += devidSet.mkString(",")
          }
        }
        array.iterator
      }).coalesce(50).saveAsTextFile(imeioutput)
    } finally {
      spark.stop()
    }
    0
  }
}

object EtlUCActivitionRequestDaily {
  def main(args: Array[String]): Unit = {
    new EtlUCActivitionRequestDaily().run(args)
  }
}
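/*
 * A hypothetical invocation sketch for this job. The class name and option
 * names come from this file; the jar name, output path, and date values are
 * illustrative assumptions only (the expected date format depends on how
 * dwh.ods_dmp_user_info is partitioned):
 *
 *   spark-submit \
 *     --class mobvista.dmp.datasource.taobao.EtlUCActivitionRequestDaily \
 *     dmp-jobs.jar \
 *     -imeioutput s3://mob-emr-test/dmp/uc_activation_request/2024-01-01 \
 *     -today 2024-01-01 \
 *     -last_req_day 2023-12-25
 */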