package mobvista.dmp.datasource.iqiyi

import java.net.URI

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

/**
  * Loads iQIYI device-activation responses and writes them to the DMP
  * install list as gzip-compressed, tab-separated text.
  */
class IQiYiTmpDataToDMP extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("update", true, "[must] update")
    options
  }

  /**
    * Parses one response line and emits a DmpInstallListV2 record per device.
    * Devices with status "1" are mapped to the activation package name,
    * all others to the non-activation package name.
    */
  private def buildRes(row: String, date: String): Array[DmpInstallListV2] = {
    val jsonData: JSONObject = JSON.parseObject(row)
    val array: JSONArray = jsonData.getJSONArray("data")
    val statuses = ArrayBuffer[DmpInstallListV2]()
    for (i <- 0 until array.size) {
      val nObject: JSONObject = array.getJSONObject(i)
      val deviceId = nObject.getString("id")
      val status = nObject.getString("status")
      val packageName =
        if (status == "1") "com.iqiyi.foractivation"
        else "com.iqiyi.notforactivation"
      statuses += DmpInstallListV2(deviceId, "imeimd5", "android", packageName, date)
    }
    statuses.toArray
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")
    val update = commandLine.getOptionValue("update")

    val spark = SparkSession.builder()
      .appName("IQiYiTmpDataToDMP")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    import spark.implicits._

    // Remove any previous output so saveAsTextFile does not fail on an existing path.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(output), true)

    try {
      // Keep only successful responses (code A00000), then flatten each
      // response's "data" array into install-list records.
      val valueRdd: RDD[DmpInstallListV2] = sc.textFile(input)
        .filter(_.contains("\"code\":\"A00000\""))
        .flatMap(buildRes(_, update))
      valueRdd.toDF.rdd.map(_.mkString("\t")).saveAsTextFile(output, classOf[GzipCodec])
      //  valueRdd.toDF.write
      //    .mode(SaveMode.Overwrite)
      //    .option("orc.compress", "zlib")
      //    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      //    .orc(output)
    } finally {
      spark.stop()
    }
    0
  }
}

object IQiYiTmpDataToDMP {
  def main(args: Array[String]): Unit = {
    new IQiYiTmpDataToDMP().run(args)
  }
}
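
// Usage sketch. The bucket paths, date value, and jar name below are
// illustrative assumptions, not taken from this repository; the argument
// names come from buildOptions(), which uses commons-cli single-dash options:
//
//   spark-submit --class mobvista.dmp.datasource.iqiyi.IQiYiTmpDataToDMP \
//     dmp.jar -input s3://bucket/iqiyi/raw -output s3://bucket/iqiyi/dmp -update 2020-01-01
//
// Given the parsing logic in buildRes, an accepted input line looks like
// (id value is a made-up MD5-style example):
//
//   {"code":"A00000","data":[{"id":"0123456789abcdef0123456789abcdef","status":"1"}]}
//
// which, per the DmpInstallListV2 field order and the mkString("\t") step,
// would be written out as the tab-separated row:
//
//   0123456789abcdef0123456789abcdef<TAB>imeimd5<TAB>android<TAB>com.iqiyi.foractivation<TAB>2020-01-01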