Commit b24f88ec by Sarath Subramanian

ATLAS-3405: Handling of references to non-existing entities in notifications

parent ec7b8245
......@@ -135,6 +135,8 @@ public final class Constants {
public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
/**
* search backing index name.
*/
......@@ -192,9 +194,10 @@ public final class Constants {
* replication attributes
*/
public static final String ATTR_NAME_REFERENCEABLE = "Referenceable.";
public static final String ATTR_NAME_REPLICATED_TO = "replicatedTo";
public static final String ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
public static final String ATTR_NAME_REFERENCEABLE = "Referenceable.";
public static final String ATTR_NAME_REPLICATED_TO = "replicatedTo";
public static final String ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
private Constants() {
}
......
......@@ -48,6 +48,9 @@ public enum AtlasConfiguration {
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60),
NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref", true),
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref", false),
GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH("atlas.graphstore.indexed.string.safe.length", Short.MAX_VALUE), // based on org.apache.hadoop.hbase.client.Mutation.checkRow()
//search configuration
......
......@@ -59,16 +59,17 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class AtlasEntity extends AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
public static final String KEY_GUID = "guid";
public static final String KEY_HOME_ID = "homeId";
public static final String KEY_IS_PROXY = "isProxy";
public static final String KEY_PROVENANCE_TYPE = "provenanceType";
public static final String KEY_STATUS = "status";
public static final String KEY_CREATED_BY = "createdBy";
public static final String KEY_UPDATED_BY = "updatedBy";
public static final String KEY_CREATE_TIME = "createTime";
public static final String KEY_UPDATE_TIME = "updateTime";
public static final String KEY_VERSION = "version";
public static final String KEY_GUID = "guid";
public static final String KEY_HOME_ID = "homeId";
public static final String KEY_IS_PROXY = "isProxy";
public static final String KEY_IS_INCOMPLETE = "isIncomplete";
public static final String KEY_PROVENANCE_TYPE = "provenanceType";
public static final String KEY_STATUS = "status";
public static final String KEY_CREATED_BY = "createdBy";
public static final String KEY_UPDATED_BY = "updatedBy";
public static final String KEY_CREATE_TIME = "createTime";
public static final String KEY_UPDATE_TIME = "updateTime";
public static final String KEY_VERSION = "version";
/**
* Status of the entity - can be active or deleted. Deleted entities are not removed from Atlas store.
......@@ -78,6 +79,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
private String guid = null;
private String homeId = null;
private Boolean isProxy = Boolean.FALSE;
private Boolean isIncomplete = Boolean.FALSE;
private Integer provenanceType = 0;
private Status status = Status.ACTIVE;
private String createdBy = null;
......@@ -133,6 +135,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
Object oGuid = map.get(KEY_GUID);
Object homeId = map.get(KEY_HOME_ID);
Object isProxy = map.get(KEY_IS_PROXY);
Object isIncomplete = map.get(KEY_IS_INCOMPLETE);
Object provenanceType = map.get(KEY_PROVENANCE_TYPE);
Object status = map.get(KEY_STATUS);
Object createdBy = map.get(KEY_CREATED_BY);
......@@ -156,6 +159,12 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setIsProxy(Boolean.FALSE);
}
if (isIncomplete != null) {
setIsIncomplete((Boolean) isIncomplete);
} else {
setIsIncomplete(Boolean.FALSE);
}
if (provenanceType instanceof Number) {
setProvenanceType(((Number) version).intValue());
}
......@@ -193,6 +202,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setGuid(other.getGuid());
setHomeId(other.getHomeId());
setIsProxy(other.isProxy());
setIsIncomplete(other.getIsIncomplete());
setProvenanceType(other.getProvenanceType());
setStatus(other.getStatus());
setCreatedBy(other.getCreatedBy());
......@@ -230,6 +240,14 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
this.isProxy = isProxy;
}
public Boolean getIsIncomplete() {
return isIncomplete;
}
public void setIsIncomplete(Boolean isIncomplete) {
this.isIncomplete = isIncomplete;
}
public Integer getProvenanceType() {
return provenanceType;
}
......@@ -355,6 +373,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setGuid(nextInternalId());
setHomeId(null);
setIsProxy(Boolean.FALSE);
setIsIncomplete(Boolean.FALSE);
setProvenanceType(0);
setStatus(null);
setCreatedBy(null);
......@@ -380,6 +399,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
sb.append("guid='").append(guid).append('\'');
sb.append(", homeId='").append(homeId).append('\'');
sb.append(", isProxy='").append(isProxy).append('\'');
sb.append(", isIncomplete=").append(isIncomplete);
sb.append(", provenanceType=").append(provenanceType);
sb.append(", status=").append(status);
sb.append(", createdBy='").append(createdBy).append('\'');
......@@ -411,6 +431,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
return Objects.equals(guid, that.guid) &&
Objects.equals(homeId, that.homeId) &&
Objects.equals(isProxy, that.isProxy) &&
Objects.equals(isIncomplete, that.isIncomplete) &&
Objects.equals(provenanceType, that.provenanceType) &&
status == that.status &&
Objects.equals(createdBy, that.createdBy) &&
......@@ -424,8 +445,8 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), guid, homeId, isProxy, provenanceType, status, createdBy, updatedBy, createTime, updateTime, version,
relationshipAttributes, classifications);
return Objects.hash(super.hashCode(), guid, homeId, isProxy, isIncomplete, provenanceType, status,
createdBy, updatedBy, createTime, updateTime, version, relationshipAttributes, classifications);
}
@Override
......
......@@ -60,6 +60,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
private List<AtlasClassification> classifications = null;
private List<String> meaningNames = null;
private List<AtlasTermAssignmentHeader> meanings = null;
private Boolean isIncomplete = Boolean.FALSE;
public AtlasEntityHeader() {
this(null, null);
......@@ -98,6 +99,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
setDisplayText(other.getDisplayText());
setClassificationNames(other.getClassificationNames());
setClassifications(other.getClassifications());
setIsIncomplete(other.getIsIncomplete());
}
}
......@@ -106,6 +108,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
setGuid(entity.getGuid());
setStatus(entity.getStatus());
setClassifications(entity.getClassifications());
setIsIncomplete(entity.getIsIncomplete());
if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
this.classificationNames = new ArrayList<>(entity.getClassifications().size());
......@@ -156,6 +159,14 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
this.classifications = classifications;
}
public Boolean getIsIncomplete() {
return isIncomplete;
}
public void setIsIncomplete(Boolean isIncomplete) {
this.isIncomplete = isIncomplete;
}
@Override
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
......@@ -172,6 +183,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
sb.append("classifications=[");
AtlasBaseTypeDef.dumpObjects(classifications, sb);
sb.append("], ");
sb.append("isIncomplete=").append(isIncomplete);
super.toString(sb);
sb.append('}');
......@@ -190,12 +202,13 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
Objects.equals(classificationNames, that.classificationNames) &&
Objects.equals(meaningNames, that.classificationNames) &&
Objects.equals(classifications, that.classifications) &&
Objects.equals(isIncomplete, that.isIncomplete) &&
Objects.equals(meanings, that.meanings);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), guid, status, displayText, classificationNames, classifications, meaningNames, meanings);
return Objects.hash(super.hashCode(), guid, status, displayText, classificationNames, classifications, meaningNames, meanings, isIncomplete);
}
@Override
......
......@@ -323,6 +323,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, true);
createCommonVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true, true);
createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
......
......@@ -78,28 +78,8 @@ import java.util.UUID;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAME_DELIMITER;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
......@@ -1078,6 +1058,13 @@ public final class GraphHelper {
return element.getProperty(Constants.IS_PROXY_KEY, Boolean.class);
}
public static Boolean isEntityIncomplete(AtlasElement element) {
Integer value = element.getProperty(Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class);
Boolean ret = (value != null && value == INCOMPLETE_ENTITY_VALUE) ? Boolean.TRUE : Boolean.FALSE;
return ret;
}
public static Integer getProvenanceType(AtlasElement element) {
return element.getProperty(Constants.PROVENANCE_TYPE_KEY, Integer.class);
}
......
......@@ -56,10 +56,12 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphDiscoveryContext discoveryContext;
private final EntityGraphMapper entityGraphMapper;
public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
this.typeRegistry = typeRegistry;
this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
this.typeRegistry = typeRegistry;
this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
this.entityGraphMapper = entityGraphMapper;
}
@Override
......@@ -177,8 +179,8 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
protected void resolveReferences() throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("resolveReferences");
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
new UniqAttrBasedEntityResolver(typeRegistry)
EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry, entityGraphMapper),
new UniqAttrBasedEntityResolver(typeRegistry, entityGraphMapper)
};
for (EntityResolver resolver : entityResolvers) {
......
......@@ -55,8 +55,11 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import static java.lang.Boolean.FALSE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
@Component
......@@ -881,7 +884,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream, entityGraphMapper);
EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
EntityMutationContext context = new EntityMutationContext(discoveryContext);
RequestContext requestContext = RequestContext.get();
......@@ -903,6 +906,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
if (vertex != null) {
if (!isPartialUpdate) {
graphDiscoverer.validateAndNormalize(entity);
// change entity 'isInComplete' to 'false' during full update
if (isEntityIncomplete(vertex)) {
vertex.removeProperty(IS_INCOMPLETE_PROPERTY_KEY);
entity.setIsIncomplete(FALSE);
}
} else {
graphDiscoverer.validateAndNormalizeForUpdate(entity);
}
......
......@@ -45,6 +45,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
......@@ -140,14 +141,49 @@ public class EntityGraphMapper {
return createVertexWithGuid(entity, guid);
}
public AtlasVertex createShellEntityVertex(AtlasObjectId objectId, EntityGraphDiscoveryContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createShellEntityVertex({})", objectId.getTypeName());
}
final String guid = UUID.randomUUID().toString();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
AtlasVertex ret = createStructVertex(objectId);
for (String superTypeName : entityType.getAllSuperTypes()) {
AtlasGraphUtilsV2.addEncodedProperty(ret, SUPER_TYPES_PROPERTY_KEY, superTypeName);
}
AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(null));
AtlasGraphUtilsV2.setEncodedProperty(ret, IS_INCOMPLETE_PROPERTY_KEY, INCOMPLETE_ENTITY_VALUE);
// map unique attributes
Map<String, Object> uniqueAttributes = objectId.getUniqueAttributes();
EntityMutationContext mutationContext = new EntityMutationContext(context);
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
String attrName = attribute.getName();
if (uniqueAttributes.containsKey(attrName)) {
Object attrValue = attribute.getAttributeType().getNormalizedValue(uniqueAttributes.get(attrName));
mapAttribute(attribute, attrValue, ret, CREATE, mutationContext);
}
}
GraphTransactionInterceptor.addToVertexCache(guid, ret);
return ret;
}
public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createVertex({})", entity.getTypeName());
LOG.debug("==> createVertexWithGuid({})", entity.getTypeName());
}
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
AtlasVertex ret = createStructVertex(entity);
AtlasVertex ret = createStructVertex(entity);
for (String superTypeName : entityType.getAllSuperTypes()) {
AtlasGraphUtilsV2.addEncodedProperty(ret, SUPER_TYPES_PROPERTY_KEY, superTypeName);
......@@ -271,13 +307,21 @@ public class EntityGraphMapper {
}
private AtlasVertex createStructVertex(AtlasStruct struct) {
return createStructVertex(struct.getTypeName());
}
private AtlasVertex createStructVertex(AtlasObjectId objectId) {
return createStructVertex(objectId.getTypeName());
}
private AtlasVertex createStructVertex(String typeName) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createStructVertex({})", struct.getTypeName());
LOG.debug("==> createStructVertex({})", typeName);
}
final AtlasVertex ret = graph.addVertex();
AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, struct.getTypeName());
AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, typeName);
AtlasGraphUtilsV2.setEncodedProperty(ret, STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
AtlasGraphUtilsV2.setEncodedProperty(ret, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
......@@ -285,7 +329,7 @@ public class EntityGraphMapper {
AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFIED_BY_KEY, RequestContext.get().getUser());
if (LOG.isDebugEnabled()) {
LOG.debug("<== createStructVertex({})", struct.getTypeName());
LOG.debug("<== createStructVertex({})", typeName);
}
return ret;
......@@ -1437,6 +1481,7 @@ public class EntityGraphMapper {
header.setGuid(getIdFromVertex(vertex));
header.setStatus(entity.getStatus());
header.setIsIncomplete(entity.getIsIncomplete());
for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
......
......@@ -101,23 +101,7 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
import static org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getArrayElementsProperty;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityStatus;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPrimitiveMap;
import static org.apache.atlas.repository.graph.GraphHelper.getReferenceMap;
import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getRemovePropagations;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
......@@ -508,13 +492,15 @@ public class EntityGraphRetriever {
private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasEntityHeader ret = new AtlasEntityHeader();
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
Boolean isIncomplete = isEntityIncomplete(entityVertex);
ret.setTypeName(typeName);
ret.setGuid(guid);
ret.setStatus(GraphHelper.getStatus(entityVertex));
ret.setClassificationNames(getAllTraitNames(entityVertex));
ret.setIsIncomplete(isIncomplete);
List<AtlasTermAssignmentHeader> termAssignmentHeaders = mapAssignedTerms(entityVertex);
ret.setMeanings(termAssignmentHeaders);
......@@ -593,6 +579,7 @@ public class EntityGraphRetriever {
entity.setHomeId(GraphHelper.getHomeId(entityVertex));
entity.setIsProxy(GraphHelper.isProxy(entityVertex));
entity.setIsIncomplete(isEntityIncomplete(entityVertex));
entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
......
......@@ -36,9 +36,11 @@ public class IDBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphMapper entityGraphMapper;
public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
this.typeRegistry = typeRegistry;
this.entityGraphMapper = entityGraphMapper;
}
public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasObjectId;
......@@ -32,14 +33,15 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class UniqAttrBasedEntityResolver implements EntityResolver {
private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphMapper entityGraphMapper;
public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
this.typeRegistry = typeRegistry;
this.entityGraphMapper = entityGraphMapper;
}
@Override
......@@ -61,6 +63,10 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, objId.getUniqueAttributes());
if (vertex == null && RequestContext.get().isCreateShellEntityForNonExistingReference()) {
vertex = entityGraphMapper.createShellEntityVertex(objId, context);
}
if (vertex != null) {
context.addResolvedIdByUniqAttribs(objId, vertex);
resolvedReferences.add(objId);
......@@ -71,5 +77,4 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
return context;
}
}
}
\ No newline at end of file
......@@ -57,7 +57,7 @@ public class RequestContext {
private boolean isImportInProgress = false;
private boolean isInNotificationProcessing = false;
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
private RequestContext() {
}
......@@ -182,6 +182,13 @@ public class RequestContext {
isInTypePatching = inTypePatching;
}
public boolean isCreateShellEntityForNonExistingReference() {
return createShellEntityForNonExistingReference;
}
public void setCreateShellEntityForNonExistingReference(boolean createShellEntityForNonExistingReference) {
this.createShellEntityForNonExistingReference = createShellEntityForNonExistingReference;
}
public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
......
......@@ -177,6 +177,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
ret.setGuid(entity.getGuid());
ret.setStatus(entity.getStatus());
ret.setIsIncomplete(entity.getIsIncomplete());
setAttribute(ret, NAME, name);
setAttribute(ret, DESCRIPTION, entity.getAttribute(DESCRIPTION));
setAttribute(ret, OWNER, entity.getAttribute(OWNER));
......
......@@ -20,12 +20,7 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.*;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
......@@ -171,6 +166,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
private final boolean createShellEntityForNonExistingReference;
private final NotificationInterface notificationInterface;
private final Configuration applicationProperties;
private ExecutorService executors;
......@@ -205,6 +201,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
......@@ -586,6 +583,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
requestContext.setUser(messageUser, null);
requestContext.setInNotificationProcessing(true);
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
switch (message.getType()) {
case ENTITY_CREATE: {
......
......@@ -45,7 +45,7 @@ import java.util.Date;
import java.util.Set;
import java.util.UUID;
import static org.apache.atlas.AtlasConfiguration.REST_API_ENABLE_DELETE_TYPE_OVERRIDE;
import static org.apache.atlas.AtlasConfiguration.*;
/**
* This records audit information as part of the filter after processing the request
......@@ -56,13 +56,15 @@ public class AuditFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(AuditFilter.class);
private static final Logger AUDIT_LOG = LoggerFactory.getLogger("AUDIT");
private boolean deleteTypeOverrideEnabled = false;
private boolean deleteTypeOverrideEnabled = false;
private boolean createShellEntityForNonExistingReference = false;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
LOG.info("AuditFilter initialization started");
deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
createShellEntityForNonExistingReference = REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
LOG.info("REST_API_ENABLE_DELETE_TYPE_OVERRIDE={}", deleteTypeOverrideEnabled);
}
......@@ -88,6 +90,7 @@ public class AuditFilter implements Filter {
RequestContext requestContext = RequestContext.get();
requestContext.setUser(user, userGroups);
requestContext.setClientIPAddress(AtlasAuthorizationUtils.getRequestIpAddress(httpRequest));
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
if (StringUtils.isNotEmpty(deleteType)) {
if (deleteTypeOverrideEnabled) {
......@@ -196,4 +199,4 @@ public class AuditFilter implements Filter {
return sb.toString();
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment