package mobvista.dmp.datasource.baichuan

import java.net.URI
import java.util.concurrent._

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors}
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.datasource.ali.AliServer
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SaveMode}
import org.springframework.scheduling.concurrent.CustomizableThreadFactory

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

/**
 * Spark job that queries [[AliServer]] for every device in the input and labels it either with
 * the real Tmall / Taobao package name or with a placeholder ("none") package name, writing the
 * resulting [[BaiChuanEntity]] rows as ORC.
 *
 * @package: mobvista.dmp.datasource.baichuan
 * @author: wangjf
 * @date: 2020/4/29
 * @time: 11:52 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class AliRequest extends CommonSparkJob with Serializable {

  /**
   * Job entry point.
   *
   * Reads ORC from `input` (capped at 1,000,000 rows), resolves each device and overwrites
   * `output/<appId>_<appOs>` with zlib-compressed ORC. When `appOs == 2` the device ids are
   * upper-cased, grouped into sets of at most [[AliRequest.AsoBatchSize]] and resolved by
   * [[getAsoFeature]]; otherwise rows are resolved one by one by [[getDspFeature]], which reads
   * `app_id` / `app_os` from each row instead of from the command line.
   *
   * @param args command-line arguments; see [[buildOptions]]
   * @return 0 on success, -1 when a mandatory option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      -1
    } else {
      printOptions(commandLine)

      val date = commandLine.getOptionValue("date")
      val partition = Integer.parseInt(commandLine.getOptionValue("partition"))
      val coalesce = Integer.parseInt(commandLine.getOptionValue("coalesce"))
      val input = commandLine.getOptionValue("input")
      val appId = Integer.parseInt(commandLine.getOptionValue("appId"))
      val appOs = Integer.parseInt(commandLine.getOptionValue("appOs"))
      val output = commandLine.getOptionValue("output")
      val outputPath = s"$output/${appId}_$appOs"
      // Captured by the Spark closure below as a plain Int.
      val batchSize = AliRequest.AsoBatchSize

      val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession(s"AliRequest.$date-${appId}_$appOs")
      import spark.implicits._

      try {
        // Delete through the filesystem that actually owns the output path. A FileSystem bound to
        // one fixed bucket rejects paths that live anywhere else ("Wrong FS").
        val outputDir = new Path(outputPath)
        outputDir.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(outputDir, true)

        val df = spark.read.orc(input).limit(1000000).repartition(partition)
        val entities =
          if (appOs == 2) {
            df.rdd
              .mapPartitions(rows => AliRequest.batchDeviceIds(rows.map(_.getAs[String]("device_id").toUpperCase), batchSize))
              .mapPartitions(batches => getAsoFeature(appId, appOs, batches))
          } else {
            df.rdd.mapPartitions(rows => getDspFeature(rows))
          }

        entities.repartition(coalesce).toDF
          .write
          .mode(SaveMode.Overwrite)
          .option("orc.compress", "zlib")
          .orc(outputPath)
      } finally {
        // SparkSession.stop() also stops the underlying SparkContext.
        spark.stop()
      }
      0
    }
  }

  /** Declares the command-line options; every option is mandatory. */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("date", true, "[must] date")
    options.addOption("partition", true, "[must] partition")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("input", true, "[must] input")
    options.addOption("output", true, "[must] output")
    options.addOption("appId", true, "[must] appId")
    options.addOption("appOs", true, "[must] appOs")
    options
  }

  /**
   * Resolves batches of device ids through `AliServer.asoRequest`, running the requests
   * concurrently on a pool that is shut down before this method returns.
   *
   * All requests are submitted first, then every result is awaited, so the returned iterator is
   * fully materialised. A batch whose submission or request fails is replaced by
   * [[processAsoFailure]].
   *
   * @param appId    1 = Tmall, 2 = Taobao
   * @param appOs    1 = android imei, 2 = ios idfa, 3 = android imei md5
   * @param iterator device-id batches
   * @return one entity per device reported by the service (or per device in a failed batch)
   * @throws MatchError when `appId` / `appOs` is not one of the values above
   */
  def getAsoFeature(appId: Int, appOs: Int, iterator: Iterator[mutable.HashSet[String]]): Iterator[BaiChuanEntity] = {
    val pool = AliRequest.newRequestPool(maxThreads = 300)
    try {
      val pending: Vector[Either[Future[ArrayBuffer[BaiChuanEntity]], ArrayBuffer[BaiChuanEntity]]] =
        iterator.map { deviceSet =>
          try {
            Left(pool.submit(asoTask(appId, appOs, deviceSet)))
          } catch {
            case NonFatal(_) => Right(processAsoFailure(appId, appOs, deviceSet))
          }
        }.toVector

      // Waiting on the futures directly (instead of collecting results from callbacks) guarantees
      // that every result is present before the iterator is handed back.
      pending.flatMap {
        case Left(future) => future.get()
        case Right(fallback) => fallback
      }.iterator
    } finally {
      pool.shutdown()
    }
  }

  /**
   * Builds the request for one ASO batch. Device type, platform and both candidate package names
   * are resolved eagerly, so an unsupported `appId` / `appOs` fails on the calling thread.
   */
  private def asoTask(appId: Int, appOs: Int, deviceSet: mutable.HashSet[String]): Callable[ArrayBuffer[BaiChuanEntity]] = {
    val (deviceType, platform) = AliRequest.deviceTypeAndPlatform(appOs)
    val presentPkg = AliRequest.installedPackage(appId, appOs)
    val absentPkg = AliRequest.absentPackage(appId, appOs)

    new Callable[ArrayBuffer[BaiChuanEntity]] {
      override def call(): ArrayBuffer[BaiChuanEntity] =
        try {
          val rsp = AliServer.asoRequest(String.valueOf(appId), String.valueOf(appOs), deviceSet.asJava)
          if (rsp != null && rsp.getResult != null && rsp.getResult.getSuccess) {
            val entities = new ArrayBuffer[BaiChuanEntity]()
            for (checkResult <- rsp.getResult.getResults) {
              val deviceId = appOs match {
                case 1 | 3 => checkResult.getImei
                case 2 => checkResult.getIdfa
              }
              // Decided per device: a "new device" gets the placeholder package, others the real one.
              val packageName = if (checkResult.getIsNewDevice) absentPkg else presentPkg
              entities += BaiChuanEntity(deviceId, deviceType, platform, packageName, AliRequest.Country)
            }
            entities
          } else {
            processAsoFailure(appId, appOs, deviceSet)
          }
        } catch {
          case NonFatal(_) => processAsoFailure(appId, appOs, deviceSet)
        }
    }
  }

  /**
   * Resolves each row through `AliServer.dspRequest`, running the requests concurrently on a pool
   * that is shut down before this method returns.
   *
   * Each row must provide `device_id` (String), `app_id` (Int) and `app_os` (Int). A row whose
   * submission fails is replaced by [[processDspFailure]]; a row whose request fails gets the
   * placeholder package.
   *
   * @return exactly one entity per input row, fully materialised
   */
  def getDspFeature(iterator: Iterator[Row]): Iterator[BaiChuanEntity] = {
    val pool = AliRequest.newRequestPool(maxThreads = 500)
    try {
      val pending: Vector[Either[Future[BaiChuanEntity], BaiChuanEntity]] =
        iterator.map { row =>
          try {
            Left(pool.submit(dspTask(row)))
          } catch {
            case NonFatal(_) => Right(processDspFailure(row))
          }
        }.toVector

      pending.map {
        case Left(future) => future.get()
        case Right(fallback) => fallback
      }.iterator
    } finally {
      pool.shutdown()
    }
  }

  /**
   * Builds the request for one DSP row. The row is read on the calling thread; iOS ids are
   * upper-cased before being sent to the service.
   */
  private def dspTask(row: Row): Callable[BaiChuanEntity] = {
    val appId = row.getAs[Int]("app_id")
    val appOs = row.getAs[Int]("app_os")
    val (deviceType, platform) = AliRequest.deviceTypeAndPlatform(appOs)
    val deviceId = AliRequest.normalizeDeviceId(row.getAs[String]("device_id"), appOs)
    val presentPkg = AliRequest.installedPackage(appId, appOs)
    val absentPkg = AliRequest.absentPackage(appId, appOs)

    new Callable[BaiChuanEntity] {
      override def call(): BaiChuanEntity = {
        val packageName =
          try {
            // A `true` "result" flag maps to the placeholder package, `false` to the real one.
            if (AliServer.dspRequest(deviceId, appOs).getBoolean("result")) absentPkg else presentPkg
          } catch {
            case NonFatal(_) => absentPkg
          }
        BaiChuanEntity(deviceId, deviceType, platform, packageName, AliRequest.Country)
      }
    }
  }

  /**
   * Fallback entity for a DSP row that could not be submitted: the row's device labelled with the
   * placeholder package.
   *
   * @throws MatchError when the row's `app_id` / `app_os` is unsupported
   */
  def processDspFailure(row: Row): BaiChuanEntity = {
    val appId = row.getAs[Int]("app_id")
    val appOs = row.getAs[Int]("app_os")
    val (deviceType, platform) = AliRequest.deviceTypeAndPlatform(appOs)
    val deviceId = AliRequest.normalizeDeviceId(row.getAs[String]("device_id"), appOs)
    BaiChuanEntity(deviceId, deviceType, platform, AliRequest.absentPackage(appId, appOs), AliRequest.Country)
  }

  /**
   * Fallback entities for an ASO batch whose request failed: every device in `deviceSet`
   * labelled with the placeholder package.
   *
   * @throws MatchError when `appId` / `appOs` is unsupported
   */
  def processAsoFailure(appId: Int, appOs: Int, deviceSet: mutable.HashSet[String]): ArrayBuffer[BaiChuanEntity] = {
    val (deviceType, platform) = AliRequest.deviceTypeAndPlatform(appOs)
    val packageName = AliRequest.absentPackage(appId, appOs)
    new ArrayBuffer[BaiChuanEntity]() ++=
      deviceSet.iterator.map(deviceId => BaiChuanEntity(deviceId, deviceType, platform, packageName, AliRequest.Country))
  }
}

object AliRequest {

  /** Country code written on every output row. */
  private[baichuan] val Country = "CN"

  /** Maximum number of distinct device ids sent in one ASO request. */
  private[baichuan] val AsoBatchSize = 20

  def main(args: Array[String]): Unit = {
    new AliRequest().run(args)
  }

  /**
   * Groups device ids into sets of at most `batchSize` distinct ids.
   *
   * Every input id is kept, including the one that completes a batch, and the final (possibly
   * smaller) batch is emitted as well. Duplicates within one batch collapse because batches are
   * sets.
   */
  private[baichuan] def batchDeviceIds(deviceIds: Iterator[String], batchSize: Int): Iterator[mutable.HashSet[String]] = {
    val batches = new ArrayBuffer[mutable.HashSet[String]]()
    var current = new mutable.HashSet[String]()
    deviceIds.foreach { deviceId =>
      current += deviceId
      if (current.size >= batchSize) {
        batches += current
        current = new mutable.HashSet[String]()
      }
    }
    if (current.nonEmpty) batches += current
    batches.iterator
  }

  /**
   * Creates the request pool: 200 core threads, `maxThreads` maximum and a 1000-slot queue. When
   * the queue is full, the submitting thread runs the task itself (CallerRunsPolicy). Callers must
   * shut the pool down.
   */
  private[baichuan] def newRequestPool(maxThreads: Int): ThreadPoolExecutor = {
    val threadFactory = new CustomizableThreadFactory("AliRequest")
    // Daemon threads so that pool threads never block JVM exit.
    threadFactory.setDaemon(true)
    new ThreadPoolExecutor(200, maxThreads, 500, TimeUnit.MILLISECONDS,
      new LinkedBlockingDeque[Runnable](1000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy())
  }

  /** (device type, platform) for appOs 1 = android imei, 2 = ios idfa, 3 = android imei md5. */
  private[baichuan] def deviceTypeAndPlatform(appOs: Int): (String, String) = appOs match {
    case 1 => ("imei", "android")
    case 2 => ("idfa", "ios")
    case 3 => ("imeimd5", "android")
  }

  /** Real package name (Android) or App Store id (iOS) for appId 1 = Tmall, 2 = Taobao. */
  private[baichuan] def installedPackage(appId: Int, appOs: Int): String = (appOs, appId) match {
    case (1 | 3, 1) => "com.tmall.wireless"
    case (1 | 3, 2) => "com.taobao.taobao"
    case (2, 1) => "518966501"
    case (2, 2) => "387682726"
  }

  /** Placeholder package name written instead of [[installedPackage]]. */
  private[baichuan] def absentPackage(appId: Int, appOs: Int): String = (appOs, appId) match {
    case (1 | 3, 1) => "com.nonetmall.nonewireless"
    case (1 | 3, 2) => "com.nonetaobao.nonetaobao"
    case (2, 1 | 2) => "0000000000"
  }

  /** iOS (appOs 2) ids are upper-cased; other ids are returned unchanged. */
  private[baichuan] def normalizeDeviceId(deviceId: String, appOs: Int): String =
    if (appOs == 2) deviceId.toUpperCase else deviceId
}