Commit ea38942b by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1563: Entity change listener invocation in V2 Store

parent b6b6f945
...@@ -76,17 +76,24 @@ public class AtlasInstanceConverter { ...@@ -76,17 +76,24 @@ public class AtlasInstanceConverter {
Iterator<AtlasEntity> entityIterator = entities.iterator(); Iterator<AtlasEntity> entityIterator = entities.iterator();
for (int i = 0; i < entities.size(); i++) { for (int i = 0; i < entities.size(); i++) {
ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next(), ctx); ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next());
entitiesInOldFormat[i] = typedInstance; entitiesInOldFormat[i] = typedInstance;
} }
return entitiesInOldFormat; return entitiesInOldFormat;
} }
public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity, AtlasFormatConverter.ConverterContext ctx) throws AtlasBaseException { public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity) throws AtlasBaseException {
Referenceable ref = getReferenceable(entity, ctx); try {
return metadataService.getEntityDefinition(entity.getGuid());
} catch (AtlasException e) {
LOG.error("Exception while getting a typed reference for the entity ", e);
throw toAtlasBaseException(e);
}
}
public ITypedReferenceableInstance getITypedReferenceable(String guid) throws AtlasBaseException {
try { try {
return metadataService.getTypedReferenceableInstance(ref); return metadataService.getEntityDefinition(guid);
} catch (AtlasException e) { } catch (AtlasException e) {
LOG.error("Exception while getting a typed reference for the entity ", e); LOG.error("Exception while getting a typed reference for the entity ", e);
throw toAtlasBaseException(e); throw toAtlasBaseException(e);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v1;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@Singleton
public class AtlasEntityChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
private final Set<EntityChangeListener> entityChangeListeners;
private final AtlasInstanceConverter instanceConverter;
@Inject
public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners,
AtlasInstanceConverter instanceConverter) {
this.entityChangeListeners = entityChangeListeners;
this.instanceConverter = instanceConverter;
}
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) {
return;
}
List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities();
List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities();
if (CollectionUtils.isNotEmpty(createdEntities)) {
List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(createdEntities);
notifyListeners(typedRefInst, EntityOperation.CREATE);
}
if (CollectionUtils.isNotEmpty(updatedEntities)) {
List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(updatedEntities);
notifyListeners(typedRefInst, EntityOperation.UPDATE);
}
if (CollectionUtils.isNotEmpty(partiallyUpdatedEntities)) {
List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(partiallyUpdatedEntities);
notifyListeners(typedRefInst, EntityOperation.PARTIAL_UPDATE);
}
if (CollectionUtils.isNotEmpty(deletedEntities)) {
List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(deletedEntities);
notifyListeners(typedRefInst, EntityOperation.DELETE);
}
}
private void notifyListeners(List<ITypedReferenceableInstance> typedRefInsts, EntityOperation operation) throws AtlasBaseException {
for (EntityChangeListener listener : entityChangeListeners) {
try {
switch (operation) {
case CREATE:
listener.onEntitiesAdded(typedRefInsts);
break;
case UPDATE:
case PARTIAL_UPDATE:
listener.onEntitiesUpdated(typedRefInsts);
break;
case DELETE:
listener.onEntitiesDeleted(typedRefInsts);
break;
}
} catch (AtlasException e) {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, operation.toString());
}
}
}
private List<ITypedReferenceableInstance> toITypedReferenceable(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
List<ITypedReferenceableInstance> ret = new ArrayList<>(entityHeaders.size());
for (AtlasEntityHeader entityHeader : entityHeaders) {
ret.add(instanceConverter.getITypedReferenceable(entityHeader.getGuid()));
}
return ret;
}
}
\ No newline at end of file
...@@ -25,9 +25,13 @@ import org.apache.atlas.GraphTransaction; ...@@ -25,9 +25,13 @@ import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.*; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
...@@ -49,13 +53,15 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*; ...@@ -49,13 +53,15 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
public class AtlasEntityStoreV1 implements AtlasEntityStore { public class AtlasEntityStoreV1 implements AtlasEntityStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
private final DeleteHandlerV1 deleteHandler; private final DeleteHandlerV1 deleteHandler;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasEntityChangeNotifier entityChangeNotifier;
@Inject @Inject
public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) { public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
this.deleteHandler = deleteHandler; this.deleteHandler = deleteHandler;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityChangeNotifier = entityChangeNotifier;
} }
@Override @Override
...@@ -208,6 +214,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -208,6 +214,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("<== createOrUpdate()"); LOG.debug("<== createOrUpdate()");
} }
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret; return ret;
} }
...@@ -252,7 +261,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -252,7 +261,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
deletionCandidates.add(vertex); deletionCandidates.add(vertex);
return deleteVertices(deletionCandidates); EntityMutationResponse ret = deleteVertices(deletionCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
} }
@Override @Override
...@@ -281,7 +295,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -281,7 +295,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
if (deletionCandidates.isEmpty()) { if (deletionCandidates.isEmpty()) {
LOG.info("No deletion candidate entities were found for guids %s", guids); LOG.info("No deletion candidate entities were found for guids %s", guids);
} }
return deleteVertices(deletionCandidates);
EntityMutationResponse ret = deleteVertices(deletionCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
} }
@Override @Override
...@@ -297,7 +317,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -297,7 +317,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
deletionCandidates.add(vertex); deletionCandidates.add(vertex);
return deleteVertices(deletionCandidates); EntityMutationResponse ret = deleteVertices(deletionCandidates);
// Notify the change listeners
entityChangeNotifier.onEntitiesMutated(ret);
return ret;
} }
@Override @Override
......
...@@ -19,7 +19,6 @@ package org.apache.atlas.repository.store.graph.v1; ...@@ -19,7 +19,6 @@ package org.apache.atlas.repository.store.graph.v1;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
...@@ -59,7 +58,6 @@ import org.apache.atlas.typesystem.persistence.Id; ...@@ -59,7 +58,6 @@ import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
...@@ -69,7 +67,6 @@ import org.testng.annotations.Test; ...@@ -69,7 +67,6 @@ import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import java.lang.reflect.Constructor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
...@@ -82,6 +79,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE; ...@@ -82,6 +79,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE;
import static org.apache.atlas.TestUtils.DEPARTMENT_TYPE; import static org.apache.atlas.TestUtils.DEPARTMENT_TYPE;
import static org.apache.atlas.TestUtils.NAME; import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.TABLE_TYPE; import static org.apache.atlas.TestUtils.TABLE_TYPE;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
...@@ -107,6 +105,8 @@ public abstract class AtlasDeleteHandlerV1Test { ...@@ -107,6 +105,8 @@ public abstract class AtlasDeleteHandlerV1Test {
private TypeSystem typeSystem = TypeSystem.getInstance(); private TypeSystem typeSystem = TypeSystem.getInstance();
AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -145,7 +145,7 @@ public abstract class AtlasDeleteHandlerV1Test { ...@@ -145,7 +145,7 @@ public abstract class AtlasDeleteHandlerV1Test {
@BeforeTest @BeforeTest
public void init() throws Exception { public void init() throws Exception {
DeleteHandlerV1 deleteHandler = getDeleteHandler(typeRegistry); DeleteHandlerV1 deleteHandler = getDeleteHandler(typeRegistry);
entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry); entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);
RequestContextV1.clear(); RequestContextV1.clear();
} }
......
...@@ -73,6 +73,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE; ...@@ -73,6 +73,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE;
import static org.apache.atlas.TestUtils.NAME; import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.randomString; import static org.apache.atlas.TestUtils.randomString;
import static org.apache.atlas.TestUtilsV2.TABLE_TYPE; import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
...@@ -98,6 +99,8 @@ public class AtlasEntityStoreV1Test { ...@@ -98,6 +99,8 @@ public class AtlasEntityStoreV1Test {
private AtlasEntityWithExtInfo dbEntity; private AtlasEntityWithExtInfo dbEntity;
private AtlasEntityWithExtInfo tblEntity; private AtlasEntityWithExtInfo tblEntity;
AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -128,7 +131,7 @@ public class AtlasEntityStoreV1Test { ...@@ -128,7 +131,7 @@ public class AtlasEntityStoreV1Test {
@BeforeTest @BeforeTest
public void init() throws Exception { public void init() throws Exception {
entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry); entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);
RequestContextV1.clear(); RequestContextV1.clear();
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.listener; package org.apache.atlas.listener;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance;
......
...@@ -74,7 +74,7 @@ public class EntityREST { ...@@ -74,7 +74,7 @@ public class EntityREST {
public static final String PREFIX_ATTR = "attr:"; public static final String PREFIX_ATTR = "attr:";
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasInstanceConverter restAdapters; private final AtlasInstanceConverter instanceConverter;
private final MetadataService metadataService; private final MetadataService metadataService;
private final AtlasEntityStore entitiesStore; private final AtlasEntityStore entitiesStore;
...@@ -82,7 +82,7 @@ public class EntityREST { ...@@ -82,7 +82,7 @@ public class EntityREST {
public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter, public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter,
MetadataService metadataService, AtlasEntityStore entitiesStore) { MetadataService metadataService, AtlasEntityStore entitiesStore) {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.restAdapters = instanceConverter; this.instanceConverter = instanceConverter;
this.metadataService = metadataService; this.metadataService = metadataService;
this.entitiesStore = entitiesStore; this.entitiesStore = entitiesStore;
} }
...@@ -204,7 +204,7 @@ public class EntityREST { ...@@ -204,7 +204,7 @@ public class EntityREST {
try { try {
IStruct trait = metadataService.getTraitDefinition(guid, typeName); IStruct trait = metadataService.getTraitDefinition(guid, typeName);
return restAdapters.getClassification(trait); return instanceConverter.getClassification(trait);
} catch (AtlasException e) { } catch (AtlasException e) {
throw toAtlasBaseException(e); throw toAtlasBaseException(e);
...@@ -231,7 +231,7 @@ public class EntityREST { ...@@ -231,7 +231,7 @@ public class EntityREST {
List<AtlasClassification> clsList = new ArrayList<>(); List<AtlasClassification> clsList = new ArrayList<>();
for ( String traitName : metadataService.getTraitNames(guid) ) { for ( String traitName : metadataService.getTraitNames(guid) ) {
IStruct trait = metadataService.getTraitDefinition(guid, traitName); IStruct trait = metadataService.getTraitDefinition(guid, traitName);
AtlasClassification cls = restAdapters.getClassification(trait); AtlasClassification cls = instanceConverter.getClassification(trait);
clsList.add(cls); clsList.add(cls);
} }
...@@ -258,7 +258,7 @@ public class EntityREST { ...@@ -258,7 +258,7 @@ public class EntityREST {
} }
for (AtlasClassification classification: classifications) { for (AtlasClassification classification: classifications) {
final ITypedStruct trait = restAdapters.getTrait(classification); final ITypedStruct trait = instanceConverter.getTrait(classification);
try { try {
metadataService.addTrait(guid, trait); metadataService.addTrait(guid, trait);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
...@@ -378,7 +378,7 @@ public class EntityREST { ...@@ -378,7 +378,7 @@ public class EntityREST {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "empty entity list"); throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "empty entity list");
} }
final ITypedStruct trait = restAdapters.getTrait(classification); final ITypedStruct trait = instanceConverter.getTrait(classification);
try { try {
metadataService.addTrait(entityGuids, trait); metadataService.addTrait(entityGuids, trait);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment