// TmpExtractDataFromDspReq.scala 17.3 KB
package mobvista.dmp.datasource.dsp

import com.fasterxml.jackson.databind.ObjectMapper
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.dsp.DspConstant.Value
import mobvista.dmp.datasource.dsp.mapreduce.SegmentVO
import mobvista.dmp.format.TextMultipleOutputFormat
import mobvista.dmp.util.MD5Util
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{collect_set, max, udf}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.net.URI
import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * @author jiangfan
 * @date 2021/8/23 15:56
 */
class TmpExtractDataFromDspReq extends CommonSparkJob with Serializable {
  /**
   * Declares the command-line options accepted by this job.
   *
   * Registers `input`, `output` and `coalesce`, each of which takes an argument and uses its own
   * name as the description.
   *
   * @return the populated option set
   */
  override protected def buildOptions(): Options = {
    val opts = new Options
    Seq("input", "output", "coalesce").foreach { name =>
      opts.addOption(name, true, name)
    }
    opts
  }

  /**
   * Turns one query result row into the key/value pairs handed to the output format.
   *
   * Reads the `device_id` and `device_type` columns. A row with a blank `device_type` produces no
   * pair; otherwise a single pair is produced whose key is the device type followed by ", " and
   * whose value is the device id.
   *
   * @param row a row exposing `device_id` and `device_type` string columns
   * @return zero or one (key, value) pairs
   */
  def buildRes( row: Row): Array[Tuple2[Text, Text]] = {
    val deviceId = row.getAs[String]("device_id")
    val deviceType = row.getAs[String]("device_type")
    if (StringUtils.isBlank(deviceType)) {
      Array.empty[Tuple2[Text, Text]]
    } else {
      Array(Tuple2(new Text(s"${deviceType}, "), new Text(deviceId)))
    }
  }

  /**
   * Job entry point: parses the DSP request ORC input and writes one device list per audience.
   *
   * Steps:
   *  1. validate the command line (`input`, `output`, `coalesce`);
   *  2. delete any previous content under `output`;
   *  3. read `input` with [[dspEtlSchema]], expand every row through [[parseMapData]] and register
   *     the result as the temp view `tmp_data`;
   *  4. for each (audience package name, dealer id) pair, select the matching non-androidid
   *     devices and save them under `output/<dealer id>` through `TextMultipleOutputFormat`.
   *
   * @param args raw command-line arguments
   * @return -1 when the mandatory-option check fails, 0 after a successful run
   * @throws NumberFormatException if the `coalesce` option is not an integer
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    // Parsed once, up front: a malformed value now fails before the old output is deleted.
    val partitions = commandLine.getOptionValue("coalesce").toInt
    val input = commandLine.getOptionValue("input")
    val output = commandLine.getOptionValue("output")

    // (audience package name, dealer id used as the output sub-directory), exported in this order.
    val exports = Seq(
      "com.ss.android.ugc.aweme_bes" -> "100203",
      "com.ss.android.ugc.aweme_oppoziyou" -> "4059",
      "com.ss.android.ugc.aweme_oppoziyou_notinstall" -> "4060",
      "com.ss.android.ugc.aweme_oppoziyou_hist_notinstall" -> "4061",
      "com.ss.android.ugc.aweme_oppolianmeng" -> "4053",
      "com.ss.android.ugc.aweme_oppolianmeng_hist1year_notinstall" -> "4054",
      "com.ss.android.ugc.aweme_oppolianmeng_histhalfyear_notinstall" -> "4055")

    val spark = SparkSession.builder()
      .appName("TmpExtractDataFromDspReq")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Everything that can fail after the session exists runs inside the try, so the session is
    // always stopped -- including when the output clean-up itself throws.
    try {
      val sc = spark.sparkContext

      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      import spark.implicits._

      val midData = spark.read.schema(dspEtlSchema).orc(input).flatMap(parseMapData(_)).toDF("device_id", "device_type", "platform", "country_code", "ip", "gender", "birthday", "maker",
        "model", "os_version", "packageName", "androidids", "datetime", "segment", "region")

      // The view is queried once per export; without caching each query would re-read and
      // re-parse the whole ORC input.
      midData.cache()
      midData.createOrReplaceTempView("tmp_data")

      exports.foreach { case (packageName, dealerId) =>
        val sql =
          s"""
             |select device_id, device_type from tmp_data  where device_type not in ('androidid','android_id') and packageName='${packageName}'
          """.stripMargin
        spark.sql(sql).rdd.flatMap(buildRes(_)).coalesce(partitions)
          .saveAsNewAPIHadoopFile(s"${output}/${dealerId}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], sc.hadoopConfiguration)
      }
    } finally {
      spark.stop()
    }
    0
  }


  // Dealer id -> audience package names to emit (in this order) for the "oppocn" exchange.
  private val oppocnDealerPackages: Seq[(String, Seq[String])] = Seq(
    "4059" -> Seq("com.ss.android.ugc.aweme", "com.ss.android.ugc.aweme_oppoziyou"),
    "4060" -> Seq("com.ss.android.ugc.aweme_oppoziyou_notinstall"),
    "4061" -> Seq("com.ss.android.ugc.aweme_oppoziyou_hist_notinstall"),
    "4053" -> Seq("com.ss.android.ugc.aweme", "com.ss.android.ugc.aweme_oppolianmeng"),
    "4054" -> Seq("com.ss.android.ugc.aweme_oppolianmeng_hist1year_notinstall"),
    "4055" -> Seq("com.ss.android.ugc.aweme_oppolianmeng_histhalfyear_notinstall"))

  // Dealer id -> audience package names to emit (in this order) for the "bes" exchange.
  private val besDealerPackages: Seq[(String, Seq[String])] = Seq(
    "100203" -> Seq("com.ss.android.ugc.aweme", "com.ss.android.ugc.aweme_bes"))

  /**
   * Expands one DSP request row into the device records of every audience package it matches.
   *
   * A row matches when its `exchanges` column is "oppocn" or "bes" and its comma-separated
   * `dealerid` column contains one of the dealer ids configured for that exchange. For each
   * matched package name the row's device identifiers are emitted through [[addDatasV2]] with
   * `packageName` set to that package. Rows that match nothing (including rows whose `dealerid`
   * is null) yield an empty array.
   *
   * @param row a row read with [[dspEtlSchema]]
   * @return the device records derived from the row, possibly empty
   */
  def parseMapData(row: Row): Array[DspReqVOoptimization] = {
    val packages = audiencePackagesFor(row.getAs[String]("exchanges"), row.getAs[String]("dealerid"))
    // Skip identifier parsing altogether for rows that belong to no audience.
    if (packages.isEmpty) Array.empty[DspReqVOoptimization]
    else expandDeviceRecords(row, packages)
  }

  // Resolves the ordered audience package names selected by an exchange and its dealer id list.
  private def audiencePackagesFor(exchanges: String, dealerid: String): Seq[String] = {
    val dealerPackages =
      if ("oppocn".equals(exchanges)) oppocnDealerPackages
      else if ("bes".equals(exchanges)) besDealerPackages
      else Seq.empty[(String, Seq[String])]

    // `dealerid` may be null in the input; a missing value simply matches no dealer.
    Option(dealerid).fold(Seq.empty[String]) { ids =>
      val dealerIds = ids.split(",")
      dealerPackages.collect { case (dealer, names) if dealerIds.contains(dealer) => names }.flatten
    }
  }

  // Returns `candidate` when it is non-blank and matches `pattern`, otherwise the empty string.
  private def matchingOrEmpty(candidate: String, pattern: String): String =
    if (StringUtils.isNotBlank(candidate) && candidate.matches(pattern)) candidate else ""

