package mobvista.dmp.datasource.rtdmp.lazada

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.io.compress.GzipCodec

import scala.collection.mutable

/**
 * Spark job that exports the `com.lazada.android` audience (GAIDs selected by
 * `Constant.process_rtdmp_audience_sql`) as gzip text files under `<output>/com.lazada.android/gaid`.
 * It then updates the existing audience record via `ServerUtil.update`, or creates one via
 * `ServerUtil.upload` if none is known yet.
 *
 * @package: mobvista.dmp.datasource.rtdmp.lazada
 * @author: wangjf
 * @date: 2021/8/5
 * @time: 7:23 PM
 * @email: jinfeng.wang@mobvista.com
 */
class ProcessRTJob extends CommonSparkJob with Serializable {

  /**
   * Command-line options accepted by [[run]].
   *
   *  - `dt`: substituted for `@dt` in the audience SQL (required).
   *  - `tb_type`: accepted for compatibility; not read by this job.
   *  - `output`: root output path; files are written under `<output>/com.lazada.android/gaid` (required).
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("dt", true, "dt")
    options.addOption("tb_type", true, "tb_type")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Runs the export and the audience registration.
   *
   * @param args raw command-line arguments, parsed with [[commandOptions]]
   * @return 0 on success; failures propagate as exceptions
   * @throws IllegalArgumentException if `dt` or `output` is missing or empty
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val dt = commandLine.getOptionValue("dt")
    val output = commandLine.getOptionValue("output")

    // Fail fast, before a SparkSession is allocated. Otherwise a missing value surfaces later
    // as an NPE from String.replace or as a literal "null/..." output path.
    require(dt != null && dt.nonEmpty, "missing required option: -dt")
    require(output != null && output.nonEmpty, "missing required option: -output")

    val spark = MobvistaConstant.createSparkSession(s"ProcessRTJob.${dt}")
    try {
      val packageName = "com.lazada.android"
      val gaidPath = s"$output/$packageName/gaid"

      val sql: String = Constant.process_rtdmp_audience_sql.replace("@dt", dt)
      // NOTE(review): assumes the SQL yields non-null `gaid` values; Spark's saveAsTextFile
      // rejects null rows. Confirm against the query.
      spark.sql(sql)
        .rdd
        .map(row => row.getAs[String]("gaid"))
        .coalesce(20)
        .saveAsTextFile(gaidPath, classOf[GzipCodec])

      // The audience info is read BEFORE writeAudienceInfo below. The update-vs-create
      // decision therefore uses the pre-write snapshot. Presumably intentional; confirm.
      val knownAudiences = Logic.getAudienceInfo("lazada")
      Logic.writeAudienceInfo("lazada", Logic.getAudienceMap(mutable.HashSet(packageName)))

      val s3Path = s"$gaidPath/"
      if (knownAudiences.contains(packageName)) {
        // Existing audience: refresh its data location and status by id.
        val json = baseAudienceJson(s3Path)
        json.put("id", knownAudiences(packageName))
        ServerUtil.update(singletonArray(json))
      } else {
        // Unknown audience: create it with the full set of descriptive fields.
        val json = baseAudienceJson(s3Path)
        json.put("platform", 1)
        json.put("match_device_type", 3)
        json.put("audience_type", 2)
        json.put("data_update_method", 1)
        json.put("audience_name", packageName)
        json.put("audience_gender", 3)
        json.put("audience_count", 1)
        json.put("is_sync_dmpserver", 1)
        ServerUtil.upload(singletonArray(json))
      }
    } finally {
      // SparkSession.stop() also stops the underlying SparkContext.
      spark.stop()
    }
    0
  }

  /** Builds the audience JSON fields shared by both the update and the create request. */
  private def baseAudienceJson(s3Path: String): JSONObject = {
    val json = new JSONObject()
    json.put("s3_path", s3Path)
    json.put("status", 1)
    json.put("audience_data_status", 1)
    json
  }

  /** Wraps a single JSON object in a JSONArray, the payload shape ServerUtil expects here. */
  private def singletonArray(json: JSONObject): JSONArray = {
    val array = new JSONArray()
    array.add(json)
    array
  }
}

object ProcessRTJob {

  /** Entry point: runs the job with the given command-line arguments. */
  def main(args: Array[String]): Unit = {
    new ProcessRTJob().run(args)
  }
}