package mobvista.dmp.datasource.taobao

import java.net.URI

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

/**
 * Converts Alipay device-label query responses for IMEI-MD5 devices into DMP rows.
 *
 * Every output line is tab-separated: `deviceId, "imeimd5", "android", packageTag, country`.
 *  - `output01` holds the activation tags: the `com.alipay.foractivation*` rows taken from the
 *    responses, plus a `com.alipay.notforactivation` row for every requested device that got no
 *    activation tag.
 *  - `output02` holds the acquisition tags, built the same way from `com.alipay.foracquisition*`
 *    and `com.alipay.notforacquisition`.
 * Both outputs are written as gzip text files and are deleted before being rewritten.
 */
class AlipayTmpDataToDmp extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("imeiRequestInput", true, "[must] imeiRequestInput")
    options.addOption("imeiResponseInput", true, "[must] imeiResponseInput")
    options.addOption("output01", true, "[must] output01")
    options.addOption("output02", true, "[must] output02")
    options
  }

  /**
   * Parses one response line and expands each returned device into its DMP rows.
   *
   * The rows per device depend on `device_label`:
   *  - `L00002`: acquisition rows plus `com.alipay.notforactivation`;
   *  - `L00016` / `L00009` / `L00008` / `L00005`: activation rows plus `com.alipay.notforacquisition`;
   *  - anything else (including a missing label): `com.alipay.notforactivation` and
   *    `com.alipay.notforacquisition`.
   * A missing response object, a missing `device_infos` array, a null array element, or a device
   * without a non-empty `device_id` yields no rows. Previously these cases threw an NPE or emitted
   * literal "null" device ids.
   *
   * @param row     one raw JSON response line
   * @param country country code appended to every row
   * @return the tab-separated DMP rows for all devices in the response
   */
  private def buildRes(row: String, country: String): Array[String] = {
    val deviceInfos: Option[JSONArray] = for {
      json <- Option(JSON.parseObject(row))
      response <- Option(json.getJSONObject("alipay_user_account_device_info_query_response"))
      infos <- Option(response.getJSONArray("device_infos"))
    } yield infos

    val rows = for {
      infos <- deviceInfos.toList
      i <- 0 until infos.size
      device <- Option(infos.getJSONObject(i)).toList
      deviceId <- Option(device.getString("device_id")).filter(_.nonEmpty).toList
      tag <- AlipayTmpDataToDmp.tagsForLabel(device.getString("device_label"))
    } yield AlipayTmpDataToDmp.formatRow(deviceId, tag, country)

    rows.toArray
  }

  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val imeiRequestInput = commandLine.getOptionValue("imeiRequestInput")
      val imeiResponseInput = commandLine.getOptionValue("imeiResponseInput")
      val output01 = commandLine.getOptionValue("output01")
      val output02 = commandLine.getOptionValue("output02")

      val spark = SparkSession.builder()
        .appName("AlipayTmpDataToDmp")
        .config("spark.rdd.compress", "true")
        .config("spark.io.compression.codec", "snappy")
        .config("spark.sql.orc.filterPushdown", "true")
        .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport()
        .getOrCreate()

      try {
        // Deleting inside the try guarantees spark.stop() runs even if a delete fails.
        deleteOutput(spark, output01)
        deleteOutput(spark, output02)

        val sc = spark.sparkContext
        val country = AlipayTmpDataToDmp.Country

        // Both RDDs feed both outputs (filter + subtract each time); cache them so the response
        // JSON is parsed and the request file is read only once.
        val imeiRdd: RDD[String] = sc.textFile(imeiResponseInput)
          .filter(_.contains("\"result_code\":\"SUCCESS\""))
          .filter(_.contains("\"device_infos\""))
          .flatMap(buildRes(_, country))
          .cache()
        val imeiRequestRdd: RDD[String] = sc.textFile(imeiRequestInput)
          .flatMap(_.split(","))
          .cache()

        writeTagOutput(imeiRdd, imeiRequestRdd,
          "com.alipay.foractivation", "com.alipay.notforactivation", country, output01)
        writeTagOutput(imeiRdd, imeiRequestRdd,
          "com.alipay.foracquisition", "com.alipay.notforacquisition", country, output02)
      } finally {
        spark.stop()
      }
      0
    }
  }

  /**
   * Deletes `output` recursively on the filesystem that owns that path.
   *
   * Resolving the filesystem from the path keeps the delete on the same filesystem that
   * `saveAsTextFile` later writes to. A hard-coded bucket URI would not.
   */
  private def deleteOutput(spark: SparkSession, output: String): Unit = {
    val path = new Path(output)
    path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
  }

  /**
   * Writes one tag family to `output`.
   *
   * The output contains every row of `tagged` that contains `positiveTag`, plus a `negativeTag`
   * row for each requested id that has no such positive row.
   *
   * If the alipay_imei_lahuo_request job fails midway, a rerun does not need to delete the partial
   * results already stored in ClickHouse. Both halves are de-duplicated with `distinct` for that
   * reason.
   */
  private def writeTagOutput(tagged: RDD[String],
                             requestIds: RDD[String],
                             positiveTag: String,
                             negativeTag: String,
                             country: String,
                             output: String): Unit = {
    val positive = tagged.filter(_.contains(positiveTag))
    val positiveIds = positive.map(_.split("\t")(0))
    val negative = requestIds
      .subtract(positiveIds)
      .map(id => AlipayTmpDataToDmp.formatRow(id, negativeTag, country))
    positive.distinct()
      .union(negative.distinct())
      .coalesce(100)
      .saveAsTextFile(output, classOf[GzipCodec])
  }
}

object AlipayTmpDataToDmp {

  /** Device id type written in the second column of every row. */
  private[taobao] val DeviceType = "imeimd5"

  /** Platform written in the third column of every row. */
  private[taobao] val Platform = "android"

  /** Country code written in the last column of every row. */
  private[taobao] val Country = "CN"

  /** Labels that mark a device as an activation candidate. */
  private val ActivationLabels = Set("L00016", "L00009", "L00008", "L00005")

  /** Maps a (possibly null) Alipay `device_label` to the package tags emitted for that device. */
  private[taobao] def tagsForLabel(label: String): Seq[String] = Option(label).getOrElse("") match {
    case "L00002" =>
      Seq("com.alipay.foracquisition", "com.alipay.foracquisition_L00002", "com.alipay.notforactivation")
    case activation if ActivationLabels.contains(activation) =>
      Seq("com.alipay.foractivation", s"com.alipay.foractivation_$activation", "com.alipay.notforacquisition")
    case _ =>
      Seq("com.alipay.notforactivation", "com.alipay.notforacquisition")
  }

  /** Builds one tab-separated DMP row: `deviceId, DeviceType, Platform, tag, country`. */
  private[taobao] def formatRow(deviceId: String, tag: String, country: String): String =
    Seq(deviceId, DeviceType, Platform, tag, country).mkString("\t")

  def main(args: Array[String]): Unit = {
    new AlipayTmpDataToDmp().run(args)
  }
}