Commit faeecf10 by Madhan Neethiraj

ATLAS-2316: when Hive table is created Atlas audit shows ENTITY_UPDATE instead of ENTITY_CREATE

parent ac0764be
......@@ -191,7 +191,15 @@ public class EntityMutationResponse {
return getFirstEntityByType(getEntitiesByOperation(EntityOperation.UPDATE), typeName);
}
@JsonIgnore
public void addEntity(EntityOperation op, AtlasEntityHeader header) {
// if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE
if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) {
if (entityHeaderExists(getCreatedEntities(), header)) {
return;
}
}
if (mutatedEntities == null) {
mutatedEntities = new HashMap<>();
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.utils;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
public class AtlasEntityUtil {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityUtil.class);
public static boolean hasAnyAttributeUpdate(AtlasEntityType entityType, AtlasEntity currEntity, AtlasEntity newEntity) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> hasAnyAttributeUpdate(guid={}, typeName={})", currEntity.getGuid(), currEntity.getTypeName());
}
boolean ret = false;
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
String attrName = attribute.getName();
AtlasType attrType = attribute.getAttributeType();
Object currValue = attrType.getNormalizedValue(currEntity.getAttribute(attrName));
Object newValue = attrType.getNormalizedValue(newEntity.getAttribute(attrName));
if (!Objects.equals(currValue, newValue)) {
ret = true;
// for map/list types, treat 'null' same as empty
if ((currValue == null && newValue != null) || (currValue != null && newValue == null)) {
if (attrType instanceof AtlasMapType) {
if (MapUtils.isEmpty((Map) currValue) && MapUtils.isEmpty((Map) newValue)) {
ret = false;
}
} else if (attrType instanceof AtlasArrayType) {
if (CollectionUtils.isEmpty((Collection) currValue) && CollectionUtils.isEmpty((Collection) newValue)) {
ret = false;
}
}
}
if (ret) {
if (LOG.isDebugEnabled()) {
LOG.debug("hasAnyAttributeUpdate(guid={}, typeName={}): attribute '{}' is found updated - currentValue={}, newValue={}",
currEntity.getGuid(), currEntity.getTypeName(), attrName, currValue, newValue);
}
break;
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== hasAnyAttributeUpdate(guid={}, typeName={}): ret={}", currEntity.getGuid(), currEntity.getTypeName(), ret);
}
return ret;
}
}
......@@ -39,6 +39,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -47,11 +48,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
......@@ -173,6 +170,34 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
// Create/Update entities
EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
// for existing entities, skip update if incoming entity doesn't have any change
if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
List<AtlasEntity> entitiesToSkipUpdate = null;
for (AtlasEntity entity : context.getUpdatedEntities()) {
String guid = entity.getGuid();
AtlasVertex vertex = context.getVertex(guid);
AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) {
// if classifications are to be replaced as well, then skip updates only when no change in classifications as well
if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) {
if (entitiesToSkipUpdate == null) {
entitiesToSkipUpdate = new ArrayList<>();
}
entitiesToSkipUpdate.add(entity);
}
}
}
if (entitiesToSkipUpdate != null) {
context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
}
}
EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);
ret.setGuidAssignments(context.getGuidAssignments());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment