Commit beb34506 by Madhan Neethiraj

ATLAS-3002: added instrumentation to collect time taken for sub-tasks during entity create/update

parent bd0c5a8a
......@@ -185,7 +185,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn)));
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class AtlasPerfMetrics {
private final Map<String, Metric> metrics = new LinkedHashMap<>();
public MetricRecorder getMetricRecorder(String name) {
return new MetricRecorder(name);
}
public void recordMetric(MetricRecorder recorder) {
if (recorder != null) {
final String name = recorder.name;
final long timeTaken = recorder.getElapsedTime();
Metric metric = metrics.get(name);
if (metric == null) {
metric = new Metric(name);
metrics.put(name, metric);
}
metric.invocations++;
metric.totalTimeMSecs += timeTaken;
}
}
public void clear() {
metrics.clear();
}
public boolean isEmpty() {
return metrics.isEmpty();
}
public Set<String> getMetricsNames() {
return metrics.keySet();
}
public Metric getMetric(String name) {
return metrics.get(name);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
if (!metrics.isEmpty()) {
for (Metric metric : metrics.values()) {
sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
}
sb.setLength(sb.length() - 1); // remove last ","
}
sb.append("}");
return sb.toString();
}
public class MetricRecorder {
private final String name;
private final long startTimeMs = System.currentTimeMillis();
MetricRecorder(String name) {
this.name = name;
}
long getElapsedTime() {
return System.currentTimeMillis() - startTimeMs;
}
}
public static class Metric {
private final String name;
private short invocations = 0;
private long totalTimeMSecs = 0;
public Metric(String name) {
this.name = name;
}
public String getName() {
return name;
}
public short getInvocations() {
return invocations;
}
public long getTotalTimeMSecs() {
return totalTimeMSecs;
}
}
}
......@@ -37,6 +37,16 @@
</layout>
</appender>
<appender name="LARGE_MESSAGES" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="{{log_dir}}/large_messages.log"/>
<param name="Append" value="true"/>
<param name="MaxFileSize" value="100MB" />
<param name="MaxBackupIndex" value="20" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %m%n"/>
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
......@@ -60,7 +70,7 @@
<param name="File" value="${atlas.log.dir}/failed.log"/>
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %m"/>
<param name="ConversionPattern" value="%d %m%n"/>
<param name="maxFileSize" value="100MB" />
<param name="maxBackupIndex" value="20" />
</layout>
......@@ -119,6 +129,11 @@
<appender-ref ref="AUDIT"/>
</logger>
<logger name="LARGE_MESSAGES" additivity="false">
<level value="warn"/>
<appender-ref ref="LARGE_MESSAGES"/>
</logger>
<logger name="METRICS" additivity="false">
<level value="debug"/>
<appender-ref ref="METRICS"/>
......@@ -126,7 +141,7 @@
<logger name="FAILED" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
<appender-ref ref="FAILED"/>
</logger>
<root>
......
......@@ -106,6 +106,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
properties.put("enable.auto.commit", kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag));
properties.put("session.timeout.ms", kafkaConf.getString("session.timeout.ms", "30000"));
// if no value is specified for max.poll.records, set to 1
properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
LOG.info("<== KafkaNotification()");
}
......
......@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -69,6 +70,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
boolean isSuccess = false;
MetricRecorder metric = null;
try {
try {
......@@ -79,6 +81,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
LOG.debug("Ignoring commit for nested/inner transaction {}.{}", invokingClass, invokedMethodName);
}
} else {
metric = RequestContext.get().startMetricRecord("graphCommit");
doCommitOrRollback(invokingClass, invokedMethodName);
}
......@@ -97,6 +101,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
throw t;
}
} finally {
RequestContext.get().endMetricRecord(metric);
// Only outer txn can mark as closed
if (!isInnerTxn) {
if (LOG.isDebugEnabled()) {
......
......@@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContext;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.type.AtlasEntityType;
......@@ -65,6 +66,8 @@ public class EntityAuditListener implements EntityChangeListener {
@Override
public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_CREATE : EntityAuditAction.ENTITY_CREATE);
......@@ -72,10 +75,14 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_UPDATE : EntityAuditAction.ENTITY_UPDATE);
......@@ -83,45 +90,61 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD,
"Added trait: " + AtlasType.toV1Json(trait));
auditRepository.putEventsV1(event);
}
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTraitsDeleted(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + trait.getTypeName());
auditRepository.putEventsV1(event);
}
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException {
if (traits != null) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
for (Struct trait : traits) {
EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE,
"Updated trait: " + AtlasType.toV1Json(trait));
auditRepository.putEventsV1(event);
}
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
EntityAuditEvent event = createEvent(entity, isImport ? EntityAuditAction.ENTITY_IMPORT_DELETE : EntityAuditAction.ENTITY_DELETE, "Deleted entity");
......@@ -129,10 +152,14 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onTermAdded(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
......@@ -140,10 +167,14 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onTermDeleted(Collection<Referenceable> entities, AtlasGlossaryTerm term) throws AtlasException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEvent> events = new ArrayList<>();
for (Referenceable entity : entities) {
......@@ -151,6 +182,8 @@ public class EntityAuditListener implements EntityChangeListener {
}
auditRepository.putEventsV1(events);
RequestContext.get().endMetricRecord(metric);
}
public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{
......
......@@ -26,13 +26,13 @@ import org.apache.atlas.listener.EntityChangeListenerV2;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -79,6 +79,8 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
......@@ -88,10 +90,14 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
......@@ -101,10 +107,14 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasEntity entity : entities) {
......@@ -114,11 +124,15 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
@Override
public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
......@@ -130,12 +144,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
String guid = entity.getGuid();
......@@ -154,12 +172,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onClassificationsDeleted(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasClassification classification : classifications) {
......@@ -171,12 +193,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
......@@ -188,12 +214,16 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
@Override
public void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
List<EntityAuditEventV2> events = new ArrayList<>();
for (AtlasRelatedObjectId relatedObjectId : entities) {
......@@ -205,6 +235,8 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
auditRepository.putEventsV2(events);
RequestContext.get().endMetricRecord(metric);
}
}
......
......@@ -26,6 +26,13 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
......@@ -37,6 +44,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
......@@ -52,15 +60,18 @@ public class FullTextMapperV2 {
private static final String FULL_TEXT_FOLLOW_REFERENCES = "atlas.search.fulltext.followReferences";
private static final String FULL_TEXT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.search.fulltext.type";
private final EntityGraphRetriever entityGraphRetriever;
private final AtlasTypeRegistry typeRegistry;
private final Configuration configuration;
private final EntityGraphRetriever entityGraphRetriever;
private final boolean followReferences;
private final Map<String, Set<String>> excludeAttributesCache = new HashMap<>();
@Inject
public FullTextMapperV2(AtlasTypeRegistry typeRegistry, Configuration configuration) {
this.typeRegistry = typeRegistry;
this.configuration = configuration;
followReferences = this.configuration != null && this.configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
// If followReferences = false then ignore relationship attr loading
entityGraphRetriever = new EntityGraphRetriever(typeRegistry, !followReferences);
......@@ -90,11 +101,12 @@ public class FullTextMapperV2 {
if (CollectionUtils.isNotEmpty(classifications)) {
for (AtlasClassification classification : classifications) {
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
final Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
Set<String> excludeAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
mapAttributes(classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
mapAttributes(classificationType, classification.getAttributes(), entityWithExtInfo, sb, new HashSet<String>(), excludeAttributes);
}
}
......@@ -109,13 +121,24 @@ public class FullTextMapperV2 {
}
public String getIndexTextForEntity(String guid) throws AtlasBaseException {
String ret = null;
AtlasEntity entity = getAndCacheEntity(guid);
String ret = null;
final AtlasEntity entity;
final AtlasEntityExtInfo entityExtInfo;
if (followReferences) {
AtlasEntityWithExtInfo entityWithExtInfo = getAndCacheEntityWithExtInfo(guid);
entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
entityExtInfo = entityWithExtInfo;
} else {
entity = getAndCacheEntity(guid);
entityExtInfo = null;
}
if (entity != null) {
StringBuilder sb = new StringBuilder();
map(entity, null, sb, new HashSet<String>());
map(entity, entityExtInfo, sb, new HashSet<String>());
ret = sb.toString();
}
......@@ -132,27 +155,30 @@ public class FullTextMapperV2 {
return;
}
final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
final Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName());
processedGuids.add(entity.getGuid());
sb.append(entity.getTypeName()).append(FULL_TEXT_DELIMITER);
Set<String> excludeAttributes = getExcludeAttributesForIndexText(entity.getTypeName());
mapAttributes(entityType, entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes);
mapAttributes(entity.getAttributes(), entityExtInfo, sb, processedGuids, excludeAttributes);
List<AtlasClassification> classifications = entity.getClassifications();
final List<AtlasClassification> classifications = entity.getClassifications();
if (CollectionUtils.isNotEmpty(classifications)) {
for (AtlasClassification classification : classifications) {
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
final AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
final Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
Set<String> excludeClassificationAttributes = getExcludeAttributesForIndexText(classification.getTypeName());
mapAttributes(classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes);
sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
mapAttributes(classificationType, classification.getAttributes(), entityExtInfo, sb, processedGuids, excludeClassificationAttributes);
}
}
}
private void mapAttributes(Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
private void mapAttributes(AtlasStructType structType, Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb,
Set<String> processedGuids, Set<String> excludeAttributes) throws AtlasBaseException {
if (MapUtils.isEmpty(attributes)) {
return;
......@@ -162,10 +188,28 @@ public class FullTextMapperV2 {
String attribKey = attributeEntry.getKey();
Object attrValue = attributeEntry.getValue();
if (attrValue == null || isExcludedAttribute(excludeAttributes, attribKey)) {
if (attrValue == null || excludeAttributes.contains(attribKey)) {
continue;
}
if (!followReferences) {
AtlasAttribute attribute = structType != null ? structType.getAttribute(attribKey) : null;
AtlasType attributeType = attribute != null ? attribute.getAttributeType() : null;
if (attributeType == null) {
continue;
}
if (attributeType instanceof AtlasArrayType) {
attributeType = ((AtlasArrayType) attributeType).getElementType();
}
if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasBuiltInTypes.AtlasObjectIdType) {
continue;
}
}
sb.append(attribKey).append(FULL_TEXT_DELIMITER);
mapAttribute(attrValue, entityExtInfo, sb, processedGuids);
......@@ -249,12 +293,8 @@ public class FullTextMapperV2 {
return entityWithExtInfo;
}
private boolean isExcludedAttribute(Set<String> excludeAttributes, String attributeName) {
return CollectionUtils.isNotEmpty(excludeAttributes) && excludeAttributes.contains(attributeName);
}
private Set<String> getExcludeAttributesForIndexText(String typeName) {
Set<String> ret = null;
final Set<String> ret;
if (excludeAttributesCache.containsKey(typeName)) {
ret = excludeAttributesCache.get(typeName);
......@@ -265,9 +305,13 @@ public class FullTextMapperV2 {
if (ArrayUtils.isNotEmpty(excludeAttributes)) {
ret = new HashSet<>(Arrays.asList(excludeAttributes));
} else {
ret = Collections.emptySet();
}
excludeAttributesCache.put(typeName, ret);
} else {
ret = Collections.emptySet();
}
return ret;
......
......@@ -29,13 +29,13 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
......@@ -437,6 +437,8 @@ public class AtlasEntityChangeNotifier {
LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
}
MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
for (AtlasEntityHeader entityHeader : entityHeaders) {
if(GraphHelper.isInternalType(entityHeader.getTypeName())) {
continue;
......@@ -457,6 +459,8 @@ public class AtlasEntityChangeNotifier {
LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e);
}
}
RequestContext.get().endMetricRecord(metric);
}
private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) {
......@@ -477,6 +481,8 @@ public class AtlasEntityChangeNotifier {
return;
}
MetricRecorder metric = RequestContext.get().startMetricRecord("fullTextMapping");
try {
String classificationFullText = fullTextMapperV2.getIndexTextForClassifications(entityId, classifications);
String existingFullText = AtlasGraphUtilsV2.getEncodedProperty(atlasVertex, ENTITY_TEXT_PROPERTY_KEY, String.class);
......@@ -486,6 +492,8 @@ public class AtlasEntityChangeNotifier {
} catch (AtlasBaseException e) {
LOG.error("FullText mapping failed for Vertex[ guid = {} ]", entityId, e);
}
RequestContext.get().endMetricRecord(metric);
}
private void doFullTextMapping(String guid) {
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -35,6 +36,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -127,6 +129,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
protected void discover() throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("walkEntityGraph");
EntityStream entityStream = discoveryContext.getEntityStream();
Set<String> walkedEntities = new HashSet<>();
......@@ -162,9 +166,13 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
walkedEntities.add(entity.getGuid());
}
}
RequestContext.get().endMetricRecord(metric);
}
protected void resolveReferences() throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("resolveReferences");
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
new UniqAttrBasedEntityResolver(typeRegistry)
};
......@@ -172,6 +180,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
for (EntityResolver resolver : entityResolvers) {
resolver.resolveEntityReferences(discoveryContext);
}
RequestContext.get().endMetricRecord(metric);
}
private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException {
......
......@@ -41,6 +41,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
......@@ -671,6 +672,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
}
MetricRecorder metric = RequestContext.get().startMetricRecord("createOrUpdate");
try {
final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
......@@ -730,11 +733,15 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
return ret;
} finally {
RequestContext.get().endMetricRecord(metric);
AtlasPerfTracer.log(perf);
}
}
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
......@@ -797,6 +804,8 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
}
RequestContext.get().endMetricRecord(metric);
return context;
}
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.discovery.SearchProcessor;
import org.apache.atlas.exception.AtlasBaseException;
......@@ -38,6 +39,7 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
......@@ -268,6 +270,8 @@ public class AtlasGraphUtilsV2 {
}
public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findByUniqueAttributes");
AtlasVertex vertex = null;
final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes();
......@@ -302,6 +306,8 @@ public class AtlasGraphUtilsV2 {
}
}
RequestContext.get().endMetricRecord(metric);
return vertex;
}
......@@ -345,6 +351,8 @@ public class AtlasGraphUtilsV2 {
}
public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndPropertyName");
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
......@@ -354,10 +362,14 @@ public class AtlasGraphUtilsV2 {
AtlasVertex vertex = results.hasNext() ? results.next() : null;
RequestContext.get().endMetricRecord(metric);
return vertex;
}
public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndPropertyName");
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
......@@ -367,6 +379,8 @@ public class AtlasGraphUtilsV2 {
AtlasVertex vertex = results.hasNext() ? results.next() : null;
RequestContext.get().endMetricRecord(metric);
return vertex;
}
......
......@@ -55,6 +55,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
......@@ -175,6 +176,8 @@ public class EntityGraphMapper {
}
public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributesAndClassifications");
EntityMutationResponse resp = new EntityMutationResponse();
Collection<AtlasEntity> createdEntities = context.getCreatedEntities();
......@@ -237,6 +240,8 @@ public class EntityGraphMapper {
}
}
RequestContext.get().endMetricRecord(metric);
return resp;
}
......@@ -284,6 +289,8 @@ public class EntityGraphMapper {
}
if (MapUtils.isNotEmpty(struct.getAttributes())) {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributes");
AtlasStructType structType = getStructType(struct.getTypeName());
if (op.equals(CREATE)) {
......@@ -308,6 +315,8 @@ public class EntityGraphMapper {
}
updateModificationMetadata(vertex);
RequestContext.get().endMetricRecord(metric);
}
if (LOG.isDebugEnabled()) {
......@@ -322,6 +331,8 @@ public class EntityGraphMapper {
}
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapRelationshipAttributes");
AtlasEntityType entityType = getEntityType(entity.getTypeName());
if (op.equals(CREATE)) {
......@@ -343,6 +354,8 @@ public class EntityGraphMapper {
}
updateModificationMetadata(vertex);
RequestContext.get().endMetricRecord(metric);
}
if (LOG.isDebugEnabled()) {
......
......@@ -23,6 +23,8 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.store.DeleteType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -30,10 +32,11 @@ import org.slf4j.LoggerFactory;
import java.util.*;
public class RequestContext {
private static final Logger LOG = LoggerFactory.getLogger(RequestContext.class);
private static final Logger METRICS = LoggerFactory.getLogger("METRICS");
private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
private static final Set<RequestContext> ACTIVE_REQUESTS = new HashSet<>();
private static final boolean isMetricsEnabled = METRICS.isDebugEnabled();
private final long requestTime = System.currentTimeMillis();
private final Map<String, AtlasObjectId> updatedEntities = new HashMap<>();
......@@ -42,6 +45,7 @@ public class RequestContext {
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
private final AtlasPerfMetrics metrics = isMetricsEnabled ? new AtlasPerfMetrics() : null;
private List<EntityGuidPair> entityGuidInRequest = null;
private String user;
......@@ -95,6 +99,12 @@ public class RequestContext {
this.addedPropagations.clear();
this.removedPropagations.clear();
if (metrics != null && !metrics.isEmpty()) {
METRICS.debug(metrics.toString());
metrics.clear();
}
if (this.entityGuidInRequest != null) {
this.entityGuidInRequest.clear();
}
......@@ -273,6 +283,16 @@ public class RequestContext {
return deletedEntities.containsKey(guid);
}
public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
public void endMetricRecord(MetricRecorder recorder) {
if (metrics != null && recorder != null) {
metrics.recordMetric(recorder);
}
}
public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
if (entityGuidInRequest == null) {
entityGuidInRequest = new ArrayList<>();
......
......@@ -32,6 +32,7 @@ import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
......@@ -118,6 +119,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
}
private void notifyEntityEvents(List<AtlasEntity> entities, OperationType operationType) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
List<EntityNotificationV2> messages = new ArrayList<>();
for (AtlasEntity entity : entities) {
......@@ -135,6 +138,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
}
}
RequestContext.get().endMetricRecord(metric);
}
private AtlasEntityHeader toNotificationHeader(AtlasEntity entity) {
......
......@@ -19,8 +19,10 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.v1.model.notification.EntityNotificationV1;
......@@ -159,6 +161,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
// send notification of entity change
private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions,
OperationType operationType) throws AtlasException {
MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
List<EntityNotificationV1> messages = new ArrayList<>();
for (Referenceable entityDefinition : entityDefinitions) {
......@@ -186,6 +190,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
if (!messages.isEmpty()) {
notificationSender.send(messages);
}
RequestContext.get().endMetricRecord(metric);
}
private List<String> getNotificationAttributes(String entityType) {
......
......@@ -88,6 +88,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
private static final Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger("LARGE_MESSAGES");
private static final int SC_OK = 200;
private static final int SC_BAD_REQUEST = 400;
......@@ -121,6 +122,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final int maxWaitDuration;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
private final int largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
private NotificationInterface notificationInterface;
......@@ -153,6 +155,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
......@@ -603,6 +606,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally {
AtlasPerfTracer.log(perf);
long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0;
if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message);
LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
}
if (auditLog != null) {
auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
auditLog.setTimeTaken(System.currentTimeMillis() - startTime);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment