// IQiYiTmpDataToDMP.scala
package mobvista.dmp.datasource.iqiyi

import java.net.URI

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.io.compress.GzipCodec

import scala.collection.mutable.ArrayBuffer


/**
 * Spark job that converts raw iQiYi API response lines (JSON) into DMP
 * install-list records and writes them as gzip-compressed tab-separated text.
 *
 * Input lines are JSON responses; only lines whose response code is
 * "A00000" (success) are processed.
 */
class IQiYiTmpDataToDMP extends CommonSparkJob with Serializable {

  /** Declares the required CLI options: input path, output path, update date. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("update", true, "[must] update")
    options
  }

  /**
   * Parses one successful API response line into install-list records.
   *
   * @param row  raw JSON response line; its "data" array holds one object per device
   * @param date update date stamped onto every record
   * @return one [[DmpInstallListV2]] per element of the "data" array; device ids
   *         are md5-hashed IMEIs ("imeimd5") on the "android" platform, and the
   *         package name encodes the activation status
   */
  private def buildRes(row: String, date: String): Array[DmpInstallListV2] = {
    val jsonData: JSONObject = JSON.parseObject(row)
    val array: JSONArray = jsonData.getJSONArray("data")

    (0 until array.size).map { i =>
      val item = array.getJSONObject(i)
      val deviceId = item.getString("id")
      // status "1" means the device activated the app; any other status maps
      // to the not-activated pseudo-package (same values as before, but chosen
      // with an expression instead of mutating a var).
      val packageName =
        if (item.getString("status") == "1") "com.iqiyi.foractivation"
        else "com.iqiyi.notforactivation"
      DmpInstallListV2(deviceId, "imeimd5", "android", packageName, date)
    }.toArray
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val update = commandLine.getOptionValue("update")

    // App name previously said "EtlAliActivitionDaily" (copy-paste from another
    // job); use this job's own name so the Spark UI / history server is accurate.
    val spark = SparkSession.builder()
      .appName("IQiYiTmpDataToDMP")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._
    // Remove any previous output so saveAsTextFile does not fail on an existing path.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)

    try {
      // Keep only successful API responses, then fan each one out into records.
      val valueRdd: RDD[DmpInstallListV2] = sc.textFile(input).filter(_.contains("\"code\":\"A00000\"")).flatMap(buildRes(_, update))
      valueRdd.toDF.rdd.map(_.mkString("\t")).saveAsTextFile(output, classOf[GzipCodec])
      // Alternative ORC output, kept for reference:
      //  valueRdd.toDF.write
      //    .mode(SaveMode.Overwrite)
      //    .option("orc.compress", "zlib")
      //    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      //    .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}

/** JVM entry point: instantiates the job and delegates to its `run`. */
object IQiYiTmpDataToDMP {
  def main(args: Array[String]): Unit =
    new IQiYiTmpDataToDMP().run(args)
}