diff --git a/dashboardv2/public/js/templates/common/Statistics_tmpl.html b/dashboardv2/public/js/templates/common/Statistics_tmpl.html index 13e64a7..71643c7 100644 --- a/dashboardv2/public/js/templates/common/Statistics_tmpl.html +++ b/dashboardv2/public/js/templates/common/Statistics_tmpl.html @@ -18,8 +18,8 @@ <div class="panel panel-default expand_collapse_panel-icon" data-id="entityActive"> <div class="panel-heading" data-toggle="collapse" href="#collapse1" aria-expanded="true"> <h4 class="panel-title"> - <a>Active Entities <span class="count">(0)</span></a> - </h4> + <a>Active Entities <span class="count">(0)</span></a> + </h4> <div class="btn-group pull-right"> <button type="button" title="Collapse"><i class="ec-icon fa"></i></button> </div> @@ -28,10 +28,10 @@ <div class="panel-body"> <table class="table stat-table"> <thead> - <tr> - <th>Entity</th> - <th>Count</th> - </tr> + <tr> + <th>Entity</th> + <th>Count</th> + </tr> </thead> <tbody > </tbody> @@ -42,8 +42,8 @@ <div class="panel panel-default expand_collapse_panel-icon" data-id="entityDelete"> <div class="panel-heading" data-toggle="collapse" href="#collapse2"> <h4 class="panel-title"> - <a>Deleted Entities <span class="count">(0)</span></a> - </h4> + <a>Deleted Entities <span class="count">(0)</span></a> + </h4> <div class="btn-group pull-right"> <button type="button" title="Collapse"><i class="ec-icon fa"></i></button> </div> @@ -52,10 +52,35 @@ <div class="panel-body"> <table class="table stat-table"> <thead> - <tr> - <th>Entity</th> - <th>Count</th> - </tr> + <tr> + <th>Entity</th> + <th>Count</th> + </tr> + </thead> + <tbody > + </tbody> + </table> + </div> + </div> + </div> + + <div class="panel panel-default expand_collapse_panel-icon" data-id="stats"> + <div class="panel-heading" data-toggle="collapse" href="#collapse3"> + <h4 class="panel-title"> + <a>Server Statistics </a> + </h4> + <div class="btn-group pull-right"> + <button type="button" title="Collapse"><i class="ec-icon fa"></i></button> + </div> + </div> + <div id="collapse3" class="panel-collapse collapse"> + <div class="panel-body"> + <table class="table stat-table"> + <thead> + <tr> + <th>Parameter</th> + <th>Value</th> + </tr> </thead> <tbody > </tbody> diff --git a/dashboardv2/public/js/views/common/Statistics.js b/dashboardv2/public/js/views/common/Statistics.js index fc1596a..009babe 100644 --- a/dashboardv2/public/js/views/common/Statistics.js +++ b/dashboardv2/public/js/views/common/Statistics.js @@ -38,7 +38,8 @@ define(['require', entityActive: "[data-id='entityActive'] tbody", entityDelete: "[data-id='entityDelete'] tbody", entityActiveHeader: "[data-id='entityActive'] .count", - entityDeletedHeader: "[data-id='entityDelete'] .count" + entityDeletedHeader: "[data-id='entityDelete'] .count", + stats: "[data-id='stats'] tbody" }, /** ui events hash */ events: function() {}, @@ -71,7 +72,8 @@ define(['require', var data = _.first(data.toJSON()), no_records = '<tr class="empty text-center"><td colspan="2"><span>No records found!</span></td></tr>', activeEntityTable = _.isEmpty(data.entity.entityActive) ? no_records : that.getTable({ valueObject: data.entity.entityActive }), - deleteEntityTable = _.isEmpty(data.entity.entityDeleted) ? no_records : that.getTable({ valueObject: data.entity.entityDeleted }); + deleteEntityTable = _.isEmpty(data.entity.entityDeleted) ? no_records : that.getTable({ valueObject: data.entity.entityDeleted}), + stats = _.isEmpty(data.general.stats) ? no_records : that.getTable({ valueObject: data.general.stats, formatIntVal: false}); var totalActive = 0, totalDeleted = 0; if (data.entity && data.general.entityCount) { @@ -84,6 +86,7 @@ define(['require', } that.ui.entityActive.html(activeEntityTable); that.ui.entityDelete.html(deleteEntityTable); + that.ui.stats.html(stats); that.ui.entityActiveHeader.html(" (" + _.numberFormatWithComa((totalActive - totalDeleted)) + ")"); that.ui.entityDeletedHeader.html(" (" + _.numberFormatWithComa(totalDeleted) + ")"); } diff --git a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java new file mode 100644 index 0000000..cb43059 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +/** + * Atlas statistics + */ +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AtlasStatistics { + public static final String STAT_SERVER_START_TS = "serverStartTimeStamp"; + public static final String STAT_SERVER_ACTIVE_TS = "serverActiveTimeStamp"; + public static final String STAT_SERVER_UP_SINCE = "serverUpTime"; + public static final String STAT_START_OFFSET = "KafkaTopic:ATLAS_HOOK:startOffset"; + public static final String STAT_CURRENT_OFFSET = "KafkaTopic:ATLAS_HOOK:currentOffset"; + public static final String STAT_SOLR_STATUS = "solrConnectionStatus"; + public static final String STAT_HBASE_STATUS = "HBaseConnectionStatus"; + public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "lastMessageProcessedTimeStamp"; + public static final String STAT_AVG_MESSAGE_PROCESSING_TIME = "avgMessageProcessingTime"; + + private Map<String, Object> data = new HashMap<>(); + + public Map<String, Object> getData() { + return data; + } + + public void setData(Map<String, Object> data) { + this.data = data; + } + + @Override + public String toString() { + return "AtlasStatistics{" + + "data=" + data + + '}'; + } + + @Override + public int hashCode() { + return Objects.hash(data); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AtlasStatistics other = (AtlasStatistics) o; + + return Objects.equals(this.data, other.data); + } +} diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java index 607b830..d9ea12a 100644 --- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java +++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java @@ -18,11 +18,13 @@ package org.apache.atlas.services; import org.apache.atlas.annotation.AtlasService; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.util.StatisticsUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; @@ -63,13 +65,14 @@ public class MetricsService { private final AtlasGraph atlasGraph; private final AtlasTypeRegistry typeRegistry; + private final StatisticsUtil statisticsUtil; private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix(); @Inject - public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry) { + public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) { this.atlasGraph = graph; this.typeRegistry = typeRegistry; - + this.statisticsUtil = statisticsUtil; } @SuppressWarnings("unchecked") @@ -147,6 +150,10 @@ public class MetricsService { metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime); + //add atlas server stats + Map<String, Object> statistics = statisticsUtil.getAtlasStatistics(); + metrics.addMetric(GENERAL, "stats", statistics); + return metrics; } @@ -156,7 +163,12 @@ public class MetricsService { indexQuery = String.format(indexQuery, typeName, status.name()); - return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals(); + try { + return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals(); + }catch (Exception e){ + LOG.error("Failed fetching using indexQuery: " + e.getMessage()); + } + return 0l; } private int getAllTypesCount() { diff --git a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java new file mode 100644 index 0000000..d57f350 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.util; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.AtlasStatistics; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; + +import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS; +import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS; +import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE; +import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET; +import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET; +import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS; +import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS; +import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS; +import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME; + +@Component +public class StatisticsUtil { + private static final Logger LOG = LoggerFactory.getLogger(StatisticsUtil.class); + + private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("d MMM, yyyy : hh:mm aaa z"); + + private static final long DAY = 1000 * 60 * 60 * 24; + private static final long HOUR = 1000 * 60 * 60; + private static final long MIN = 1000 * 60; + private static final long SEC = 1000; + + private final AtlasGraph graph; + private final String STATUS_CONNECTED = "connected"; + private final String STATUS_NOT_CONNECTED = "not-connected"; + private final AtlasStatistics atlasStatistics; + + private long countMsgProcessed = 0; + private long totalMsgProcessingTimeMs = 0; + + @Inject + public StatisticsUtil(AtlasGraph graph) { + this.graph = graph; + this.atlasStatistics = new AtlasStatistics(); + } + + public Map<String, Object> getAtlasStatistics() { + Map<String, Object> statisticsMap = new HashMap<>(); + statisticsMap.putAll(atlasStatistics.getData()); + + statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus()); + statisticsMap.put(STAT_SOLR_STATUS , getSolrStatus()); + statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime()); + formatStatistics(statisticsMap); + + return statisticsMap; + } + + public void setKafkaOffsets(long value){ + if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) { + addStat(STAT_START_OFFSET, value); + } + addStat(STAT_CURRENT_OFFSET, ++value); + } + + public void setAvgMsgProcessingTime(long value) { + countMsgProcessed++; + totalMsgProcessingTimeMs += value; + value = totalMsgProcessingTimeMs / countMsgProcessed; + + addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value); + } + + public void setLastMsgProcessedTime() { + addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, System.currentTimeMillis()); + } + + public void setServerStartTime() { + addStat(STAT_SERVER_START_TS, System.currentTimeMillis()); + } + + public void setServerActiveTime() { + addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis()); + } + + + private void addStat(String key, Object value) { + Map<String, Object> data = atlasStatistics.getData(); + if (data == null) { + data = new HashMap<>(); + } + data.put(key, value); + atlasStatistics.setData(data); + } + + private Object getStat(String key) { + Map<String, Object> data = atlasStatistics.getData(); + Object ret = data.get(key); + if (ret == null) { + return -1; + } + return ret; + } + + private void formatStatistics(Map<String, Object> statisticsMap) { + for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) { + switch (stat.getKey()) { + case STAT_SERVER_UP_SINCE: + statisticsMap.put(stat.getKey(), millisToTimeDiff(Long.parseLong(stat.getValue().toString()))); + break; + + case STAT_LAST_MESSAGE_PROCESSED_TIME_TS: + statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString()))); + break; + + case STAT_SERVER_START_TS: + case STAT_SERVER_ACTIVE_TS: + statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString()))); + break; + + case STAT_AVG_MESSAGE_PROCESSING_TIME: + statisticsMap.put(stat.getKey(), stat.getValue() + " milliseconds"); + break; + + case STAT_HBASE_STATUS: + case STAT_SOLR_STATUS: + String curState = ((boolean) stat.getValue()) ? STATUS_CONNECTED : STATUS_NOT_CONNECTED; + statisticsMap.put(stat.getKey(), curState); + break; + + default: + statisticsMap.put(stat.getKey(), stat.getValue()); + } + } + } + + private boolean getHBaseStatus(){ + + String query = "g.V().next()"; + try { + runWithTimeout(new Runnable() { + @Override + public void run() { + try { + graph.executeGremlinScript(query, false); + } catch (AtlasBaseException e) { + LOG.error(e.getMessage()); + } + } + }, 10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error(e.getMessage()); + return false; + } + + return true; + } + + private boolean getSolrStatus(){ + String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)"; + try { + runWithTimeout(new Runnable() { + @Override + public void run() { + graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals(); + } + }, 10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error(e.getMessage()); + return false; + } + return true; + } + + private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception { + runWithTimeout(new Callable<Object>() { + @Override + public Object call() { + runnable.run(); + return null; + } + }, timeout, timeUnit); + } + + private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception { + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Future<T> future = executor.submit(callable); + executor.shutdown(); + try { + return future.get(timeout, timeUnit); + } catch (TimeoutException e) { + future.cancel(true); + throw e; + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof Exception) { + throw (Exception) t; + } else { + throw new IllegalStateException(t); + } + } + } + + private long getUpSinceTime() { + long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString()); + return System.currentTimeMillis() - upTS; + } + + private String millisToTimeDiff(long msDiff) { + StringBuilder sb = new StringBuilder(); + + long diffSeconds = msDiff / SEC % 60; + long diffMinutes = msDiff / MIN % 60; + long diffHours = msDiff / HOUR % 24; + long diffDays = msDiff / DAY; + + if (diffDays > 0) sb.append(diffDays).append(" day "); + if (diffHours > 0) sb.append(diffHours).append(" hour "); + if (diffMinutes > 0) sb.append(diffMinutes).append(" min "); + if (diffSeconds > 0) sb.append(diffSeconds).append(" sec"); + + return sb.toString(); + } + + private String millisToTimeStamp(long ms) { + return simpleDateFormat.format(ms); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index d1d6003..48355c9 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -46,6 +46,7 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor; import org.apache.atlas.notification.preprocessor.PreprocessorContext; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.utils.LruCache; +import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; @@ -139,6 +140,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; private final AtlasTypeRegistry typeRegistry; + private final StatisticsUtil statisticsUtil; private final int maxRetries; private final int failedMsgCacheSize; private final int minWaitDuration; @@ -168,13 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Inject public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, ServiceState serviceState, AtlasInstanceConverter instanceConverter, - AtlasTypeRegistry typeRegistry) throws AtlasException { + AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) throws AtlasException { this.notificationInterface = notificationInterface; this.atlasEntityStore = atlasEntityStore; this.serviceState = serviceState; this.instanceConverter = instanceConverter; this.typeRegistry = typeRegistry; this.applicationProperties = ApplicationProperties.get(); + this.statisticsUtil = statisticsUtil; maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); @@ -686,7 +689,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } finally { AtlasPerfTracer.log(perf); - long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0; + long msgProcessingTime = System.currentTimeMillis() - startTime; if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) { String strMessage = AbstractNotification.getMessageJson(message); @@ -697,10 +700,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (auditLog != null) { auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK); - auditLog.setTimeTaken(System.currentTimeMillis() - startTime); + auditLog.setTimeTaken(msgProcessingTime); AuditFilter.audit(auditLog); } + statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime); } } @@ -756,6 +760,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumer.commit(partition, kafkaMessage.getOffset() + 1); commitSucceessStatus = true; + statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset()); + statisticsUtil.setLastMsgProcessedTime(); } finally { failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); } diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java index ad0cb84..7887afb 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java @@ -23,6 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.service.Service; +import org.apache.atlas.util.StatisticsUtil; import org.apache.commons.configuration.Configuration; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; @@ -39,6 +40,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; + /** * A service that implements leader election to determine whether this Atlas server is Active. * @@ -59,6 +61,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene private final Configuration configuration; private final ServiceState serviceState; private final ActiveInstanceState activeInstanceState; + private final StatisticsUtil statisticsUtil; private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; private List<ActiveStateChangeHandler> activeStateChangeHandlers; private CuratorFactory curatorFactory; @@ -75,13 +78,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ActiveInstanceElectorService(Configuration configuration, Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders, CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, - ServiceState serviceState) { + ServiceState serviceState, StatisticsUtil statisticsUtil) { this.configuration = configuration; this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders; this.activeStateChangeHandlers = new ArrayList<>(); this.curatorFactory = curatorFactory; this.activeInstanceState = activeInstanceState; this.serviceState = serviceState; + this.statisticsUtil = statisticsUtil; } /** @@ -92,7 +96,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene */ @Override public void start() throws AtlasException { + statisticsUtil.setServerStartTime(); if (!HAConfiguration.isHAEnabled(configuration)) { + statisticsUtil.setServerActiveTime(); LOG.info("HA is not enabled, no need to start leader election service"); return; } @@ -150,6 +156,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene } activeInstanceState.update(serverId); serviceState.setActive(); + statisticsUtil.setServerActiveTime(); } catch (Exception e) { LOG.error("Got exception while activating", e); notLeader(); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index b4b88d1..c7ba699 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.kafka.*; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.apache.atlas.repository.converters.AtlasInstanceConverter; @@ -80,6 +81,9 @@ public class NotificationHookConsumerKafkaTest { @Mock private AtlasTypeRegistry typeRegistry; + @Mock + private StatisticsUtil statisticsUtil; + @BeforeTest public void setup() throws AtlasException, InterruptedException, AtlasBaseException { MockitoAnnotations.initMocks(this); @@ -104,7 +108,7 @@ public class NotificationHookConsumerKafkaTest { produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); @@ -123,7 +127,7 @@ public class NotificationHookConsumerKafkaTest { public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; @@ -159,7 +163,7 @@ public class NotificationHookConsumerKafkaTest { assertNotNull (consumer); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 2618b04..de316b6 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; @@ -75,6 +76,9 @@ public class NotificationHookConsumerTest { @Mock private AtlasTypeRegistry typeRegistry; + @Mock + private StatisticsUtil statisticsUtil; + @BeforeMethod public void setup() throws AtlasBaseException { MockitoAnnotations.initMocks(this); @@ -92,7 +96,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -105,7 +109,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -122,7 +126,7 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); EntityCreateRequest message = mock(EntityCreateRequest.class); @@ -139,7 +143,7 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); @@ -153,7 +157,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -174,7 +178,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); notificationHookConsumer.startInternal(configuration, executorService); @@ -194,7 +198,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); notificationHookConsumer.startInternal(configuration, executorService); @@ -213,7 +217,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); @@ -234,7 +238,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); doAnswer(new Answer() { @Override @@ -266,7 +270,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); @@ -332,6 +336,6 @@ public class NotificationHookConsumerTest { when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); } } diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java index dd2df70..0fe3eba 100644 --- a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java @@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.util.StatisticsUtil; import org.apache.commons.configuration.Configuration; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.mockito.InOrder; @@ -51,6 +52,9 @@ public class ActiveInstanceElectorServiceTest { @Mock private ServiceState serviceState; + @Mock + private StatisticsUtil statisticsUtil; + @BeforeMethod public void setup() { System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000"); @@ -71,7 +75,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); verify(leaderLatch).start(); @@ -92,7 +96,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); verify(leaderLatch).addListener(activeInstanceElectorService); @@ -104,7 +108,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); verifyZeroInteractions(curatorFactory); @@ -125,7 +129,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.stop(); @@ -147,7 +151,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.stop(); @@ -161,7 +165,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.stop(); verifyZeroInteractions(curatorFactory); @@ -189,7 +193,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -212,7 +216,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -245,7 +249,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -271,7 +275,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -306,7 +310,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, - activeInstanceState, serviceState); + activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.notLeader(); @@ -318,7 +322,7 @@ public class ActiveInstanceElectorServiceTest { public void testActiveStateSetOnBecomingLeader() { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), - curatorFactory, activeInstanceState, serviceState); + curatorFactory, activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.isLeader(); @@ -331,7 +335,7 @@ public class ActiveInstanceElectorServiceTest { public void testPassiveStateSetOnLoosingLeadership() { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), - curatorFactory, activeInstanceState, serviceState); + curatorFactory, activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.notLeader(); @@ -358,7 +362,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), - curatorFactory, activeInstanceState, serviceState); + curatorFactory, activeInstanceState, serviceState, statisticsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader();