package mobvista.dmp.datasource.rtdmp.lazada

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI
import scala.collection.mutable

/**
 * @package: mobvista.dmp.datasource.rtdmp.lazada
 * @author: wangjf
 * @date: 2021/8/5
 * @time: 7:23 下午
 * @email: jinfeng.wang@mobvista.com
 */
/**
 * Builds the daily Lazada RT-DMP audience for the `com.lazada.android`
 * package: extracts gaids via Spark SQL, writes them gzip-compressed to S3,
 * then registers (or updates) the audience with the RTDMP server.
 */
class ProcessRTJob extends CommonSparkJob with Serializable {

  /**
   * Declares the supported CLI options.
   *
   * NOTE(review): "tb_type" is accepted but never read in run() — confirm
   * whether any caller still passes it before removing.
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("dt", true, "dt")
    options.addOption("tb_type", true, "tb_type")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Job entry point.
   *
   * @param args CLI arguments; requires -dt (partition date substituted into
   *             the audience SQL) and -output (S3 output root).
   * @return 0 on success; any failure propagates as an exception after the
   *         Spark resources are stopped in the finally block.
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val dt = commandLine.getOptionValue("dt")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"ProcessRTJob.$dt")
    val sc = spark.sparkContext
    try {
      val package_name = "com.lazada.android"

      // Substitute the partition date into the audience-extraction SQL.
      val sql: String = Constant.process_rtdmp_audience_sql
        .replace("@dt", dt)

      // Remove any previous output so saveAsTextFile does not fail on an
      // already-existing path.
      FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)

      // Extract one gaid per row and write gzip-compressed text files in
      // 20 partitions.
      spark.sql(sql)
        .rdd.map(row => {
          row.getAs[String]("gaid")
        }).coalesce(20)
        .saveAsTextFile(s"$output/${package_name}/gaid", classOf[GzipCodec])

      // Snapshot the previously registered audience ids, then persist the
      // refreshed mapping for the next run.
      val map = Logic.getAudienceInfo("lazada")
      Logic.writeAudienceInfo("lazada", Logic.getAudienceMap(mutable.HashSet(package_name)))

      if (map.contains(package_name)) {
        // Audience already registered: point it at the fresh S3 data.
        val updateArray = new JSONArray()
        val jsonObject = new JSONObject()
        jsonObject.put("id", map(package_name))
        jsonObject.put("s3_path", s"$output/${package_name}/gaid/")
        jsonObject.put("status", 1)
        jsonObject.put("audience_data_status", 1)
        updateArray.add(jsonObject)
        ServerUtil.update(updateArray)
      } else {
        // First run for this package: create the audience on the server
        // with its full metadata.
        val uploadArray = new JSONArray()
        val jsonObject = new JSONObject()
        jsonObject.put("s3_path", s"$output/${package_name}/gaid/")
        jsonObject.put("status", 1)
        jsonObject.put("audience_data_status", 1)
        jsonObject.put("platform", 1)
        jsonObject.put("match_device_type", 3)
        jsonObject.put("audience_type", 2)
        jsonObject.put("data_update_method", 1)
        jsonObject.put("audience_name", package_name)
        jsonObject.put("audience_gender", 3)
        jsonObject.put("audience_count", 1)
        jsonObject.put("is_sync_dmpserver", 1)
        uploadArray.add(jsonObject)
        ServerUtil.upload(uploadArray)
      }
    } finally {
      // spark.stop() also stops the SparkContext; stopping sc first mirrors
      // the original order and stop() is idempotent, so this is harmless.
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

/** Launcher: delegates straight to [[ProcessRTJob.run]]. */
object ProcessRTJob {
  def main(args: Array[String]): Unit = {
    val job = new ProcessRTJob()
    job.run(args)
  }
}