Commit 984445e9 by nikhilbonte Committed by Sarath Subramanian

ATLAS-3017: Add Atlas server statistics rest endpoint

parent 66f57da8
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
<div class="panel panel-default expand_collapse_panel-icon" data-id="entityActive"> <div class="panel panel-default expand_collapse_panel-icon" data-id="entityActive">
<div class="panel-heading" data-toggle="collapse" href="#collapse1" aria-expanded="true"> <div class="panel-heading" data-toggle="collapse" href="#collapse1" aria-expanded="true">
<h4 class="panel-title"> <h4 class="panel-title">
<a>Active Entities <span class="count">(0)</span></a> <a>Active Entities <span class="count">(0)</span></a>
</h4> </h4>
<div class="btn-group pull-right"> <div class="btn-group pull-right">
<button type="button" title="Collapse"><i class="ec-icon fa"></i></button> <button type="button" title="Collapse"><i class="ec-icon fa"></i></button>
</div> </div>
...@@ -28,10 +28,10 @@ ...@@ -28,10 +28,10 @@
<div class="panel-body"> <div class="panel-body">
<table class="table stat-table"> <table class="table stat-table">
<thead> <thead>
<tr> <tr>
<th>Entity</th> <th>Entity</th>
<th>Count</th> <th>Count</th>
</tr> </tr>
</thead> </thead>
<tbody > <tbody >
</tbody> </tbody>
...@@ -42,8 +42,8 @@ ...@@ -42,8 +42,8 @@
<div class="panel panel-default expand_collapse_panel-icon" data-id="entityDelete"> <div class="panel panel-default expand_collapse_panel-icon" data-id="entityDelete">
<div class="panel-heading" data-toggle="collapse" href="#collapse2"> <div class="panel-heading" data-toggle="collapse" href="#collapse2">
<h4 class="panel-title"> <h4 class="panel-title">
<a>Deleted Entities <span class="count">(0)</span></a> <a>Deleted Entities <span class="count">(0)</span></a>
</h4> </h4>
<div class="btn-group pull-right"> <div class="btn-group pull-right">
<button type="button" title="Collapse"><i class="ec-icon fa"></i></button> <button type="button" title="Collapse"><i class="ec-icon fa"></i></button>
</div> </div>
...@@ -52,10 +52,35 @@ ...@@ -52,10 +52,35 @@
<div class="panel-body"> <div class="panel-body">
<table class="table stat-table"> <table class="table stat-table">
<thead> <thead>
<tr> <tr>
<th>Entity</th> <th>Entity</th>
<th>Count</th> <th>Count</th>
</tr> </tr>
</thead>
<tbody >
</tbody>
</table>
</div>
</div>
</div>
<div class="panel panel-default expand_collapse_panel-icon" data-id="stats">
<div class="panel-heading" data-toggle="collapse" href="#collapse3">
<h4 class="panel-title">
<a>Server Statistics </a>
</h4>
<div class="btn-group pull-right">
<button type="button" title="Collapse"><i class="ec-icon fa"></i></button>
</div>
</div>
<div id="collapse3" class="panel-collapse collapse">
<div class="panel-body">
<table class="table stat-table">
<thead>
<tr>
<th>Parameter</th>
<th>Value</th>
</tr>
</thead> </thead>
<tbody > <tbody >
</tbody> </tbody>
......
...@@ -38,7 +38,8 @@ define(['require', ...@@ -38,7 +38,8 @@ define(['require',
entityActive: "[data-id='entityActive'] tbody", entityActive: "[data-id='entityActive'] tbody",
entityDelete: "[data-id='entityDelete'] tbody", entityDelete: "[data-id='entityDelete'] tbody",
entityActiveHeader: "[data-id='entityActive'] .count", entityActiveHeader: "[data-id='entityActive'] .count",
entityDeletedHeader: "[data-id='entityDelete'] .count" entityDeletedHeader: "[data-id='entityDelete'] .count",
stats: "[data-id='stats'] tbody"
}, },
/** ui events hash */ /** ui events hash */
events: function() {}, events: function() {},
...@@ -71,7 +72,8 @@ define(['require', ...@@ -71,7 +72,8 @@ define(['require',
var data = _.first(data.toJSON()), var data = _.first(data.toJSON()),
no_records = '<tr class="empty text-center"><td colspan="2"><span>No records found!</span></td></tr>', no_records = '<tr class="empty text-center"><td colspan="2"><span>No records found!</span></td></tr>',
activeEntityTable = _.isEmpty(data.entity.entityActive) ? no_records : that.getTable({ valueObject: data.entity.entityActive }), activeEntityTable = _.isEmpty(data.entity.entityActive) ? no_records : that.getTable({ valueObject: data.entity.entityActive }),
deleteEntityTable = _.isEmpty(data.entity.entityDeleted) ? no_records : that.getTable({ valueObject: data.entity.entityDeleted }); deleteEntityTable = _.isEmpty(data.entity.entityDeleted) ? no_records : that.getTable({ valueObject: data.entity.entityDeleted}),
stats = _.isEmpty(data.general.stats) ? no_records : that.getTable({ valueObject: data.general.stats, formatIntVal: false});
var totalActive = 0, var totalActive = 0,
totalDeleted = 0; totalDeleted = 0;
if (data.entity && data.general.entityCount) { if (data.entity && data.general.entityCount) {
...@@ -84,6 +86,7 @@ define(['require', ...@@ -84,6 +86,7 @@ define(['require',
} }
that.ui.entityActive.html(activeEntityTable); that.ui.entityActive.html(activeEntityTable);
that.ui.entityDelete.html(deleteEntityTable); that.ui.entityDelete.html(deleteEntityTable);
that.ui.stats.html(stats);
that.ui.entityActiveHeader.html("&nbsp;(" + _.numberFormatWithComa((totalActive - totalDeleted)) + ")"); that.ui.entityActiveHeader.html("&nbsp;(" + _.numberFormatWithComa((totalActive - totalDeleted)) + ")");
that.ui.entityDeletedHeader.html("&nbsp;(" + _.numberFormatWithComa(totalDeleted) + ")"); that.ui.entityDeletedHeader.html("&nbsp;(" + _.numberFormatWithComa(totalDeleted) + ")");
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
/**
* Atlas statistics
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasStatistics {
public static final String STAT_SERVER_START_TS = "serverStartTimeStamp";
public static final String STAT_SERVER_ACTIVE_TS = "serverActiveTimeStamp";
public static final String STAT_SERVER_UP_SINCE = "serverUpTime";
public static final String STAT_START_OFFSET = "KafkaTopic:ATLAS_HOOK:startOffset";
public static final String STAT_CURRENT_OFFSET = "KafkaTopic:ATLAS_HOOK:currentOffset";
public static final String STAT_SOLR_STATUS = "solrConnectionStatus";
public static final String STAT_HBASE_STATUS = "HBaseConnectionStatus";
public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "lastMessageProcessedTimeStamp";
public static final String STAT_AVG_MESSAGE_PROCESSING_TIME = "avgMessageProcessingTime";
private Map<String, Object> data = new HashMap<>();
public Map<String, Object> getData() {
return data;
}
public void setData(Map<String, Object> data) {
this.data = data;
}
@Override
public String toString() {
return "AtlasStatistics{" +
"data=" + data +
'}';
}
@Override
public int hashCode() {
return Objects.hash(data);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AtlasStatistics other = (AtlasStatistics) o;
return Objects.equals(this.data, other.data);
}
}
...@@ -18,11 +18,13 @@ ...@@ -18,11 +18,13 @@
package org.apache.atlas.services; package org.apache.atlas.services;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.StatisticsUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -63,13 +65,14 @@ public class MetricsService { ...@@ -63,13 +65,14 @@ public class MetricsService {
private final AtlasGraph atlasGraph; private final AtlasGraph atlasGraph;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final StatisticsUtil statisticsUtil;
private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix(); private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
@Inject @Inject
public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry) { public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) {
this.atlasGraph = graph; this.atlasGraph = graph;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.statisticsUtil = statisticsUtil;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -147,6 +150,10 @@ public class MetricsService { ...@@ -147,6 +150,10 @@ public class MetricsService {
metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime); metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
//add atlas server stats
Map<String, Object> statistics = statisticsUtil.getAtlasStatistics();
metrics.addMetric(GENERAL, "stats", statistics);
return metrics; return metrics;
} }
...@@ -156,7 +163,12 @@ public class MetricsService { ...@@ -156,7 +163,12 @@ public class MetricsService {
indexQuery = String.format(indexQuery, typeName, status.name()); indexQuery = String.format(indexQuery, typeName, status.name());
return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals(); try {
return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
}catch (Exception e){
LOG.error("Failed fetching using indexQuery: " + e.getMessage());
}
return 0l;
} }
private int getAllTypesCount() { private int getAllTypesCount() {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.AtlasStatistics;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS;
import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS;
import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE;
import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET;
import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET;
import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS;
import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS;
import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS;
import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME;
@Component
public class StatisticsUtil {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsUtil.class);
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("d MMM, yyyy : hh:mm aaa z");
private static final long DAY = 1000 * 60 * 60 * 24;
private static final long HOUR = 1000 * 60 * 60;
private static final long MIN = 1000 * 60;
private static final long SEC = 1000;
private final AtlasGraph graph;
private final String STATUS_CONNECTED = "connected";
private final String STATUS_NOT_CONNECTED = "not-connected";
private final AtlasStatistics atlasStatistics;
private long countMsgProcessed = 0;
private long totalMsgProcessingTimeMs = 0;
@Inject
public StatisticsUtil(AtlasGraph graph) {
this.graph = graph;
this.atlasStatistics = new AtlasStatistics();
}
public Map<String, Object> getAtlasStatistics() {
Map<String, Object> statisticsMap = new HashMap<>();
statisticsMap.putAll(atlasStatistics.getData());
statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus());
statisticsMap.put(STAT_SOLR_STATUS , getSolrStatus());
statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime());
formatStatistics(statisticsMap);
return statisticsMap;
}
public void setKafkaOffsets(long value){
if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) {
addStat(STAT_START_OFFSET, value);
}
addStat(STAT_CURRENT_OFFSET, ++value);
}
public void setAvgMsgProcessingTime(long value) {
countMsgProcessed++;
totalMsgProcessingTimeMs += value;
value = totalMsgProcessingTimeMs / countMsgProcessed;
addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value);
}
public void setLastMsgProcessedTime() {
addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, System.currentTimeMillis());
}
public void setServerStartTime() {
addStat(STAT_SERVER_START_TS, System.currentTimeMillis());
}
public void setServerActiveTime() {
addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis());
}
private void addStat(String key, Object value) {
Map<String, Object> data = atlasStatistics.getData();
if (data == null) {
data = new HashMap<>();
}
data.put(key, value);
atlasStatistics.setData(data);
}
private Object getStat(String key) {
Map<String, Object> data = atlasStatistics.getData();
Object ret = data.get(key);
if (ret == null) {
return -1;
}
return ret;
}
private void formatStatistics(Map<String, Object> statisticsMap) {
for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) {
switch (stat.getKey()) {
case STAT_SERVER_UP_SINCE:
statisticsMap.put(stat.getKey(), millisToTimeDiff(Long.parseLong(stat.getValue().toString())));
break;
case STAT_LAST_MESSAGE_PROCESSED_TIME_TS:
statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
break;
case STAT_SERVER_START_TS:
case STAT_SERVER_ACTIVE_TS:
statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
break;
case STAT_AVG_MESSAGE_PROCESSING_TIME:
statisticsMap.put(stat.getKey(), stat.getValue() + " milliseconds");
break;
case STAT_HBASE_STATUS:
case STAT_SOLR_STATUS:
String curState = ((boolean) stat.getValue()) ? STATUS_CONNECTED : STATUS_NOT_CONNECTED;
statisticsMap.put(stat.getKey(), curState);
break;
default:
statisticsMap.put(stat.getKey(), stat.getValue());
}
}
}
private boolean getHBaseStatus(){
String query = "g.V().next()";
try {
runWithTimeout(new Runnable() {
@Override
public void run() {
try {
graph.executeGremlinScript(query, false);
} catch (AtlasBaseException e) {
LOG.error(e.getMessage());
}
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
}
return true;
}
private boolean getSolrStatus(){
String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)";
try {
runWithTimeout(new Runnable() {
@Override
public void run() {
graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals();
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
}
return true;
}
private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
runWithTimeout(new Callable<Object>() {
@Override
public Object call() {
runnable.run();
return null;
}
}, timeout, timeUnit);
}
private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<T> future = executor.submit(callable);
executor.shutdown();
try {
return future.get(timeout, timeUnit);
} catch (TimeoutException e) {
future.cancel(true);
throw e;
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof Error) {
throw (Error) t;
} else if (t instanceof Exception) {
throw (Exception) t;
} else {
throw new IllegalStateException(t);
}
}
}
private long getUpSinceTime() {
long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString());
return System.currentTimeMillis() - upTS;
}
private String millisToTimeDiff(long msDiff) {
StringBuilder sb = new StringBuilder();
long diffSeconds = msDiff / SEC % 60;
long diffMinutes = msDiff / MIN % 60;
long diffHours = msDiff / HOUR % 24;
long diffDays = msDiff / DAY;
if (diffDays > 0) sb.append(diffDays).append(" day ");
if (diffHours > 0) sb.append(diffHours).append(" hour ");
if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
return sb.toString();
}
private String millisToTimeStamp(long ms) {
return simpleDateFormat.format(ms);
}
}
...@@ -46,6 +46,7 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor; ...@@ -46,6 +46,7 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext; import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.utils.LruCache; import org.apache.atlas.utils.LruCache;
import org.apache.atlas.util.StatisticsUtil;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
...@@ -139,6 +140,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -139,6 +140,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final ServiceState serviceState; private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter; private final AtlasInstanceConverter instanceConverter;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final StatisticsUtil statisticsUtil;
private final int maxRetries; private final int maxRetries;
private final int failedMsgCacheSize; private final int failedMsgCacheSize;
private final int minWaitDuration; private final int minWaitDuration;
...@@ -168,13 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -168,13 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Inject @Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
ServiceState serviceState, AtlasInstanceConverter instanceConverter, ServiceState serviceState, AtlasInstanceConverter instanceConverter,
AtlasTypeRegistry typeRegistry) throws AtlasException { AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) throws AtlasException {
this.notificationInterface = notificationInterface; this.notificationInterface = notificationInterface;
this.atlasEntityStore = atlasEntityStore; this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState; this.serviceState = serviceState;
this.instanceConverter = instanceConverter; this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get(); this.applicationProperties = ApplicationProperties.get();
this.statisticsUtil = statisticsUtil;
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
...@@ -686,7 +689,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -686,7 +689,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally { } finally {
AtlasPerfTracer.log(perf); AtlasPerfTracer.log(perf);
long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0; long msgProcessingTime = System.currentTimeMillis() - startTime;
if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) { if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message); String strMessage = AbstractNotification.getMessageJson(message);
...@@ -697,10 +700,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -697,10 +700,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (auditLog != null) { if (auditLog != null) {
auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK); auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
auditLog.setTimeTaken(System.currentTimeMillis() - startTime); auditLog.setTimeTaken(msgProcessingTime);
AuditFilter.audit(auditLog); AuditFilter.audit(auditLog);
} }
statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime);
} }
} }
...@@ -756,6 +760,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -756,6 +760,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumer.commit(partition, kafkaMessage.getOffset() + 1); consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true; commitSucceessStatus = true;
statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset());
statisticsUtil.setLastMsgProcessedTime();
} finally { } finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
} }
......
...@@ -23,6 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector; ...@@ -23,6 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.util.StatisticsUtil;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
...@@ -39,6 +40,7 @@ import java.util.Comparator; ...@@ -39,6 +40,7 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
* A service that implements leader election to determine whether this Atlas server is Active. * A service that implements leader election to determine whether this Atlas server is Active.
* *
...@@ -59,6 +61,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -59,6 +61,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private final Configuration configuration; private final Configuration configuration;
private final ServiceState serviceState; private final ServiceState serviceState;
private final ActiveInstanceState activeInstanceState; private final ActiveInstanceState activeInstanceState;
private final StatisticsUtil statisticsUtil;
private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
private List<ActiveStateChangeHandler> activeStateChangeHandlers; private List<ActiveStateChangeHandler> activeStateChangeHandlers;
private CuratorFactory curatorFactory; private CuratorFactory curatorFactory;
...@@ -75,13 +78,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -75,13 +78,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
ActiveInstanceElectorService(Configuration configuration, ActiveInstanceElectorService(Configuration configuration,
Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders, Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders,
CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState,
ServiceState serviceState) { ServiceState serviceState, StatisticsUtil statisticsUtil) {
this.configuration = configuration; this.configuration = configuration;
this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders; this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders;
this.activeStateChangeHandlers = new ArrayList<>(); this.activeStateChangeHandlers = new ArrayList<>();
this.curatorFactory = curatorFactory; this.curatorFactory = curatorFactory;
this.activeInstanceState = activeInstanceState; this.activeInstanceState = activeInstanceState;
this.serviceState = serviceState; this.serviceState = serviceState;
this.statisticsUtil = statisticsUtil;
} }
/** /**
...@@ -92,7 +96,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -92,7 +96,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
*/ */
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
statisticsUtil.setServerStartTime();
if (!HAConfiguration.isHAEnabled(configuration)) { if (!HAConfiguration.isHAEnabled(configuration)) {
statisticsUtil.setServerActiveTime();
LOG.info("HA is not enabled, no need to start leader election service"); LOG.info("HA is not enabled, no need to start leader election service");
return; return;
} }
...@@ -150,6 +156,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -150,6 +156,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
} }
activeInstanceState.update(serverId); activeInstanceState.update(serverId);
serviceState.setActive(); serviceState.setActive();
statisticsUtil.setServerActiveTime();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Got exception while activating", e); LOG.error("Got exception while activating", e);
notLeader(); notLeader();
......
...@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.*; import org.apache.atlas.kafka.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.util.StatisticsUtil;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
...@@ -80,6 +81,9 @@ public class NotificationHookConsumerKafkaTest { ...@@ -80,6 +81,9 @@ public class NotificationHookConsumerKafkaTest {
@Mock @Mock
private AtlasTypeRegistry typeRegistry; private AtlasTypeRegistry typeRegistry;
@Mock
private StatisticsUtil statisticsUtil;
@BeforeTest @BeforeTest
public void setup() throws AtlasException, InterruptedException, AtlasBaseException { public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
...@@ -104,7 +108,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -104,7 +108,7 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
...@@ -123,7 +127,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -123,7 +127,7 @@ public class NotificationHookConsumerKafkaTest {
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
...@@ -159,7 +163,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -159,7 +163,7 @@ public class NotificationHookConsumerKafkaTest {
assertNotNull (consumer); assertNotNull (consumer);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
......
...@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; ...@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.util.StatisticsUtil;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
...@@ -75,6 +76,9 @@ public class NotificationHookConsumerTest { ...@@ -75,6 +76,9 @@ public class NotificationHookConsumerTest {
@Mock @Mock
private AtlasTypeRegistry typeRegistry; private AtlasTypeRegistry typeRegistry;
@Mock
private StatisticsUtil statisticsUtil;
@BeforeMethod @BeforeMethod
public void setup() throws AtlasBaseException { public void setup() throws AtlasBaseException {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
...@@ -92,7 +96,7 @@ public class NotificationHookConsumerTest { ...@@ -92,7 +96,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerCanProceedIfServerIsReady() throws Exception { public void testConsumerCanProceedIfServerIsReady() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
...@@ -105,7 +109,7 @@ public class NotificationHookConsumerTest { ...@@ -105,7 +109,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
...@@ -122,7 +126,7 @@ public class NotificationHookConsumerTest { ...@@ -122,7 +126,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = mock(EntityCreateRequest.class); EntityCreateRequest message = mock(EntityCreateRequest.class);
...@@ -139,7 +143,7 @@ public class NotificationHookConsumerTest { ...@@ -139,7 +143,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
...@@ -153,7 +157,7 @@ public class NotificationHookConsumerTest { ...@@ -153,7 +157,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
...@@ -174,7 +178,7 @@ public class NotificationHookConsumerTest { ...@@ -174,7 +178,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
...@@ -194,7 +198,7 @@ public class NotificationHookConsumerTest { ...@@ -194,7 +198,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
...@@ -213,7 +217,7 @@ public class NotificationHookConsumerTest { ...@@ -213,7 +217,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive(); notificationHookConsumer.instanceIsActive();
...@@ -234,7 +238,7 @@ public class NotificationHookConsumerTest { ...@@ -234,7 +238,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
doAnswer(new Answer() { doAnswer(new Answer() {
@Override @Override
...@@ -266,7 +270,7 @@ public class NotificationHookConsumerTest { ...@@ -266,7 +270,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive(); notificationHookConsumer.instanceIsPassive();
...@@ -332,6 +336,6 @@ public class NotificationHookConsumerTest { ...@@ -332,6 +336,6 @@ public class NotificationHookConsumerTest {
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
} }
} }
...@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException; ...@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.util.StatisticsUtil;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.mockito.InOrder; import org.mockito.InOrder;
...@@ -51,6 +52,9 @@ public class ActiveInstanceElectorServiceTest { ...@@ -51,6 +52,9 @@ public class ActiveInstanceElectorServiceTest {
@Mock @Mock
private ServiceState serviceState; private ServiceState serviceState;
@Mock
private StatisticsUtil statisticsUtil;
@BeforeMethod @BeforeMethod
public void setup() { public void setup() {
System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000"); System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000");
...@@ -71,7 +75,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -71,7 +75,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
verify(leaderLatch).start(); verify(leaderLatch).start();
...@@ -92,7 +96,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -92,7 +96,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
verify(leaderLatch).addListener(activeInstanceElectorService); verify(leaderLatch).addListener(activeInstanceElectorService);
...@@ -104,7 +108,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -104,7 +108,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
verifyZeroInteractions(curatorFactory); verifyZeroInteractions(curatorFactory);
...@@ -125,7 +129,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -125,7 +129,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.stop(); activeInstanceElectorService.stop();
...@@ -147,7 +151,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -147,7 +151,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.stop(); activeInstanceElectorService.stop();
...@@ -161,7 +165,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -161,7 +165,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.stop(); activeInstanceElectorService.stop();
verifyZeroInteractions(curatorFactory); verifyZeroInteractions(curatorFactory);
...@@ -189,7 +193,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -189,7 +193,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -212,7 +216,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -212,7 +216,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -245,7 +249,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -245,7 +249,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -271,7 +275,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -271,7 +275,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -306,7 +310,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -306,7 +310,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState); activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.notLeader(); activeInstanceElectorService.notLeader();
...@@ -318,7 +322,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -318,7 +322,7 @@ public class ActiveInstanceElectorServiceTest {
public void testActiveStateSetOnBecomingLeader() { public void testActiveStateSetOnBecomingLeader() {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
curatorFactory, activeInstanceState, serviceState); curatorFactory, activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -331,7 +335,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -331,7 +335,7 @@ public class ActiveInstanceElectorServiceTest {
public void testPassiveStateSetOnLoosingLeadership() { public void testPassiveStateSetOnLoosingLeadership() {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
curatorFactory, activeInstanceState, serviceState); curatorFactory, activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.notLeader(); activeInstanceElectorService.notLeader();
...@@ -358,7 +362,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -358,7 +362,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
curatorFactory, activeInstanceState, serviceState); curatorFactory, activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment