Commit 1a5009ea by Madhan Neethiraj

ATLAS-3071: updated stats/metrics module to collect notification metrics

Co-authored-by: "lma <lma@cloudera.com>"
parent fbabb8ba
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
/**
* Atlas statistics
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasStatistics {
public static final String STAT_SERVER_START_TS = "Server:upFrom";
public static final String STAT_SERVER_ACTIVE_TS = "Server:activateFrom";
public static final String STAT_SERVER_UP_SINCE = "Server:upTime";
public static final String STAT_START_OFFSET = "Notification:ATLAS_HOOK:offsetStart";
public static final String STAT_CURRENT_OFFSET = "Notification:ATLAS_HOOK:offsetCurrent";
public static final String STAT_SOLR_STATUS = "ConnectionStatus:Solr";
public static final String STAT_HBASE_STATUS = "ConnectionStatus:HBase";
public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "Notification:ATLAS_HOOK:messageLastProcessedAt";
public static final String STAT_AVG_MESSAGE_PROCESSING_TIME = "Notification:ATLAS_HOOK:messageAvgProcessingDuration";
public static final String STAT_MESSAGES_CONSUMED = "Notification:ATLAS_HOOK:messagesConsumed";
private Map<String, Object> data = new HashMap<>();
public Map<String, Object> getData() {
return data;
}
public void setData(Map<String, Object> data) {
this.data = data;
}
@Override
public String toString() {
return "AtlasStatistics{" +
"data=" + data +
'}';
}
@Override
public int hashCode() {
return Objects.hash(data);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AtlasStatistics other = (AtlasStatistics) o;
return Objects.equals(this.data, other.data);
}
}
...@@ -36,6 +36,51 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ ...@@ -36,6 +36,51 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true) @JsonIgnoreProperties(ignoreUnknown=true)
public class AtlasMetrics { public class AtlasMetrics {
public static final String PREFIX_CONNECTION_STATUS = "ConnectionStatus:";
public static final String PREFIX_NOTIFICATION = "Notification:";
public static final String PREFIX_SERVER = "Server:";
public static final String STAT_CONNECTION_STATUS_BACKEND_STORE = PREFIX_CONNECTION_STATUS + "backendStore";
public static final String STAT_CONNECTION_STATUS_INDEX_STORE = PREFIX_CONNECTION_STATUS + "indexStore";
public static final String STAT_NOTIFY_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDay";
public static final String STAT_NOTIFY_AVG_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityCreates";
public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityUpdates";
public static final String STAT_NOTIFY_DELETES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityDeletes";
public static final String STAT_NOTIFY_FAILED_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayFailed";
public static final String STAT_NOTIFY_START_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayStartTime";
public static final String STAT_NOTIFY_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHour";
public static final String STAT_NOTIFY_AVG_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityCreates";
public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityUpdates";
public static final String STAT_NOTIFY_DELETES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityDeletes";
public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourFailed";
public static final String STAT_NOTIFY_START_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourStartTime";
public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime";
public static final String STAT_NOTIFY_START_OFFSET = PREFIX_NOTIFICATION + "offsetStart";
public static final String STAT_NOTIFY_CURRENT_OFFSET = PREFIX_NOTIFICATION + "offsetCurrent";
public static final String STAT_NOTIFY_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDay";
public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY = PREFIX_NOTIFICATION + "previousDayAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityCreates";
public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityUpdates";
public static final String STAT_NOTIFY_DELETES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityDeletes";
public static final String STAT_NOTIFY_FAILED_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayFailed";
public static final String STAT_NOTIFY_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHour";
public static final String STAT_NOTIFY_AVG_TIME_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityCreates";
public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityUpdates";
public static final String STAT_NOTIFY_DELETES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityDeletes";
public static final String STAT_NOTIFY_FAILED_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourFailed";
public static final String STAT_NOTIFY_COUNT_TOTAL = PREFIX_NOTIFICATION + "total";
public static final String STAT_NOTIFY_AVG_TIME_TOTAL = PREFIX_NOTIFICATION + "totalAvgTime";
public static final String STAT_NOTIFY_CREATES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalCreates";
public static final String STAT_NOTIFY_UPDATES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalUpdates";
public static final String STAT_NOTIFY_DELETES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalDeletes";
public static final String STAT_NOTIFY_FAILED_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalFailed";
public static final String STAT_SERVER_ACTIVE_TIMESTAMP = PREFIX_SERVER + "activeTimeStamp";
public static final String STAT_SERVER_START_TIMESTAMP = PREFIX_SERVER + "startTimeStamp";
public static final String STAT_SERVER_UP_TIME = PREFIX_SERVER + "upTime";
private Map<String, Map<String, Object>> data; private Map<String, Map<String, Object>> data;
public AtlasMetrics() { public AtlasMetrics() {
...@@ -63,30 +108,38 @@ public class AtlasMetrics { ...@@ -63,30 +108,38 @@ public class AtlasMetrics {
@JsonIgnore @JsonIgnore
public void addMetric(String groupKey, String key, Object value) { public void addMetric(String groupKey, String key, Object value) {
Map<String, Map<String, Object>> data = this.data; Map<String, Map<String, Object>> data = this.data;
if (data == null) { if (data == null) {
data = new HashMap<>(); data = new HashMap<>();
this.data = data;
} }
Map<String, Object> metricMap = data.computeIfAbsent(groupKey, k -> new HashMap<>()); Map<String, Object> metricMap = data.computeIfAbsent(groupKey, k -> new HashMap<>());
metricMap.put(key, value); metricMap.put(key, value);
setData(data);
} }
@JsonIgnore @JsonIgnore
public Number getNumericMetric(String groupKey, String key) { public Number getNumericMetric(String groupKey, String key) {
Object metric = getMetric(groupKey, key); Object metric = getMetric(groupKey, key);
return metric instanceof Number ? (Number) metric : null; return metric instanceof Number ? (Number) metric : null;
} }
@JsonIgnore @JsonIgnore
public Object getMetric(String groupKey, String key) { public Object getMetric(String groupKey, String key) {
Object ret = null;
Map<String, Map<String, Object>> data = this.data; Map<String, Map<String, Object>> data = this.data;
Object ret = null;
if (data != null) { if (data != null) {
Map<String, Object> metricMap = data.get(groupKey); Map<String, Object> metricMap = data.get(groupKey);
if (metricMap != null && !metricMap.isEmpty()) { if (metricMap != null && !metricMap.isEmpty()) {
ret = metricMap.get(key); ret = metricMap.get(key);
} }
} }
return ret; return ret;
} }
} }
...@@ -18,15 +18,13 @@ ...@@ -18,15 +18,13 @@
package org.apache.atlas.services; package org.apache.atlas.services;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -53,6 +51,8 @@ public class MetricsService { ...@@ -53,6 +51,8 @@ public class MetricsService {
public static final String GENERAL = "general"; public static final String GENERAL = "general";
// Query names // Query names
protected static final String METRIC_COLLECTION_TIME = "collectionTime";
protected static final String METRIC_STATS = "stats";
protected static final String METRIC_TYPE_COUNT = TYPE + "Count"; protected static final String METRIC_TYPE_COUNT = TYPE + "Count";
protected static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount"; protected static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount";
protected static final String METRIC_ENTITY_COUNT = ENTITY + "Count"; protected static final String METRIC_ENTITY_COUNT = ENTITY + "Count";
...@@ -61,114 +61,90 @@ public class MetricsService { ...@@ -61,114 +61,90 @@ public class MetricsService {
protected static final String METRIC_TAG_COUNT = TAG + "Count"; protected static final String METRIC_TAG_COUNT = TAG + "Count";
protected static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities"; protected static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities";
public static final String METRIC_COLLECTION_TIME = "collectionTime";
private final AtlasGraph atlasGraph; private final AtlasGraph atlasGraph;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final StatisticsUtil statisticsUtil; private final AtlasMetricsUtil metricsUtil;
private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix(); private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
@Inject @Inject
public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) { public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) {
this.atlasGraph = graph; this.atlasGraph = graph;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.statisticsUtil = statisticsUtil; this.metricsUtil = metricsUtil;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public AtlasMetrics getMetrics() { public AtlasMetrics getMetrics() {
AtlasMetrics metrics = new AtlasMetrics(); Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames();
metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount());
metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount());
Map<String, Long> activeCountMap = new HashMap<>();
Map<String, Long> deletedCountMap = new HashMap<>();
// metrics for classifications
Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames(); Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames();
Map<String, Long> activeEntityCount = new HashMap<>();
if (classificationDefNames != null) { Map<String, Long> deletedEntityCount = new HashMap<>();
for (String classificationDefName : classificationDefNames) { Map<String, Long> taggedEntityCount = new HashMap<>();
activeCountMap.put(classificationDefName, getTypeCount(classificationDefName, ACTIVE)); long unusedTypeCount = 0;
} long totalEntities = 0;
}
// metrics for entities
Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames();
if (entityDefNames != null) { if (entityDefNames != null) {
for (String entityDefName : entityDefNames) { for (String entityDefName : entityDefNames) {
activeCountMap.put(entityDefName, getTypeCount(entityDefName, ACTIVE)); long activeCount = getTypeCount(entityDefName, ACTIVE);
deletedCountMap.put(entityDefName, getTypeCount(entityDefName, DELETED)); long deletedCount = getTypeCount(entityDefName, DELETED);
if (activeCount > 0) {
activeEntityCount.put(entityDefName, activeCount);
totalEntities += activeCount;
}
if (deletedCount > 0) {
deletedEntityCount.put(entityDefName, deletedCount);
totalEntities += deletedCount;
}
if (activeCount == 0 && deletedCount == 0) {
unusedTypeCount++;
}
} }
} }
Map<String, Long> activeEntityCount = new HashMap<>(); if (classificationDefNames != null) {
Map<String, Long> deletedEntityCount = new HashMap<>(); for (String classificationDefName : classificationDefNames) {
long unusedTypeCount = 0; long count = getTypeCount(classificationDefName, ACTIVE);
long totalEntities = 0;
for (String entityDefName : typeRegistry.getAllEntityDefNames()) {
Long activeCount = activeCountMap.get(entityDefName);
Long deletedCount = deletedCountMap.get(entityDefName);
if (activeCount > 0) {
activeEntityCount.put(entityDefName, activeCount);
totalEntities += activeCount.longValue();
}
if (deletedCount > 0) {
deletedEntityCount.put(entityDefName, deletedCount);
totalEntities += deletedCount.longValue();
}
if (activeCount == 0 && deletedCount == 0) { if (count > 0) {
unusedTypeCount++; taggedEntityCount.put(classificationDefName, count);
}
} }
} }
AtlasMetrics metrics = new AtlasMetrics();
metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, System.currentTimeMillis());
metrics.addMetric(GENERAL, METRIC_STATS, metricsUtil.getStats()); //add atlas server stats
metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount());
metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount());
metrics.addMetric(GENERAL, METRIC_TYPE_UNUSED_COUNT, unusedTypeCount); metrics.addMetric(GENERAL, METRIC_TYPE_UNUSED_COUNT, unusedTypeCount);
metrics.addMetric(GENERAL, METRIC_ENTITY_COUNT, totalEntities); metrics.addMetric(GENERAL, METRIC_ENTITY_COUNT, totalEntities);
metrics.addMetric(ENTITY, METRIC_ENTITY_ACTIVE, activeEntityCount); metrics.addMetric(ENTITY, METRIC_ENTITY_ACTIVE, activeEntityCount);
metrics.addMetric(ENTITY, METRIC_ENTITY_DELETED, deletedEntityCount); metrics.addMetric(ENTITY, METRIC_ENTITY_DELETED, deletedEntityCount);
Map<String, Long> taggedEntityCount = new HashMap<>();
for (String classificationName : typeRegistry.getAllClassificationDefNames()) {
Long count = activeCountMap.get(classificationName);
if (count > 0) {
taggedEntityCount.put(classificationName, count);
}
}
metrics.addMetric(TAG, METRIC_ENTITIES_PER_TAG, taggedEntityCount); metrics.addMetric(TAG, METRIC_ENTITIES_PER_TAG, taggedEntityCount);
// Miscellaneous metrics
long collectionTime = System.currentTimeMillis();
metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
//add atlas server stats
Map<String, Object> statistics = statisticsUtil.getAtlasStatistics();
metrics.addMetric(GENERAL, "stats", statistics);
return metrics; return metrics;
} }
private Long getTypeCount(String typeName, Status status) { private long getTypeCount(String typeName, Status status) {
Long ret = null;
String indexQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\" : (%s)" + AND_STR + String indexQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\" : (%s)" + AND_STR +
indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\" : (%s)"; indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\" : (%s)";
indexQuery = String.format(indexQuery, typeName, status.name()); indexQuery = String.format(indexQuery, typeName, status.name());
try { try {
return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals(); ret = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
}catch (Exception e){ }catch (Exception e){
LOG.error("Failed fetching using indexQuery: " + e.getMessage()); LOG.error("Failed fetching using indexQuery: " + e.getMessage());
} }
return 0l;
return ret == null ? 0L : ret;
} }
private int getAllTypesCount() { private int getAllTypesCount() {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
public class AtlasMetricsCounter {
public enum Period { ALL, CURR_DAY, CURR_HOUR, PREV_HOUR, PREV_DAY };
private final String name;
private final Stats stats;
private Clock clock;
private Instant lastIncrTime;
private Instant dayStartTime;
private Instant dayEndTime;
private Instant hourStartTime;
private Instant hourEndTime;
public AtlasMetricsCounter(String name) {
this(name, Clock.systemUTC());
}
public AtlasMetricsCounter(String name, Clock clock) {
this.name = name;
this.stats = new Stats();
init(clock);
}
public String getName() { return name; }
public Instant getLastIncrTime() { return lastIncrTime; }
public void incr() {
incrByWithMeasure(1, 0);
}
public void incrBy(long count) {
incrByWithMeasure(count, 0);
}
public void incrWithMeasure(long measure) {
incrByWithMeasure(1, measure);
}
public void incrByWithMeasure(long count, long measure) {
Instant instant = clock.instant();
stats.addCount(ALL, count);
stats.addMeasure(ALL, measure);
if (instant.isAfter(dayStartTime)) { // ignore times earlier than start of current day
lastIncrTime = instant;
updateForTime(instant);
stats.addCount(CURR_DAY, count);
stats.addMeasure(CURR_DAY, measure);
if (instant.isAfter(hourStartTime)) { // ignore times earlier than start of current hour
stats.addCount(CURR_HOUR, count);
stats.addMeasure(CURR_HOUR, measure);
}
}
}
public Stats report() {
updateForTime(clock.instant());
return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
}
// visible only for testing
void init(Clock clock) {
this.clock = clock;
this.lastIncrTime = Instant.ofEpochSecond(0);
this.dayStartTime = Instant.ofEpochSecond(0);
this.dayEndTime = Instant.ofEpochSecond(0);
this.hourStartTime = Instant.ofEpochSecond(0);
this.hourEndTime = Instant.ofEpochSecond(0);
updateForTime(clock.instant());
}
protected void updateForTime(Instant instant) {
if (instant.isAfter(dayEndTime)) {
rolloverDay(instant);
rolloverHour(instant);
} else if (instant.isAfter(hourEndTime)) {
rolloverHour(instant);
}
}
protected void rolloverDay(Instant instant) {
Instant dayStartTime = getDayStartTime(instant);
if (dayStartTime.equals(dayEndTime)) {
stats.copy(CURR_DAY, PREV_DAY);
} else {
stats.reset(PREV_DAY);
}
stats.reset(CURR_DAY);
this.dayStartTime = dayStartTime;
this.dayEndTime = getNextDayStartTime(instant);
}
protected void rolloverHour(Instant instant) {
Instant hourStartTime = getHourStartTime(instant);
if (hourStartTime.equals(hourEndTime)) {
stats.copy(CURR_HOUR, PREV_HOUR);
} else {
stats.reset(PREV_HOUR);
}
stats.reset(CURR_HOUR);
this.hourStartTime = hourStartTime;
this.hourEndTime = getNextHourStartTime(instant);
}
public static LocalDateTime getLocalDateTime(Instant instant) {
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
}
public static Instant getHourStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour()).toInstant(ZoneOffset.UTC);
}
public static Instant getNextHourStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour() + 1).toInstant(ZoneOffset.UTC);
}
public static Instant getDayStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).toInstant(ZoneOffset.UTC);
}
public static Instant getNextDayStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC);
}
public static class Stats {
private static final int NUM_PERIOD = Period.values().length;
private final long dayStartTimeMs;
private final long hourStartTimeMs;
private final long[] count = new long[NUM_PERIOD];
private final long[] measureSum = new long[NUM_PERIOD];
private final long[] measureMin = new long[NUM_PERIOD];
private final long[] measureMax = new long[NUM_PERIOD];
public Stats() {
dayStartTimeMs = 0;
hourStartTimeMs = 0;
for (Period period : Period.values()) {
reset(period);
}
}
public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
this.dayStartTimeMs = dayStartTimeMs;
this.hourStartTimeMs = hourStartTimeMs;
copy(other.count, this.count);
copy(other.measureSum, this.measureSum);
copy(other.measureMin, this.measureMin);
copy(other.measureMax, this.measureMax);
}
public long getDayStartTimeMs() { return dayStartTimeMs; }
public long getHourStartTimeMs() { return hourStartTimeMs; }
public long getCount(Period period) { return count[period.ordinal()]; }
public long getMeasureSum(Period period) { return measureSum[period.ordinal()]; }
public long getMeasureMin(Period period) { return measureMin[period.ordinal()]; }
public long getMeasureMax(Period period) { return measureMax[period.ordinal()]; }
public long getMeasureAvg(Period period) {
int idx = period.ordinal();
long c = count[idx];
return c != 0 ? (measureSum[idx] / c) : 0;
}
public void addCount(Period period, long num) {
count[period.ordinal()] += num;
}
public void addMeasure(Period period, long measure) {
int idx = period.ordinal();
measureSum[idx] += measure;
if (measureMin[idx] > measure) {
measureMin[idx] = measure;
}
if (measureMax[idx] < measure) {
measureMax[idx] = measure;
}
}
private void copy(Period src, Period dest) {
int srcIdx = src.ordinal();
int destIdx = dest.ordinal();
count[destIdx] = count[srcIdx];
measureSum[destIdx] = measureSum[srcIdx];
measureMin[destIdx] = measureMin[srcIdx];
measureMax[destIdx] = measureMax[srcIdx];
}
private void reset(Period period) {
int idx = period.ordinal();
count[idx] = 0;
measureSum[idx] = 0;
measureMin[idx] = Long.MAX_VALUE;
measureMax[idx] = Long.MIN_VALUE;
}
private void copy(long[] src, long[] dest) {
for (int i = 0; i < dest.length; i++) {
dest[i] = src[i];
}
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.util.AtlasMetricsCounter.Stats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.time.Clock;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import static org.apache.atlas.model.metrics.AtlasMetrics.*;
import static org.apache.atlas.repository.Constants.TYPE_NAME_INTERNAL;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
@Component
public class AtlasMetricsUtil {
private static final Logger LOG = LoggerFactory.getLogger(AtlasMetricsUtil.class);
private static final long SEC_MS = 1000;
private static final long MIN_MS = 60 * SEC_MS;
private static final long HOUR_MS = 60 * MIN_MS;
private static final long DAY_MS = 24 * HOUR_MS;
private static final String STATUS_CONNECTED = "connected";
private static final String STATUS_NOT_CONNECTED = "not-connected";
private final AtlasGraph graph;
private long serverStartTime = 0;
private long serverActiveTime = 0;
private long msgOffsetStart = -1;
private long msgOffsetCurrent = 0;
private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed");
private final AtlasMetricsCounter messagesFailed = new AtlasMetricsCounter("messagesFailed");
private final AtlasMetricsCounter entityCreates = new AtlasMetricsCounter("entityCreates");
private final AtlasMetricsCounter entityUpdates = new AtlasMetricsCounter("entityUpdates");
private final AtlasMetricsCounter entityDeletes = new AtlasMetricsCounter("entityDeletes");
@Inject
public AtlasMetricsUtil(AtlasGraph graph) {
this.graph = graph;
}
// visible only for testing
public void init(Clock clock) {
messagesProcessed.init(clock);
messagesFailed.init(clock);
entityCreates.init(clock);
entityUpdates.init(clock);
entityDeletes.init(clock);
}
public void onServerStart() {
serverStartTime = System.currentTimeMillis();
}
public void onServerActivation() {
serverActiveTime = System.currentTimeMillis();
}
public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) {
messagesProcessed.incrWithMeasure(stats.timeTakenMs);
entityCreates.incrBy(stats.entityCreates);
entityUpdates.incrBy(stats.entityUpdates);
entityDeletes.incrBy(stats.entityDeletes);
if (stats.isFailedMsg) {
messagesFailed.incr();
}
if (msgOffsetStart == -1) {
msgOffsetStart = msgOffset;
}
msgOffsetCurrent = ++msgOffset;
}
public Map<String, Object> getStats() {
Map<String, Object> ret = new HashMap<>();
Stats messagesProcessed = this.messagesProcessed.report();
Stats messagesFailed = this.messagesFailed.report();
Stats entityCreates = this.entityCreates.report();
Stats entityUpdates = this.entityUpdates.report();
Stats entityDeletes = this.entityDeletes.report();
ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime);
ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime);
ret.put(STAT_SERVER_UP_TIME, millisToTimeDiff(System.currentTimeMillis() - serverStartTime));
ret.put(STAT_CONNECTION_STATUS_BACKEND_STORE, getHBaseStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
ret.put(STAT_CONNECTION_STATUS_INDEX_STORE, getSolrStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart);
ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent);
ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli());
ret.put(STAT_NOTIFY_COUNT_TOTAL, messagesProcessed.getCount(ALL));
ret.put(STAT_NOTIFY_AVG_TIME_TOTAL, messagesProcessed.getMeasureAvg(ALL));
ret.put(STAT_NOTIFY_FAILED_COUNT_TOTAL, messagesFailed.getCount(ALL));
ret.put(STAT_NOTIFY_CREATES_COUNT_TOTAL, entityCreates.getCount(ALL));
ret.put(STAT_NOTIFY_UPDATES_COUNT_TOTAL, entityUpdates.getCount(ALL));
ret.put(STAT_NOTIFY_DELETES_COUNT_TOTAL, entityDeletes.getCount(ALL));
ret.put(STAT_NOTIFY_START_TIME_CURR_DAY, messagesProcessed.getDayStartTimeMs());
ret.put(STAT_NOTIFY_COUNT_CURR_DAY, messagesProcessed.getCount(CURR_DAY));
ret.put(STAT_NOTIFY_AVG_TIME_CURR_DAY, messagesProcessed.getMeasureAvg(CURR_DAY));
ret.put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, messagesFailed.getCount(CURR_DAY));
ret.put(STAT_NOTIFY_CREATES_COUNT_CURR_DAY, entityCreates.getCount(CURR_DAY));
ret.put(STAT_NOTIFY_UPDATES_COUNT_CURR_DAY, entityUpdates.getCount(CURR_DAY));
ret.put(STAT_NOTIFY_DELETES_COUNT_CURR_DAY, entityDeletes.getCount(CURR_DAY));
ret.put(STAT_NOTIFY_START_TIME_CURR_HOUR, messagesProcessed.getHourStartTimeMs());
ret.put(STAT_NOTIFY_COUNT_CURR_HOUR, messagesProcessed.getCount(CURR_HOUR));
ret.put(STAT_NOTIFY_AVG_TIME_CURR_HOUR, messagesProcessed.getMeasureAvg(CURR_HOUR));
ret.put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, messagesFailed.getCount(CURR_HOUR));
ret.put(STAT_NOTIFY_CREATES_COUNT_CURR_HOUR, entityCreates.getCount(CURR_HOUR));
ret.put(STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR, entityUpdates.getCount(CURR_HOUR));
ret.put(STAT_NOTIFY_DELETES_COUNT_CURR_HOUR, entityDeletes.getCount(CURR_HOUR));
ret.put(STAT_NOTIFY_COUNT_PREV_HOUR, messagesProcessed.getCount(PREV_HOUR));
ret.put(STAT_NOTIFY_AVG_TIME_PREV_HOUR, messagesProcessed.getMeasureAvg(PREV_HOUR));
ret.put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, messagesFailed.getCount(PREV_HOUR));
ret.put(STAT_NOTIFY_CREATES_COUNT_PREV_HOUR, entityCreates.getCount(PREV_HOUR));
ret.put(STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR, entityUpdates.getCount(PREV_HOUR));
ret.put(STAT_NOTIFY_DELETES_COUNT_PREV_HOUR, entityDeletes.getCount(PREV_HOUR));
ret.put(STAT_NOTIFY_COUNT_PREV_DAY, messagesProcessed.getCount(PREV_DAY));
ret.put(STAT_NOTIFY_AVG_TIME_PREV_DAY, messagesProcessed.getMeasureAvg(PREV_DAY));
ret.put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, messagesFailed.getCount(PREV_DAY));
ret.put(STAT_NOTIFY_CREATES_COUNT_PREV_DAY, entityCreates.getCount(PREV_DAY));
ret.put(STAT_NOTIFY_UPDATES_COUNT_PREV_DAY, entityUpdates.getCount(PREV_DAY));
ret.put(STAT_NOTIFY_DELETES_COUNT_PREV_DAY, entityDeletes.getCount(PREV_DAY));
return ret;
}
private boolean getHBaseStatus(){
try {
runWithTimeout(new Runnable() {
@Override
public void run() {
graph.query().has(TYPE_NAME_PROPERTY_KEY, TYPE_NAME_INTERNAL).vertices(1);
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
}
return true;
}
private boolean getSolrStatus(){
final String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + Constants.TYPE_NAME_PROPERTY_KEY + "\":(" + TYPE_NAME_INTERNAL + ")";
try {
runWithTimeout(new Runnable() {
@Override
public void run() {
graph.indexQuery(Constants.VERTEX_INDEX, query).vertices(0, 1);
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
}
return true;
}
private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
runWithTimeout(new Callable<Object>() {
@Override
public Object call() {
runnable.run();
return null;
}
}, timeout, timeUnit);
}
private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<T> future = executor.submit(callable);
executor.shutdown();
try {
return future.get(timeout, timeUnit);
} catch (TimeoutException e) {
future.cancel(true);
throw e;
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof Error) {
throw (Error) t;
} else if (t instanceof Exception) {
throw (Exception) t;
} else {
throw new IllegalStateException(t);
}
}
}
private String millisToTimeDiff(long msDiff) {
StringBuilder sb = new StringBuilder();
long diffSeconds = msDiff / SEC_MS % 60;
long diffMinutes = msDiff / MIN_MS % 60;
long diffHours = msDiff / HOUR_MS % 24;
long diffDays = msDiff / DAY_MS;
if (diffDays > 0) sb.append(diffDays).append(" day ");
if (diffHours > 0) sb.append(diffHours).append(" hour ");
if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
return sb.toString();
}
public static class NotificationStat {
public boolean isFailedMsg = false;
public long timeTakenMs = 0;
public int entityCreates = 0;
public int entityUpdates = 0;
public int entityDeletes = 0;
public NotificationStat() { }
public NotificationStat(boolean isFailedMsg, long timeTakenMs) {
this.isFailedMsg = isFailedMsg;
this.timeTakenMs = timeTakenMs;
}
public void updateStats(EntityMutationResponse response) {
entityCreates += getSize(response.getCreatedEntities());
entityUpdates += getSize(response.getUpdatedEntities());
entityUpdates += getSize(response.getPartialUpdatedEntities());
entityDeletes += getSize(response.getDeletedEntities());
}
private int getSize(Collection collection) {
return collection != null ? collection.size() : 0;
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.AtlasStatistics;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Locale;
import java.util.concurrent.*;
import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS;
import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS;
import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE;
import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET;
import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET;
import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS;
import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS;
import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS;
import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME;
import static org.apache.atlas.model.AtlasStatistics.STAT_MESSAGES_CONSUMED;
@Component
public class StatisticsUtil {
private static final Logger LOG = LoggerFactory.getLogger(StatisticsUtil.class);
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("d MMM, yyyy : hh:mm aaa z");
private static final long DAY = 1000 * 60 * 60 * 24;
private static final long HOUR = 1000 * 60 * 60;
private static final long MIN = 1000 * 60;
private static final long SEC = 1000;
private final AtlasGraph graph;
private final String STATUS_CONNECTED = "connected";
private final String STATUS_NOT_CONNECTED = "not-connected";
private final AtlasStatistics atlasStatistics;
private long countMsgProcessed = 0;
private long totalMsgProcessingTimeMs = 0;
private Locale locale = new Locale("en", "US");
private NumberFormat numberFormat;
@Inject
public StatisticsUtil(AtlasGraph graph) {
this.graph = graph;
this.atlasStatistics = new AtlasStatistics();
numberFormat = NumberFormat.getInstance(locale);
}
public Map<String, Object> getAtlasStatistics() {
Map<String, Object> statisticsMap = new HashMap<>();
statisticsMap.putAll(atlasStatistics.getData());
statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus());
statisticsMap.put(STAT_SOLR_STATUS, getSolrStatus());
statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime());
if(countMsgProcessed > 0) {
statisticsMap.put(STAT_MESSAGES_CONSUMED, countMsgProcessed);
}
formatStatistics(statisticsMap);
return statisticsMap;
}
public void setKafkaOffsets(long value) {
if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) {
addStat(STAT_START_OFFSET, value);
}
addStat(STAT_CURRENT_OFFSET, ++value);
}
public void setAvgMsgProcessingTime(long value) {
countMsgProcessed++;
totalMsgProcessingTimeMs += value;
value = totalMsgProcessingTimeMs / countMsgProcessed;
addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value);
}
public void setLastMsgProcessedTime() {
addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, System.currentTimeMillis());
}
public void setServerStartTime() {
addStat(STAT_SERVER_START_TS, System.currentTimeMillis());
}
public void setServerActiveTime() {
addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis());
}
private void addStat(String key, Object value) {
Map<String, Object> data = atlasStatistics.getData();
if (data == null) {
data = new HashMap<>();
}
data.put(key, value);
atlasStatistics.setData(data);
}
private Object getStat(String key) {
Map<String, Object> data = atlasStatistics.getData();
Object ret = data.get(key);
if (ret == null) {
return -1;
}
return ret;
}
private void formatStatistics(Map<String, Object> statisticsMap) {
for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) {
switch (stat.getKey()) {
case STAT_SERVER_UP_SINCE:
statisticsMap.put(stat.getKey(), millisToTimeDiff(Long.parseLong(stat.getValue().toString())));
break;
case STAT_LAST_MESSAGE_PROCESSED_TIME_TS:
statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
break;
case STAT_SERVER_START_TS:
case STAT_SERVER_ACTIVE_TS:
statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
break;
case STAT_AVG_MESSAGE_PROCESSING_TIME:
statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())) + " milliseconds");
break;
case STAT_HBASE_STATUS:
case STAT_SOLR_STATUS:
String curState = ((boolean) stat.getValue()) ? STATUS_CONNECTED : STATUS_NOT_CONNECTED;
statisticsMap.put(stat.getKey(), curState);
break;
case STAT_MESSAGES_CONSUMED:
case STAT_START_OFFSET:
case STAT_CURRENT_OFFSET:
statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())));
break;
default:
statisticsMap.put(stat.getKey(), stat.getValue());
}
}
}
private boolean getHBaseStatus() {
String query = "g.V().next()";
try {
runWithTimeout(new Runnable() {
@Override
public void run() {
try {
graph.executeGremlinScript(query, false);
} catch (AtlasBaseException e) {
LOG.error(e.getMessage());
}
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
}
return true;
}
private boolean getSolrStatus() {
String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)";
try {
runWithTimeout(new Runnable() {
@Override
public void run() {
graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals();
}
}, 10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
}
return true;
}
private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
runWithTimeout(new Callable<Object>() {
@Override
public Object call() {
runnable.run();
return null;
}
}, timeout, timeUnit);
}
private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Future<T> future = executor.submit(callable);
executor.shutdown();
try {
return future.get(timeout, timeUnit);
} catch (TimeoutException e) {
future.cancel(true);
throw e;
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof Error) {
throw (Error) t;
} else if (t instanceof Exception) {
throw (Exception) t;
} else {
throw new IllegalStateException(t);
}
}
}
private long getUpSinceTime() {
long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString());
return System.currentTimeMillis() - upTS;
}
private String millisToTimeDiff(long msDiff) {
StringBuilder sb = new StringBuilder();
long diffSeconds = msDiff / SEC % 60;
long diffMinutes = msDiff / MIN % 60;
long diffHours = msDiff / HOUR % 24;
long diffDays = msDiff / DAY;
if (diffDays > 0) sb.append(diffDays).append(" day ");
if (diffHours > 0) sb.append(diffHours).append(" hour ");
if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
return sb.toString();
}
private String millisToTimeStamp(long ms) {
return simpleDateFormat.format(ms);
}
private String formatNumber(long value) {
return numberFormat.format(value);
}
}
...@@ -28,6 +28,8 @@ import org.apache.atlas.repository.impexp.ZipSource; ...@@ -28,6 +28,8 @@ import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.runner.LocalSolrRunner; import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasMetricsCounter;
import org.apache.atlas.util.AtlasMetricsUtil;
import org.testng.SkipException; import org.testng.SkipException;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
...@@ -37,10 +39,15 @@ import org.testng.annotations.Test; ...@@ -37,10 +39,15 @@ import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr; import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.model.metrics.AtlasMetrics.*;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.services.MetricsService.ENTITY; import static org.apache.atlas.services.MetricsService.ENTITY;
...@@ -53,11 +60,11 @@ import static org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT; ...@@ -53,11 +60,11 @@ import static org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT;
import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT; import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT;
import static org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT; import static org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT;
import static org.apache.atlas.services.MetricsService.TAG; import static org.apache.atlas.services.MetricsService.TAG;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class MetricsServiceTest { public class MetricsServiceTest {
public static final String IMPORT_FILE = "metrics-entities-data.zip"; public static final String IMPORT_FILE = "metrics-entities-data.zip";
@Inject @Inject
...@@ -72,6 +79,14 @@ public class MetricsServiceTest { ...@@ -72,6 +79,14 @@ public class MetricsServiceTest {
@Inject @Inject
private MetricsService metricsService; private MetricsService metricsService;
@Inject
private AtlasMetricsUtil metricsUtil;
TestClock clock = new TestClock(Clock.systemUTC(), ZoneOffset.UTC);
long msgOffset = 0;
private final Map<String, Long> activeEntityMetricsExpected = new HashMap<String, Long>() {{ private final Map<String, Long> activeEntityMetricsExpected = new HashMap<String, Long>() {{
put("hive_storagedesc", 5L); put("hive_storagedesc", 5L);
put("__ExportImportAuditEntry", 1L); put("__ExportImportAuditEntry", 1L);
...@@ -95,6 +110,17 @@ public class MetricsServiceTest { ...@@ -95,6 +110,17 @@ public class MetricsServiceTest {
put("PII", 1L); put("PII", 1L);
}}; }};
private final Map<String, Object> metricExpected = new HashMap<String, Object>() {{
put(STAT_NOTIFY_COUNT_CURR_HOUR, 11L);
put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, 1L);
put(STAT_NOTIFY_COUNT_PREV_HOUR, 11L);
put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, 1L);
put(STAT_NOTIFY_COUNT_CURR_DAY, 33L);
put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, 3L);
put(STAT_NOTIFY_COUNT_PREV_DAY, 11L);
put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, 1L);
}};
@BeforeClass @BeforeClass
public void setup() { public void setup() {
RequestContext.clear(); RequestContext.clear();
...@@ -148,6 +174,24 @@ public class MetricsServiceTest { ...@@ -148,6 +174,24 @@ public class MetricsServiceTest {
assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected); assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected);
} }
@Test
public void testNotificationMetrics() {
Instant now = Clock.systemUTC().instant();
Instant dayStartTime = AtlasMetricsCounter.getDayStartTime(now);
Instant dayEndTime = AtlasMetricsCounter.getNextDayStartTime(now);
Instant hourStartTime = dayEndTime.minusSeconds(60 * 60);
prepareNotificationData(dayStartTime, hourStartTime);
clock.setInstant(dayEndTime.minusSeconds(1));
Map<String, Object> notificationMetricMap = metricsUtil.getStats();
clock.setInstant(null);
verifyNotificationMetric(metricExpected, notificationMetricMap);
}
private void loadModelFilesAndImportTestData() { private void loadModelFilesAndImportTestData() {
try { try {
...@@ -165,8 +209,75 @@ public class MetricsServiceTest { ...@@ -165,8 +209,75 @@ public class MetricsServiceTest {
} }
} }
private void prepareNotificationData(Instant dayStartTime, Instant hourStartTime) {
Instant prevDayStartTime = AtlasMetricsCounter.getDayStartTime(dayStartTime.minusSeconds(1));
msgOffset = 0;
clock.setInstant(prevDayStartTime);
metricsUtil.init(clock);
clock.setInstant(null);
processMessage(prevDayStartTime.plusSeconds(3)); // yesterday
processMessage(dayStartTime.plusSeconds(3)); // today
processMessage(hourStartTime.minusSeconds(3)); // past hour
processMessage(hourStartTime.plusSeconds(3)); // this hour
}
private void processMessage(Instant instant) {
clock.setInstant(instant);
metricsUtil.onNotificationProcessingComplete(++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1));
for (int i = 0; i < 10; i++) {
metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
}
clock.setInstant(null);
}
private void verifyNotificationMetric(Map<String, Object> metricExpected, Map<String, Object> notificationMetrics) {
assertNotNull(notificationMetrics);
assertNotEquals(notificationMetrics.size(), 0);
assertTrue(notificationMetrics.size() >= metricExpected.size());
for (Map.Entry<String, Object> entry : metricExpected.entrySet()) {
assertEquals(notificationMetrics.get(entry.getKey()), entry.getValue(), entry.getKey());
}
}
public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException { public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName); FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
return new ZipSource(fs); return new ZipSource(fs);
} }
private static class TestClock extends Clock {
private final Clock baseClock;
private final ZoneId zone;
private Instant instant = null;
public TestClock(Clock baseClock, ZoneId zone) {
this.baseClock = baseClock;
this.zone = zone;
}
@Override
public ZoneId getZone() {
return zone;
}
@Override
public TestClock withZone(ZoneId zone) {
return new TestClock(baseClock, zone);
}
@Override
public Instant instant() {
return instant != null ? instant : baseClock.instant();
}
public void setInstant(Instant instant) {
this.instant = instant;
}
}
} }
\ No newline at end of file
...@@ -47,7 +47,8 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor; ...@@ -47,7 +47,8 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext; import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.utils.LruCache; import org.apache.atlas.utils.LruCache;
import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.util.AtlasMetricsUtil.NotificationStat;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
...@@ -140,7 +141,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -140,7 +141,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final ServiceState serviceState; private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter; private final AtlasInstanceConverter instanceConverter;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final StatisticsUtil statisticsUtil; private final AtlasMetricsUtil metricsUtil;
private final int maxRetries; private final int maxRetries;
private final int failedMsgCacheSize; private final int failedMsgCacheSize;
private final int minWaitDuration; private final int minWaitDuration;
...@@ -156,10 +157,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -156,10 +157,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled; private final boolean preprocessEnabled;
private final NotificationInterface notificationInterface;
private NotificationInterface notificationInterface; private final Configuration applicationProperties;
private ExecutorService executors; private ExecutorService executors;
private Configuration applicationProperties;
@VisibleForTesting @VisibleForTesting
final int consumerRetryInterval; final int consumerRetryInterval;
...@@ -170,14 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -170,14 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Inject @Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
ServiceState serviceState, AtlasInstanceConverter instanceConverter, ServiceState serviceState, AtlasInstanceConverter instanceConverter,
AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) throws AtlasException { AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException {
this.notificationInterface = notificationInterface; this.notificationInterface = notificationInterface;
this.atlasEntityStore = atlasEntityStore; this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState; this.serviceState = serviceState;
this.instanceConverter = instanceConverter; this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get(); this.applicationProperties = ApplicationProperties.get();
this.statisticsUtil = statisticsUtil; this.metricsUtil = metricsUtil;
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
...@@ -475,12 +475,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -475,12 +475,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@VisibleForTesting @VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException { void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
AtlasPerfTracer perf = null; AtlasPerfTracer perf = null;
HookNotification message = kafkaMsg.getMessage(); HookNotification message = kafkaMsg.getMessage();
String messageUser = message.getUser(); String messageUser = message.getUser();
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
boolean isFailedMsg = false; NotificationStat stats = new NotificationStat();
AuditLog auditLog = null; AuditLog auditLog = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name()); perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
...@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath()); AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false, context); createOrUpdate(entities, false, stats, context);
} }
break; break;
...@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
// There should only be one root entity // There should only be one root entity
entities.getEntities().get(0).setGuid(guid); entities.getEntities().get(0).setGuid(guid);
createOrUpdate(entities, true, context); createOrUpdate(entities, true, stats, context);
} }
break; break;
...@@ -562,7 +562,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -562,7 +562,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try { try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
stats.updateStats(response);
} catch (ClassCastException cle) { } catch (ClassCastException cle) {
LOG.error("Failed to delete entity {}", deleteRequest); LOG.error("Failed to delete entity {}", deleteRequest);
} }
...@@ -579,7 +581,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -579,7 +581,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false, context); createOrUpdate(entities, false, stats, context);
} }
break; break;
...@@ -593,7 +595,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -593,7 +595,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false, context); createOrUpdate(entities, false, stats, context);
} }
break; break;
...@@ -608,7 +610,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -608,7 +610,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
} }
atlasEntityStore.updateEntity(entityId, entity, true); EntityMutationResponse response = atlasEntityStore.updateEntity(entityId, entity, true);
stats.updateStats(response);
} }
break; break;
...@@ -622,7 +626,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -622,7 +626,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
} }
createOrUpdate(entities, false, context); createOrUpdate(entities, false, stats, context);
} }
break; break;
...@@ -640,7 +644,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -640,7 +644,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes()); EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
stats.updateStats(response);
} }
} catch (ClassCastException cle) { } catch (ClassCastException cle) {
LOG.error("Failed to do delete entities {}", entities); LOG.error("Failed to do delete entities {}", entities);
...@@ -661,7 +667,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -661,7 +667,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.warn("Max retries exceeded for message {}", strMessage, e); LOG.warn("Max retries exceeded for message {}", strMessage, e);
isFailedMsg = true; stats.isFailedMsg = true;
failedMessages.add(strMessage); failedMessages.add(strMessage);
...@@ -689,33 +695,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -689,33 +695,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally { } finally {
AtlasPerfTracer.log(perf); AtlasPerfTracer.log(perf);
long msgProcessingTime = System.currentTimeMillis() - startTime; stats.timeTakenMs = System.currentTimeMillis() - startTime;
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats);
if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) { if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message); String strMessage = AbstractNotification.getMessageJson(message);
LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset()); LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage); LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
} }
if (auditLog != null) { if (auditLog != null) {
auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK); auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST : SC_OK);
auditLog.setTimeTaken(msgProcessingTime); auditLog.setTimeTaken(stats.timeTakenMs);
AuditFilter.audit(auditLog); AuditFilter.audit(auditLog);
} }
statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime);
} }
} }
private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException { private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, NotificationStat stats, PreprocessorContext context) throws AtlasBaseException {
List<AtlasEntity> entitiesList = entities.getEntities(); List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities); AtlasEntityStream entityStream = new AtlasEntityStream(entities);
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
recordProcessedEntities(response, context); recordProcessedEntities(response, stats, context);
} else { } else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) { for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
int toIndex = fromIdx + commitBatchSize; int toIndex = fromIdx + commitBatchSize;
...@@ -733,7 +740,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -733,7 +740,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate); EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
recordProcessedEntities(response, context); recordProcessedEntities(response, stats, context);
RequestContext.get().resetEntityGuidUpdates(); RequestContext.get().resetEntityGuidUpdates();
...@@ -770,8 +777,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -770,8 +777,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumer.commit(partition, kafkaMessage.getOffset() + 1); consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true; commitSucceessStatus = true;
statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset());
statisticsUtil.setLastMsgProcessedTime();
} finally { } finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
} }
...@@ -1021,24 +1026,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -1021,24 +1026,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return ret; return ret;
} }
private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) { private void recordProcessedEntities(EntityMutationResponse mutationResponse, NotificationStat stats, PreprocessorContext context) {
if (mutationResponse != null && context != null) { if (mutationResponse != null) {
if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { if (stats != null) {
context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments()); stats.updateStats(mutationResponse);
} }
if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) { if (context != null) {
for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) { if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
if (entity != null && entity.getGuid() != null) { context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
context.getCreatedEntities().add(entity.getGuid()); }
if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
if (entity != null && entity.getGuid() != null) {
context.getCreatedEntities().add(entity.getGuid());
}
} }
} }
}
if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) { if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) { for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
if (entity != null && entity.getGuid() != null) { if (entity != null && entity.getGuid() != null) {
context.getDeletedEntities().add(entity.getGuid()); context.getDeletedEntities().add(entity.getGuid());
}
} }
} }
} }
......
...@@ -23,7 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector; ...@@ -23,7 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
...@@ -55,18 +55,17 @@ import java.util.Set; ...@@ -55,18 +55,17 @@ import java.util.Set;
@Component @Component
@Order(1) @Order(1)
public class ActiveInstanceElectorService implements Service, LeaderLatchListener { public class ActiveInstanceElectorService implements Service, LeaderLatchListener {
private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class); private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class);
private final Configuration configuration; private final Configuration configuration;
private final ServiceState serviceState; private final ServiceState serviceState;
private final ActiveInstanceState activeInstanceState; private final ActiveInstanceState activeInstanceState;
private final StatisticsUtil statisticsUtil; private final AtlasMetricsUtil metricsUtil;
private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
private List<ActiveStateChangeHandler> activeStateChangeHandlers; private List<ActiveStateChangeHandler> activeStateChangeHandlers;
private CuratorFactory curatorFactory; private CuratorFactory curatorFactory;
private LeaderLatch leaderLatch; private LeaderLatch leaderLatch;
private String serverId; private String serverId;
/** /**
* Create a new instance of {@link ActiveInstanceElectorService} * Create a new instance of {@link ActiveInstanceElectorService}
...@@ -78,14 +77,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -78,14 +77,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
ActiveInstanceElectorService(Configuration configuration, ActiveInstanceElectorService(Configuration configuration,
Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders, Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders,
CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState,
ServiceState serviceState, StatisticsUtil statisticsUtil) { ServiceState serviceState, AtlasMetricsUtil metricsUtil) {
this.configuration = configuration; this.configuration = configuration;
this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders; this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders;
this.activeStateChangeHandlers = new ArrayList<>(); this.activeStateChangeHandlers = new ArrayList<>();
this.curatorFactory = curatorFactory; this.curatorFactory = curatorFactory;
this.activeInstanceState = activeInstanceState; this.activeInstanceState = activeInstanceState;
this.serviceState = serviceState; this.serviceState = serviceState;
this.statisticsUtil = statisticsUtil; this.metricsUtil = metricsUtil;
} }
/** /**
...@@ -96,9 +95,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -96,9 +95,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
*/ */
@Override @Override
public void start() throws AtlasException { public void start() throws AtlasException {
statisticsUtil.setServerStartTime(); metricsUtil.onServerStart();
if (!HAConfiguration.isHAEnabled(configuration)) { if (!HAConfiguration.isHAEnabled(configuration)) {
statisticsUtil.setServerActiveTime(); metricsUtil.onServerActivation();
LOG.info("HA is not enabled, no need to start leader election service"); LOG.info("HA is not enabled, no need to start leader election service");
return; return;
} }
...@@ -156,7 +155,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ...@@ -156,7 +155,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
} }
activeInstanceState.update(serverId); activeInstanceState.update(serverId);
serviceState.setActive(); serviceState.setActive();
statisticsUtil.setServerActiveTime(); metricsUtil.onServerActivation();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Got exception while activating", e); LOG.error("Got exception while activating", e);
notLeader(); notLeader();
......
...@@ -25,7 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -25,7 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.*; import org.apache.atlas.kafka.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
...@@ -82,7 +82,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -82,7 +82,7 @@ public class NotificationHookConsumerKafkaTest {
private AtlasTypeRegistry typeRegistry; private AtlasTypeRegistry typeRegistry;
@Mock @Mock
private StatisticsUtil statisticsUtil; private AtlasMetricsUtil metricsUtil;
@BeforeTest @BeforeTest
public void setup() throws AtlasException, InterruptedException, AtlasBaseException { public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
...@@ -108,7 +108,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -108,7 +108,7 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
...@@ -127,7 +127,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -127,7 +127,7 @@ public class NotificationHookConsumerKafkaTest {
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
...@@ -163,7 +163,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -163,7 +163,7 @@ public class NotificationHookConsumerKafkaTest {
assertNotNull (consumer); assertNotNull (consumer);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
......
...@@ -26,7 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; ...@@ -26,7 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
...@@ -77,7 +77,7 @@ public class NotificationHookConsumerTest { ...@@ -77,7 +77,7 @@ public class NotificationHookConsumerTest {
private AtlasTypeRegistry typeRegistry; private AtlasTypeRegistry typeRegistry;
@Mock @Mock
private StatisticsUtil statisticsUtil; private AtlasMetricsUtil metricsUtil;
@BeforeMethod @BeforeMethod
public void setup() throws AtlasBaseException { public void setup() throws AtlasBaseException {
...@@ -96,7 +96,7 @@ public class NotificationHookConsumerTest { ...@@ -96,7 +96,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerCanProceedIfServerIsReady() throws Exception { public void testConsumerCanProceedIfServerIsReady() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
...@@ -109,7 +109,7 @@ public class NotificationHookConsumerTest { ...@@ -109,7 +109,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
...@@ -126,7 +126,7 @@ public class NotificationHookConsumerTest { ...@@ -126,7 +126,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = mock(EntityCreateRequest.class); EntityCreateRequest message = mock(EntityCreateRequest.class);
...@@ -143,7 +143,7 @@ public class NotificationHookConsumerTest { ...@@ -143,7 +143,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
...@@ -157,7 +157,7 @@ public class NotificationHookConsumerTest { ...@@ -157,7 +157,7 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
...@@ -177,9 +177,7 @@ public class NotificationHookConsumerTest { ...@@ -177,9 +177,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
...@@ -197,8 +195,7 @@ public class NotificationHookConsumerTest { ...@@ -197,8 +195,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
...@@ -217,7 +214,7 @@ public class NotificationHookConsumerTest { ...@@ -217,7 +214,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive(); notificationHookConsumer.instanceIsActive();
...@@ -237,8 +234,7 @@ public class NotificationHookConsumerTest { ...@@ -237,8 +234,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
doAnswer(new Answer() { doAnswer(new Answer() {
@Override @Override
...@@ -269,8 +265,7 @@ public class NotificationHookConsumerTest { ...@@ -269,8 +265,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive(); notificationHookConsumer.instanceIsPassive();
...@@ -335,7 +330,6 @@ public class NotificationHookConsumerTest { ...@@ -335,7 +330,6 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
} }
} }
...@@ -23,7 +23,7 @@ import org.apache.atlas.AtlasException; ...@@ -23,7 +23,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.util.StatisticsUtil; import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.mockito.InOrder; import org.mockito.InOrder;
...@@ -53,7 +53,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -53,7 +53,7 @@ public class ActiveInstanceElectorServiceTest {
private ServiceState serviceState; private ServiceState serviceState;
@Mock @Mock
private StatisticsUtil statisticsUtil; private AtlasMetricsUtil metricsUtil;
@BeforeMethod @BeforeMethod
public void setup() { public void setup() {
...@@ -75,7 +75,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -75,7 +75,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
verify(leaderLatch).start(); verify(leaderLatch).start();
...@@ -96,7 +96,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -96,7 +96,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
verify(leaderLatch).addListener(activeInstanceElectorService); verify(leaderLatch).addListener(activeInstanceElectorService);
...@@ -108,7 +108,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -108,7 +108,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
verifyZeroInteractions(curatorFactory); verifyZeroInteractions(curatorFactory);
...@@ -129,7 +129,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -129,7 +129,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.stop(); activeInstanceElectorService.stop();
...@@ -151,7 +151,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -151,7 +151,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.stop(); activeInstanceElectorService.stop();
...@@ -165,7 +165,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -165,7 +165,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.stop(); activeInstanceElectorService.stop();
verifyZeroInteractions(curatorFactory); verifyZeroInteractions(curatorFactory);
...@@ -193,7 +193,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -193,7 +193,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -216,7 +216,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -216,7 +216,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -249,7 +249,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -249,7 +249,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -275,7 +275,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -275,7 +275,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -310,7 +310,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -310,7 +310,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
activeInstanceState, serviceState, statisticsUtil); activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.notLeader(); activeInstanceElectorService.notLeader();
...@@ -322,7 +322,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -322,7 +322,7 @@ public class ActiveInstanceElectorServiceTest {
public void testActiveStateSetOnBecomingLeader() { public void testActiveStateSetOnBecomingLeader() {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
curatorFactory, activeInstanceState, serviceState, statisticsUtil); curatorFactory, activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
...@@ -335,7 +335,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -335,7 +335,7 @@ public class ActiveInstanceElectorServiceTest {
public void testPassiveStateSetOnLoosingLeadership() { public void testPassiveStateSetOnLoosingLeadership() {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
curatorFactory, activeInstanceState, serviceState, statisticsUtil); curatorFactory, activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.notLeader(); activeInstanceElectorService.notLeader();
...@@ -362,7 +362,7 @@ public class ActiveInstanceElectorServiceTest { ...@@ -362,7 +362,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService = ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
curatorFactory, activeInstanceState, serviceState, statisticsUtil); curatorFactory, activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start(); activeInstanceElectorService.start();
activeInstanceElectorService.isLeader(); activeInstanceElectorService.isLeader();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment