Commit 6d71f14a by Saqeeb Shaikh Committed by nixonrodrigues

ATLAS-3344-Atlas Metrics to include topic partition wise additional information.

parent a876e697
......@@ -16,10 +16,12 @@
-->
<thead>
<tr>
<th>Kafka Topic</th>
<th>Current Offset</th>
<th>Kafka Topic-Partition</th>
<th>Start Offset</th>
<th>Current Offset</th>
<th>Processed</th>
<th>Failed</th>
<th>Last Message Processed Time</th>
</tr>
</thead>
{{#if data}}
......
......@@ -65,26 +65,16 @@
<div class="card-container panel panel-primary">
<div class="panel-heading">Notification Details</div>
<div class="panel-body">
<table class="table stat-table">
<tbody data-id="notification-small-card">
<tr class="empty text-center">
<td colspan="2"><span>No records found!</span></td>
</tr>
</tbody>
</table>
<hr>
</hr>
<table data-id="notification-card" class="table stat-table notification-table table-striped ">
<table data-id="offset-card" class="table stat-table notification-table table-striped ">
<tbody>
<tr class="empty text-center">
<td colspan="2"><span>No records found!</span></td>
</tr>
</tbody>
</table>
<hr>
</hr>
<table data-id="offset-card" class="table stat-table notification-table table-striped ">
<table data-id="notification-card" class="table stat-table notification-table table-striped ">
<tbody>
<tr class="empty text-center">
<td colspan="2"><span>No records found!</span></td>
......@@ -94,7 +84,6 @@
</div>
</div>
</div>
</div>
</div>
</div>
......
......@@ -185,7 +185,9 @@ define(['require'], function(require) {
"totalDeletes": "number",
"totalFailed": "number",
"totalUpdates": "number",
"topicOffsets":"number"
"processedMessageCount": "number",
"lastMessageProcessedTime": "day",
"failedMessageCount": "number"
}
};
return Enums;
......
......@@ -48,7 +48,6 @@ define(['require',
connectionCard: "[data-id='connection-card']",
notificationCard: "[data-id='notification-card']",
statsNotificationTable: "[data-id='stats-notification-table']",
notificationSmallCard: "[data-id='notification-small-card']",
entityCard: "[data-id='entity-card']",
offsetCard: "[data-id='offset-card']"
},
......@@ -237,14 +236,6 @@ define(['require',
})
);
that.ui.notificationSmallCard.html(
createTable({
"enums": Enums.stats.Notification,
"data": _.pick(data.Notification, 'lastMessageProcessedTime', 'offsetCurrent', 'offsetStart')
})
);
var offsetTableColumn = function(obj) {
var returnObj = []
_.each(obj, function(value, key) {
......@@ -255,14 +246,12 @@ define(['require',
that.ui.offsetCard.html(
TopicOffsetTable({
"enums": Enums.stats.Notification,
"data": data.Notification.topicOffsets,
"tableHeader": ['offsetCurrent', 'offsetStart'],
"tableCol": offsetTableColumn(data.Notification.topicOffsets),
"getTmplValue": function(argument, args) {
console.log(argument, args)
var returnVal = data.Notification.topicOffsets[argument.label][args];
return returnVal ? _.numberFormatWithComa(returnVal) : 0;
data: data.Notification.topicDetails,
tableHeader: ["offsetStart", "offsetCurrent", "processedMessageCount", "failedMessageCount", "lastMessageProcessedTime"],
tableCol: offsetTableColumn(data.Notification.topicDetails),
getTmplValue: function(argument, args) {
var returnVal = data.Notification.topicDetails[argument.label][args];
return returnVal ? that.getValue({ value: returnVal, type: Enums.stats.Notification[args] }) : 0;
}
})
)
......
......@@ -55,7 +55,7 @@ public class AtlasMetrics {
public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourFailed";
public static final String STAT_NOTIFY_START_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourStartTime";
public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime";
public static final String STAT_NOTIFY_TOPIC_OFFSETS = PREFIX_NOTIFICATION + "topicOffsets";
public static final String STAT_NOTIFY_TOPIC_DETAILS = PREFIX_NOTIFICATION + "topicDetails";
public static final String STAT_NOTIFY_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDay";
public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY = PREFIX_NOTIFICATION + "previousDayAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityCreates";
......
......@@ -109,6 +109,11 @@ public class AtlasMetricsUtil {
}
partitionStat.setCurrentOffset(msgOffset + 1);
if(stats.isFailedMsg) {
partitionStat.incrFailedMessageCount();
}
partitionStat.incrProcessedMessageCount();
partitionStat.setLastMessageProcessedTime(messagesProcessed.getLastIncrTime().toEpochMilli());
}
public Map<String, Object> getStats() {
......@@ -126,20 +131,25 @@ public class AtlasMetricsUtil {
ret.put(STAT_SERVER_STATUS_BACKEND_STORE, getBackendStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
ret.put(STAT_SERVER_STATUS_INDEX_STORE, getIndexStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
Map<String, Map<String, Long>> topicOffsets = new HashMap<>();
Map<String, Map<String, Long>> topicDetails = new HashMap<>();
for (TopicStats tStat : topicStats.values()) {
for (TopicPartitionStat tpStat : tStat.partitionStats.values()) {
Map<String, Long> tpOffsets = new HashMap<>();
tpOffsets.put("offsetStart", tpStat.startOffset);
tpOffsets.put("offsetCurrent", tpStat.currentOffset);
Map<String, Long> tpDetails = new HashMap<>();
topicOffsets.put(tpStat.topicName + "-" + tpStat.partition, tpOffsets);
tpDetails.put("offsetStart", tpStat.startOffset);
tpDetails.put("offsetCurrent", tpStat.currentOffset);
tpDetails.put("failedMessageCount", tpStat.failedMessageCount);
tpDetails.put("lastMessageProcessedTime", tpStat.lastMessageProcessedTime);
tpDetails.put("processedMessageCount", tpStat.processedMessageCount);
if(LOG.isDebugEnabled()) {
LOG.debug("Setting failedMessageCount : {} and lastMessageProcessedTime : {} for topic {}-{}", tpStat.failedMessageCount, tpStat.lastMessageProcessedTime, tpStat.topicName, tpStat.partition);
}
topicDetails.put(tpStat.topicName + "-" + tpStat.partition, tpDetails);
}
}
ret.put(STAT_NOTIFY_TOPIC_OFFSETS, topicOffsets);
ret.put(STAT_NOTIFY_TOPIC_DETAILS, topicDetails);
ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli());
ret.put(STAT_NOTIFY_COUNT_TOTAL, messagesProcessed.getCount(ALL));
......@@ -345,6 +355,9 @@ public class AtlasMetricsUtil {
private final int partition;
private final long startOffset;
private long currentOffset;
private long lastMessageProcessedTime;
private long failedMessageCount;
private long processedMessageCount;
public TopicPartitionStat(String topicName, int partition, long startOffset, long currentOffset) {
this.topicName = topicName;
......@@ -373,5 +386,16 @@ public class AtlasMetricsUtil {
this.currentOffset = currentOffset;
}
public long getLastMessageProcessedTime() { return lastMessageProcessedTime; }
public void setLastMessageProcessedTime(long lastMessageProcessedTime) { this.lastMessageProcessedTime = lastMessageProcessedTime; }
public long getFailedMessageCount() { return failedMessageCount; }
public void incrFailedMessageCount() { this.failedMessageCount++; }
public long getProcessedMessageCount() { return processedMessageCount; }
public void incrProcessedMessageCount() { this.processedMessageCount++; }
};
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment