package mobvista.dmp.datasource.iqiyi

import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import mobvista.dmp.format.RDDMultipleOutputFormat
import mobvista.dmp.util.{DateUtil, PropertyUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.storage.StorageLevel

import java.net.URI
import scala.collection.mutable

/**
 * Spark job that splits iQiYi IMEI devices into audience files by how long ago
 * they were last updated, writes one output directory per audience, and then
 * registers new audiences / updates existing ones through [[ServerUtil]].
 *
 * @package: mobvista.dmp.datasource.iqiyi
 * @author: wangjf
 * @date: 2020/4/29
 * @time: 11:52 AM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class RTDmpIQiYiRequest extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options understood by this job.
   *
   * @return options containing `date` and `output`, both taking a value
   */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  /**
   * Runs the job: deletes `output`, queries the install list for the 180 days
   * ending at `date`, writes each device id under `output/<audience name>`,
   * then uploads audiences that are not yet known and updates the known ones.
   *
   * @param args command-line arguments; `-date` and `-output` are read
   * @return always 0 when the job completes without throwing
   */
  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("RTDmpIQiYiRequest")
    try {
      val sc = spark.sparkContext

      // Remove any previous output so the save below does not collide with it.
      FileSystem.get(new URI("s3://mob-emr-test"), sc.hadoopConfiguration)
        .delete(new Path(output), true)

      val update_date = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(date))
      val start = DateUtil.getDayByString(update_date, "yyyy-MM-dd", -180)
      val end = update_date

      val sql =
        s"""
           |SELECT device_id, MAX(update_date) update_date
           | FROM dwh.dm_install_list_v2 WHERE dt = '$date'
           | AND update_date BETWEEN '$start' AND '$end'
           | AND package_name IN ('com.qiyi.video', 'com.qiyi.android')
           | AND device_type IN ('imei','imeimd5')
           | GROUP BY device_id
           |""".stripMargin

      val endTime = MobvistaConstant.sdf1.parse(end).getTime

      // NOTE(review): MobvistaConstant.sdf1 is shared by every task running in an
      // executor JVM; if it is a java.text.SimpleDateFormat it is not thread-safe.
      // Confirm its type and consider a per-partition formatter.
      val rdd = spark.sql(sql).rdd.map(row => {
        val deviceId = row.getAs[String]("device_id")
        // A null or blank update_date falls back to endTime, i.e. the newest bucket.
        val startTime = Option(row.getAs[String]("update_date"))
          .map(_.replace("\"", ""))
          .filter(value => StringUtils.isNotBlank(value))
          .map(value => MobvistaConstant.sdf1.parse(value).getTime)
          .getOrElse(endTime)
        // Whole days elapsed, then grouped into 30-day buckets.
        val monthBucket = (endTime - startTime) / (1000 * 3600 * 24) / 30
        (deviceId, RTDmpIQiYiRequest.audienceNameFor(monthBucket))
      })

      // The RDD is consumed twice: once for the file output, once for the name set.
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      rdd.map(r => {
        (new Text(s"$output/${r._2}"), new Text(r._1))
      }).coalesce(100)
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]])

      val set = rdd.map(_._2).distinct(200).collect().toSet[String]

      val uploadPkgSet = new mutable.HashSet[String]()
      val uploadJsonArray = new JSONArray()
      val updateJsonArray = new JSONArray()

      // Audiences that have been stopped; they must not be updated.
      val stopAudience = RTDmpIQiYiRequest.parseStopAudience(
        PropertyUtil.getProperty("config.properties", "rtdmp.stop.audience"))

      // Looked up once: the result does not depend on the audience being processed.
      val audienceInfo = Logic.getAudienceInfo("iqiyi")

      set.foreach(packageName => {
        val jsonObject = new JSONObject()
        if (audienceInfo.contains(packageName)) {
          // Known audience: queue an update unless it is on the stop list.
          if (!stopAudience.contains(audienceInfo(packageName))) {
            jsonObject.put("id", audienceInfo(packageName))
            jsonObject.put("s3_path", s"$output/$packageName")
            jsonObject.put("status", 1)
            jsonObject.put("audience_data_status", 1)
            updateJsonArray.add(jsonObject)
          }
        } else {
          // Unknown audience: queue it for creation.
          jsonObject.put("s3_path", s"$output/$packageName")
          jsonObject.put("platform", 1)
          jsonObject.put("match_device_type", 1)
          jsonObject.put("audience_type", 2)
          jsonObject.put("data_update_method", 1)
          jsonObject.put("audience_name", packageName)
          jsonObject.put("status", 1)
          jsonObject.put("audience_gender", 3)
          jsonObject.put("audience_count", 1)
          jsonObject.put("is_sync_dmpserver", 1)
          jsonObject.put("audience_data_status", 1)
          uploadPkgSet.add(packageName)
          uploadJsonArray.add(jsonObject)
        }
      })

      val uploadJsonObject = ServerUtil.upload(uploadJsonArray)
      if (uploadJsonObject.getInteger("code") == 200) {
        println("RTDmp Upload OK,AudienceId -->> " + uploadJsonObject.getJSONArray("data"))
      }

      Logic.writeAudienceInfo("iqiyi", Logic.getAudienceMap(uploadPkgSet))

      // Update one audience per request.
      for (i <- 0 until updateJsonArray.size()) {
        val updateObject = new JSONArray()
        updateObject.add(updateJsonArray.getJSONObject(i))
        // NOTE(review): this retries indefinitely, with no delay, for as long as the
        // server answers with a non-200 code; consider a bounded retry with backoff.
        var pending = true
        while (pending) {
          val updateJsonObject = ServerUtil.update(updateObject)
          if (updateJsonObject.getInteger("code") == 200) {
            println("RTDmp Update OK,updateJson -->> " + updateObject)
            pending = false
          }
        }
      }
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object RTDmpIQiYiRequest {

  /**
   * Maps a 30-day bucket index to the audience name used for the output path.
   *
   * @param monthBucket number of whole 30-day periods since the device's update date
   * @return the audience name; any index outside 0..4 maps to the 180-day audience
   */
  private def audienceNameFor(monthBucket: Long): String = monthBucket match {
    case 0L => "com.qiyi_30_imei"
    case 1L => "com.qiyi_60_imei"
    case 2L => "com.qiyi_90_imei"
    case 3L => "com.qiyi_120_imei"
    case 4L => "com.qiyi_150_imei"
    case _ => "com.qiyi_180_imei"
  }

  /**
   * Parses a comma-separated list of audience ids.
   *
   * Entries are trimmed, and blank entries (empty property, trailing comma) are
   * skipped instead of failing with a NumberFormatException.
   *
   * @param raw the raw property value; may be null or empty
   * @return the parsed ids, empty when `raw` holds no ids
   */
  private def parseStopAudience(raw: String): Set[Integer] =
    Option(raw).getOrElse("")
      .split(",")
      .iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(id => Integer.valueOf(id))
      .toSet

  /** Command-line entry point; delegates to [[RTDmpIQiYiRequest.run]]. */
  def main(args: Array[String]): Unit = {
    new RTDmpIQiYiRequest().run(args)
  }
}