  // Extracts the device identifiers of `row` and emits them once per audience package name.
  private def expandDeviceRecords(row: Row, packages: Seq[String]): Array[DspReqVOoptimization] = {
    // The dedicated idfa/gaid columns are only trusted when they match the device id pattern.
    var idfa = matchingOrEmpty(row.getAs[String]("idfa"), didPtn)
    var gaid = matchingOrEmpty(row.getAs[String]("gaid"), didPtn)

    val platform = row.getAs[String]("platform")
    val country = row.getAs[String]("country")
    val ip = row.getAs[String]("ip")
    val gender = row.getAs[String]("gender")
    val birthday = row.getAs[String]("birthday")
    val maker = row.getAs[String]("maker")
    val model = row.getAs[String]("model")
    val osVersion = row.getAs[String]("osVersion")
    val packageNameTmp = row.getAs[String]("packageName")
    val packageName = if (StringUtils.isNotBlank(packageNameTmp)) packageNameTmp else ""

    val exitId = row.getAs[String]("exitId")
    val time = row.getAs[String]("datetime")
    val segmentTmp = row.getAs[String]("segment")
    val segment = if (StringUtils.isNotBlank(segmentTmp)) segmentTmp else ""
    val region = row.getAs[String]("region")

    var androidId = ""
    var imei = ""
    var imeimd5 = ""
    // oaid / oaidmd5 parsing (added later)
    var oaid = ""
    var oaidmd5 = ""
    // newly added identifiers
    var idfv = ""
    var gaidmd5 = ""

    // `exitId` is a comma-separated list of identifiers; it is only used when it has at least 17
    // entries. Positions read below: 0 gaid, 1 idfa, 2 gaidmd5, 4 imei, 5 imeimd5, 7 androidId,
    // 12 oaid, 13 oaidmd5, 16 idfv.
    if (StringUtils.isNotBlank(exitId)) {
      val devIds = splitFun(exitId, ",")
      if (devIds.length >= 17) {
        if ("ios".equalsIgnoreCase(platform)) {
          // The exitId entry is only a fallback for a missing/invalid idfa column.
          if (StringUtils.isBlank(idfa)) idfa = matchingOrEmpty(devIds(1), MobvistaConstant.didPtn)
          idfv = matchingOrEmpty(devIds(16), MobvistaConstant.didPtn)
        } else {
          // The exitId entry is only a fallback for a missing/invalid gaid column.
          if (StringUtils.isBlank(gaid)) gaid = matchingOrEmpty(devIds(0), MobvistaConstant.didPtn)
          gaidmd5 = matchingOrEmpty(devIds(2), MobvistaConstant.md5Ptn)
          // oaid is accepted as-is: no pattern is applied to it.
          if (StringUtils.isNotBlank(devIds(12))) oaid = devIds(12)
          oaidmd5 = matchingOrEmpty(devIds(13), MobvistaConstant.md5Ptn)
          imei = matchingOrEmpty(devIds(4), MobvistaConstant.imeiPtn)
          imeimd5 = matchingOrEmpty(devIds(5), MobvistaConstant.md5Ptn)
          androidId = matchingOrEmpty(devIds(7), MobvistaConstant.andriodIdPtn)
        }
      }
    }

    val value = Value(country, ip, gender, birthday, maker, model, osVersion, packageName, androidId, time, segment, region)

    // Primary identifier: idfa on ios, gaid on android; ids of 4 characters or fewer and the
    // all-zero id are rejected. NOTE(review): the platform comparison here is case-sensitive,
    // unlike the `equalsIgnoreCase` check used for exitId above -- confirm this is intended.
    val (deviceId, deviceType) =
      if ("ios".equals(platform) && idfa.length > 4 && !allZero.equals(idfa)) (idfa, "idfa")
      else if ("android".equals(platform) && gaid.length > 4 && !allZero.equals(gaid)) (gaid, "gaid")
      else ("", "")

    // The same identifiers are emitted once per package; only `packageName` changes in between.
    val records = packages.foldLeft(new ArrayBuffer[DspReqVOoptimization]()) { (buffer, name) =>
      value.packageName = name
      addDatasV2(buffer, deviceId, gaidmd5, imei, imeimd5, oaid, oaidmd5, androidId, idfv, deviceType, platform, value)
    }
    records.toArray
  }


  /**
   * Appends one record per available device identifier to `arrayBuffer`.
   *
   * All records share the attributes carried by `value`; only the identifier, its type and its
   * platform differ. Records are appended in this order, each only when its source is non-blank:
   *  - `deviceId` with the given `deviceType` and `platform` (exactly once);
   *  - a `gaidmd5` record: the MD5 of `deviceId` when it is an android gaid, otherwise the
   *    supplied `gaidmd5`;
   *  - `androidId` as `androidid` (android);
   *  - `idfv` (ios);
   *  - `imei` plus an `imeimd5` computed from it, otherwise the supplied `imeimd5`;
   *  - `oaid` plus an `oaidmd5` computed from it, otherwise the supplied `oaidmd5`.
   *
   * @param arrayBuffer buffer the records are appended to (mutated in place)
   * @param deviceId    primary identifier, may be blank
   * @param deviceType  type label of `deviceId`
   * @param platform    platform label stored with the `deviceId` record
   * @param value       attributes copied into every record
   * @return the same `arrayBuffer` instance
   */
  def addDatasV2(arrayBuffer: ArrayBuffer[DspReqVOoptimization], deviceId: String, gaidmd5: String, imei: String, imeimd5: String,
                 oaid: String, oaidmd5: String, androidId: String, idfv: String, deviceType: String, platform: String, value: Value
                ): ArrayBuffer[DspReqVOoptimization] = {
    // Appends a single record for `id`, filling every other column from `value`.
    def emit(id: String, idType: String, idPlatform: String): Unit = {
      arrayBuffer += DspReqVOoptimization(id, idType, idPlatform,
        value.country, value.ip, value.gender, value.birthday, value.maker, value.model, value.osVersion, value.packageName, value.androidId, value.time, value.segment, value.region)
    }

    val hasDeviceId = StringUtils.isNotBlank(deviceId)
    val isAndroidGaid = hasDeviceId && "android".equalsIgnoreCase(platform) && "gaid".equalsIgnoreCase(deviceType)

    if (hasDeviceId) emit(deviceId, deviceType, platform)

    // A hash computed from the raw gaid takes precedence over the one supplied by the caller.
    if (isAndroidGaid) emit(MD5Util.getMD5Str(deviceId), "gaidmd5", "android")
    else if (StringUtils.isNotBlank(gaidmd5)) emit(gaidmd5, "gaidmd5", "android")

    if (StringUtils.isNotBlank(androidId)) emit(androidId, "androidid", "android")
    if (StringUtils.isNotBlank(idfv)) emit(idfv, "idfv", "ios")

    // Same precedence rule for imei: hash the raw value when present, else use the supplied hash.
    if (StringUtils.isNotBlank(imei)) {
      emit(imei, "imei", "android")
      emit(MD5Util.getMD5Str(imei), "imeimd5", "android")
    } else if (StringUtils.isNotBlank(imeimd5)) {
      emit(imeimd5, "imeimd5", "android")
    }

    // ... and for oaid.
    if (StringUtils.isNotBlank(oaid)) {
      emit(oaid, "oaid", "android")
      emit(MD5Util.getMD5Str(oaid), "oaidmd5", "android")
    } else if (StringUtils.isNotBlank(oaidmd5)) {
      emit(oaidmd5, "oaidmd5", "android")
    }

    arrayBuffer
  }


  /**
   * Schema applied when reading the DSP ETL ORC input.
   *
   * Every column is declared as a string; the column order below is the order of the schema.
   *
   * @return the read schema
   */
  def dspEtlSchema: StructType = {
    val columnNames = Seq(
      "idfa", "gaid", "platform", "country", "ip", "gender", "birthday", "maker", "model",
      "osVersion", "packageName", "exitId", "datetime", "segment", "dealerid", "exchanges", "region")
    StructType(columnNames.map(name => StructField(name, StringType)))
  }

}


/** Command-line launcher for the [[TmpExtractDataFromDspReq]] job. */
object TmpExtractDataFromDspReq {
  /**
   * Creates the job and runs it with the given arguments.
   *
   * The status code returned by `run` is not propagated.
   *
   * @param args raw command-line arguments forwarded to the job
   */
  def main(args: Array[String]): Unit = {
    val job = new TmpExtractDataFromDspReq()
    job.run(args)
  }
}