Commit f3bbdc15 by Madhan Neethiraj

ATLAS-1266: fixed typedef APIs to update type-registry only on successful graph commit

parent 6a24cad1
...@@ -32,8 +32,10 @@ import org.apache.commons.lang.StringUtils; ...@@ -32,8 +32,10 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -110,6 +112,9 @@ public class AtlasTypeRegistry { ...@@ -110,6 +112,9 @@ public class AtlasTypeRegistry {
return ret; return ret;
} }
public AtlasBaseTypeDef getTypeDefByName(String name) { return registryData.getTypeDefByName(name); }
public AtlasBaseTypeDef getTypeDefByGuid(String guid) { return registryData.getTypeDefByGuid(guid); }
public Collection<AtlasEnumDef> getAllEnumDefs() { return registryData.enumDefs.getAll(); } public Collection<AtlasEnumDef> getAllEnumDefs() { return registryData.enumDefs.getAll(); }
...@@ -168,6 +173,7 @@ public class AtlasTypeRegistry { ...@@ -168,6 +173,7 @@ public class AtlasTypeRegistry {
final TypeDefCache<AtlasStructDef> structDefs; final TypeDefCache<AtlasStructDef> structDefs;
final TypeDefCache<AtlasClassificationDef> classificationDefs; final TypeDefCache<AtlasClassificationDef> classificationDefs;
final TypeDefCache<AtlasEntityDef> entityDefs; final TypeDefCache<AtlasEntityDef> entityDefs;
final TypeDefCache<? extends AtlasBaseTypeDef>[] allDefCaches;
RegistryData() { RegistryData() {
allTypes = new TypeCache(); allTypes = new TypeCache();
...@@ -175,6 +181,7 @@ public class AtlasTypeRegistry { ...@@ -175,6 +181,7 @@ public class AtlasTypeRegistry {
structDefs = new TypeDefCache<>(allTypes); structDefs = new TypeDefCache<>(allTypes);
classificationDefs = new TypeDefCache<>(allTypes); classificationDefs = new TypeDefCache<>(allTypes);
entityDefs = new TypeDefCache<>(allTypes); entityDefs = new TypeDefCache<>(allTypes);
allDefCaches = new TypeDefCache[] { enumDefs, structDefs, classificationDefs, entityDefs };
allTypes.addType(new AtlasBuiltInTypes.AtlasBooleanType()); allTypes.addType(new AtlasBuiltInTypes.AtlasBooleanType());
allTypes.addType(new AtlasBuiltInTypes.AtlasByteType()); allTypes.addType(new AtlasBuiltInTypes.AtlasByteType());
...@@ -196,6 +203,39 @@ public class AtlasTypeRegistry { ...@@ -196,6 +203,39 @@ public class AtlasTypeRegistry {
structDefs = new TypeDefCache<>(other.structDefs, allTypes); structDefs = new TypeDefCache<>(other.structDefs, allTypes);
classificationDefs = new TypeDefCache<>(other.classificationDefs, allTypes); classificationDefs = new TypeDefCache<>(other.classificationDefs, allTypes);
entityDefs = new TypeDefCache<>(other.entityDefs, allTypes); entityDefs = new TypeDefCache<>(other.entityDefs, allTypes);
allDefCaches = new TypeDefCache[] { enumDefs, structDefs, classificationDefs, entityDefs };
}
AtlasBaseTypeDef getTypeDefByName(String name) {
AtlasBaseTypeDef ret = null;
if (name != null) {
for (TypeDefCache typeDefCache : allDefCaches) {
ret = typeDefCache.getTypeDefByName(name);
if (ret != null) {
break;
}
}
}
return ret;
}
AtlasBaseTypeDef getTypeDefByGuid(String guid) {
AtlasBaseTypeDef ret = null;
if (guid != null) {
for (TypeDefCache typeDefCache : allDefCaches) {
ret = typeDefCache.getTypeDefByGuid(guid);
if (ret != null) {
break;
}
}
}
return ret;
} }
void updateGuid(String typeName, String guid) { void updateGuid(String typeName, String guid) {
...@@ -227,6 +267,10 @@ public class AtlasTypeRegistry { ...@@ -227,6 +267,10 @@ public class AtlasTypeRegistry {
} }
public static class AtlasTransientTypeRegistry extends AtlasTypeRegistry { public static class AtlasTransientTypeRegistry extends AtlasTypeRegistry {
private List<AtlasBaseTypeDef> addedTypes = new ArrayList<>();
private List<AtlasBaseTypeDef> updatedTypes = new ArrayList<>();
private List<AtlasBaseTypeDef> deletedTypes = new ArrayList<>();
private AtlasTransientTypeRegistry(AtlasTypeRegistry parent) { private AtlasTransientTypeRegistry(AtlasTypeRegistry parent) {
super(parent); super(parent);
...@@ -261,7 +305,6 @@ public class AtlasTypeRegistry { ...@@ -261,7 +305,6 @@ public class AtlasTypeRegistry {
registryData.updateGuid(typeName, guid); registryData.updateGuid(typeName, guid);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeRegistry.updateGuid({}, {})", typeName, guid); LOG.debug("<== AtlasTypeRegistry.updateGuid({}, {})", typeName, guid);
} }
...@@ -391,9 +434,15 @@ public class AtlasTypeRegistry { ...@@ -391,9 +434,15 @@ public class AtlasTypeRegistry {
} }
if (guid != null) { if (guid != null) {
AtlasBaseTypeDef typeDef = getTypeDefByGuid(guid);
registryData.removeByGuid(guid); registryData.removeByGuid(guid);
resolveReferences(); resolveReferences();
if (typeDef != null) {
deletedTypes.add(typeDef);
}
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -407,9 +456,15 @@ public class AtlasTypeRegistry { ...@@ -407,9 +456,15 @@ public class AtlasTypeRegistry {
} }
if (name != null) { if (name != null) {
AtlasBaseTypeDef typeDef = getTypeDefByName(name);
registryData.removeByName(name); registryData.removeByName(name);
resolveReferences(); resolveReferences();
if (typeDef != null) {
deletedTypes.add(typeDef);
}
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -417,6 +472,12 @@ public class AtlasTypeRegistry { ...@@ -417,6 +472,12 @@ public class AtlasTypeRegistry {
} }
} }
public List<AtlasBaseTypeDef> getAddedTypes() { return addedTypes; }
public List<AtlasBaseTypeDef> getUpdatedTypes() { return updatedTypes; }
public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; }
private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) { private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -442,6 +503,8 @@ public class AtlasTypeRegistry { ...@@ -442,6 +503,8 @@ public class AtlasTypeRegistry {
registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
} }
addedTypes.add(typeDef);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
...@@ -490,31 +553,34 @@ public class AtlasTypeRegistry { ...@@ -490,31 +553,34 @@ public class AtlasTypeRegistry {
LOG.debug("==> AtlasTypeRegistry.updateTypeByGuidWithNoRefResolve({})", guid); LOG.debug("==> AtlasTypeRegistry.updateTypeByGuidWithNoRefResolve({})", guid);
} }
if (guid == null || typeDef == null) { if (guid != null && typeDef != null) {
// ignore // ignore
} else if (typeDef.getClass().equals(AtlasEnumDef.class)) { if (typeDef.getClass().equals(AtlasEnumDef.class)) {
AtlasEnumDef enumDef = (AtlasEnumDef)typeDef; AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
registryData.enumDefs.removeTypeDefByGuid(guid); registryData.enumDefs.removeTypeDefByGuid(guid);
registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef)); registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef));
} else if (typeDef.getClass().equals(AtlasStructDef.class)) { } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
AtlasStructDef structDef = (AtlasStructDef)typeDef; AtlasStructDef structDef = (AtlasStructDef) typeDef;
registryData.structDefs.removeTypeDefByGuid(guid); registryData.structDefs.removeTypeDefByGuid(guid);
registryData.structDefs.addType(structDef, new AtlasStructType(structDef)); registryData.structDefs.addType(structDef, new AtlasStructType(structDef));
} else if (typeDef.getClass().equals(AtlasClassificationDef.class)) { } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
AtlasClassificationDef classificationDef = (AtlasClassificationDef)typeDef; AtlasClassificationDef classificationDef = (AtlasClassificationDef) typeDef;
registryData.classificationDefs.removeTypeDefByGuid(guid); registryData.classificationDefs.removeTypeDefByGuid(guid);
registryData.classificationDefs.addType(classificationDef, registryData.classificationDefs.addType(classificationDef,
new AtlasClassificationType(classificationDef)); new AtlasClassificationType(classificationDef));
} else if (typeDef.getClass().equals(AtlasEntityDef.class)) { } else if (typeDef.getClass().equals(AtlasEntityDef.class)) {
AtlasEntityDef entityDef = (AtlasEntityDef)typeDef; AtlasEntityDef entityDef = (AtlasEntityDef) typeDef;
registryData.entityDefs.removeTypeDefByGuid(guid); registryData.entityDefs.removeTypeDefByGuid(guid);
registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
} }
updatedTypes.add(typeDef);
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeRegistry.updateTypeByGuidWithNoRefResolve({})", guid); LOG.debug("<== AtlasTypeRegistry.updateTypeByGuidWithNoRefResolve({})", guid);
} }
...@@ -525,31 +591,33 @@ public class AtlasTypeRegistry { ...@@ -525,31 +591,33 @@ public class AtlasTypeRegistry {
LOG.debug("==> AtlasTypeRegistry.updateTypeByNameWithNoRefResolve({})", name); LOG.debug("==> AtlasTypeRegistry.updateTypeByNameWithNoRefResolve({})", name);
} }
if (name == null || typeDef == null) { if (name != null && typeDef != null) {
// ignore if (typeDef.getClass().equals(AtlasEnumDef.class)) {
} else if (typeDef.getClass().equals(AtlasEnumDef.class)) { AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
AtlasEnumDef enumDef = (AtlasEnumDef)typeDef;
registryData.enumDefs.removeTypeDefByName(name); registryData.enumDefs.removeTypeDefByName(name);
registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef)); registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef));
} else if (typeDef.getClass().equals(AtlasStructDef.class)) { } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
AtlasStructDef structDef = (AtlasStructDef)typeDef; AtlasStructDef structDef = (AtlasStructDef) typeDef;
registryData.structDefs.removeTypeDefByName(name); registryData.structDefs.removeTypeDefByName(name);
registryData.structDefs.addType(structDef, new AtlasStructType(structDef)); registryData.structDefs.addType(structDef, new AtlasStructType(structDef));
} else if (typeDef.getClass().equals(AtlasClassificationDef.class)) { } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
AtlasClassificationDef classificationDef = (AtlasClassificationDef)typeDef; AtlasClassificationDef classificationDef = (AtlasClassificationDef) typeDef;
registryData.classificationDefs.removeTypeDefByName(name); registryData.classificationDefs.removeTypeDefByName(name);
registryData.classificationDefs.addType(classificationDef, registryData.classificationDefs.addType(classificationDef,
new AtlasClassificationType(classificationDef)); new AtlasClassificationType(classificationDef));
} else if (typeDef.getClass().equals(AtlasEntityDef.class)) { } else if (typeDef.getClass().equals(AtlasEntityDef.class)) {
AtlasEntityDef entityDef = (AtlasEntityDef)typeDef; AtlasEntityDef entityDef = (AtlasEntityDef) typeDef;
registryData.entityDefs.removeTypeDefByName(name); registryData.entityDefs.removeTypeDefByName(name);
registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef));
} }
updatedTypes.add(typeDef);
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeRegistry.updateTypeByNameWithNoRefResolve({})", name); LOG.debug("<== AtlasTypeRegistry.updateTypeByNameWithNoRefResolve({})", name);
} }
......
...@@ -26,8 +26,14 @@ import org.apache.atlas.typesystem.exception.SchemaNotFoundException; ...@@ -26,8 +26,14 @@ import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class GraphTransactionInterceptor implements MethodInterceptor { public class GraphTransactionInterceptor implements MethodInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class); private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
private AtlasGraph graph; private AtlasGraph graph;
@Override @Override
...@@ -37,9 +43,13 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -37,9 +43,13 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
graph = AtlasGraphProvider.getGraphInstance(); graph = AtlasGraphProvider.getGraphInstance();
} }
boolean isSuccess = false;
try {
try { try {
Object response = invocation.proceed(); Object response = invocation.proceed();
graph.commit(); graph.commit();
isSuccess = true;
LOG.info("graph commit"); LOG.info("graph commit");
return response; return response;
} catch (Throwable t) { } catch (Throwable t) {
...@@ -51,6 +61,21 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -51,6 +61,21 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
graph.rollback(); graph.rollback();
throw t; throw t;
} }
} finally {
List<PostTransactionHook> trxHooks = postTransactionHooks.get();
if (trxHooks != null) {
postTransactionHooks.remove();
for (PostTransactionHook trxHook : trxHooks) {
try {
trxHook.onComplete(isSuccess);
} catch (Throwable t) {
LOG.error("postTransactionHook failed", t);
}
}
}
}
} }
boolean logException(Throwable t) { boolean logException(Throwable t) {
...@@ -59,4 +84,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -59,4 +84,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
} }
return true; return true;
} }
public static abstract class PostTransactionHook {
protected PostTransactionHook() {
List<PostTransactionHook> trxHooks = postTransactionHooks.get();
if (trxHooks == null) {
trxHooks = new ArrayList<>();
postTransactionHooks.set(trxHooks);
}
trxHooks.add(this);
}
public abstract void onComplete(boolean isSuccess);
}
} }
...@@ -48,6 +48,7 @@ import org.apache.atlas.services.IBootstrapTypesRegistrar; ...@@ -48,6 +48,7 @@ import org.apache.atlas.services.IBootstrapTypesRegistrar;
import org.apache.atlas.services.MetadataService; import org.apache.atlas.services.MetadataService;
import org.apache.atlas.services.ReservedTypesRegistrar; import org.apache.atlas.services.ReservedTypesRegistrar;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeSystemProvider; import org.apache.atlas.typesystem.types.TypeSystemProvider;
import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.atlas.typesystem.types.cache.TypeCache;
...@@ -71,6 +72,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { ...@@ -71,6 +72,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the ITypeStore interface to an implementation // bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton(); bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
bind(AtlasTypeDefStore.class).to(AtlasTypeDefGraphStoreV1.class).asEagerSingleton(); bind(AtlasTypeDefStore.class).to(AtlasTypeDefGraphStoreV1.class).asEagerSingleton();
bind(AtlasTypeRegistry.class).asEagerSingleton();
//GraphBackedSearchIndexer must be an eager singleton to force the search index creation to happen before //GraphBackedSearchIndexer must be an eager singleton to force the search index creation to happen before
//we try to restore the type system (otherwise we'll end up running queries //we try to restore the type system (otherwise we'll end up running queries
......
...@@ -285,7 +285,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -285,7 +285,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
} else if (isEnumType(atlasType)) { } else if (isEnumType(atlasType)) {
createIndexes(management, propertyName, String.class, isUnique, cardinality, false, isIndexable); createIndexes(management, propertyName, String.class, isUnique, cardinality, false, isIndexable);
} else if (isStructType(atlasType)) { } else if (isStructType(atlasType)) {
AtlasStructDef structDef = typeRegistry.getStructDefByName(attributeDef.getName()); AtlasStructDef structDef = typeRegistry.getStructDefByName(attribTypeName);
updateIndexForTypeDef(management, structDef); updateIndexForTypeDef(management, structDef);
} }
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
......
...@@ -17,18 +17,15 @@ ...@@ -17,18 +17,15 @@
*/ */
package org.apache.atlas.repository.store.graph; package org.apache.atlas.repository.store.graph;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction; import org.apache.atlas.GraphTransaction;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs; import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef.AtlasClassificationDefs; import org.apache.atlas.model.typedef.AtlasClassificationDef.AtlasClassificationDefs;
import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEntityDef;
...@@ -49,7 +46,6 @@ import org.slf4j.Logger; ...@@ -49,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
...@@ -106,9 +102,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -106,9 +102,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid()); ttr.updateGuid(ret.getName(), ret.getGuid());
notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -155,9 +149,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -155,9 +149,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef); AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -171,9 +163,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -171,9 +163,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef); AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -189,9 +179,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -189,9 +179,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEnumDefStore(ttr).deleteByName(name); getEnumDefStore(ttr).deleteByName(name);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -205,9 +193,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -205,9 +193,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEnumDefStore(ttr).deleteByGuid(guid); getEnumDefStore(ttr).deleteByGuid(guid);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -231,9 +217,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -231,9 +217,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid()); ttr.updateGuid(ret.getName(), ret.getGuid());
notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -280,9 +264,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -280,9 +264,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef); AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -296,9 +278,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -296,9 +278,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef); AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -314,9 +294,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -314,9 +294,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getStructDefStore(ttr).deleteByName(name, null); getStructDefStore(ttr).deleteByName(name, null);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -330,9 +308,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -330,9 +308,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getStructDefStore(ttr).deleteByGuid(guid, null); getStructDefStore(ttr).deleteByGuid(guid, null);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -357,9 +333,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -357,9 +333,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid()); ttr.updateGuid(ret.getName(), ret.getGuid());
notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -408,9 +382,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -408,9 +382,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef); AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -425,9 +397,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -425,9 +397,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef); AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -443,9 +413,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -443,9 +413,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getClassificationDefStore(ttr).deleteByName(name, null); getClassificationDefStore(ttr).deleteByName(name, null);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -459,9 +427,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -459,9 +427,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getClassificationDefStore(ttr).deleteByGuid(guid, null); getClassificationDefStore(ttr).deleteByGuid(guid, null);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -485,9 +451,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -485,9 +451,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
ttr.updateGuid(ret.getName(), ret.getGuid()); ttr.updateGuid(ret.getName(), ret.getGuid());
notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -534,9 +498,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -534,9 +498,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef); AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -550,9 +512,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -550,9 +512,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef); AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef);
notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
return ret; return ret;
} }
...@@ -568,9 +528,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -568,9 +528,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEntityDefStore(ttr).deleteByName(name, null); getEntityDefStore(ttr).deleteByName(name, null);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -584,9 +542,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -584,9 +542,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
getEntityDefStore(ttr).deleteByGuid(guid, null); getEntityDefStore(ttr).deleteByGuid(guid, null);
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); updateTypeRegistryPostCommit(ttr);
typeRegistry.commitTransientTypeRegistry(ttr);
} }
@Override @Override
...@@ -689,18 +645,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -689,18 +645,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
} }
} }
List<AtlasBaseTypeDef> createdTypeDefs = new ArrayList<>(); updateTypeRegistryPostCommit(ttr);
createdTypeDefs.addAll(ret.getEnumDefs());
createdTypeDefs.addAll(ret.getStructDefs());
createdTypeDefs.addAll(ret.getClassificationDefs());
createdTypeDefs.addAll(ret.getEntityDefs());
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs();
changedTypeDefs.setCreateTypeDefs(createdTypeDefs);
notifyListeners(changedTypeDefs);
typeRegistry.commitTransientTypeRegistry(ttr);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})", LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
...@@ -759,18 +704,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -759,18 +704,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
} }
} }
List<AtlasBaseTypeDef> updatedTypeDefs = new ArrayList<>(); updateTypeRegistryPostCommit(ttr);
updatedTypeDefs.addAll(ret.getEnumDefs());
updatedTypeDefs.addAll(ret.getStructDefs());
updatedTypeDefs.addAll(ret.getClassificationDefs());
updatedTypeDefs.addAll(ret.getEntityDefs());
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs();
changedTypeDefs.setUpdatedTypeDefs(updatedTypeDefs);
notifyListeners(changedTypeDefs);
typeRegistry.commitTransientTypeRegistry(ttr);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})", LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
...@@ -884,12 +818,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -884,12 +818,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
} }
} }
Iterable<AtlasBaseTypeDef> deleted = Iterables.concat(typesDef.getEnumDefs(), typesDef.getClassificationDefs(), updateTypeRegistryPostCommit(ttr);
typesDef.getClassificationDefs(), typesDef.getEntityDefs());
notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(deleted));
typeRegistry.commitTransientTypeRegistry(ttr);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})", LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})",
...@@ -957,38 +886,50 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ...@@ -957,38 +886,50 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
LOG.info("Not reacting to a Passive state change"); LOG.info("Not reacting to a Passive state change");
} }
private void notifyListeners(TypeDefChangeType type, List<? extends AtlasBaseTypeDef> typeDefs) private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) {
throws AtlasBaseException { new TypeRegistryUpdateHook(ttr);
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs();
switch (type) {
case CREATE:
changedTypeDefs.setCreateTypeDefs(typeDefs);
break;
case UPDATE:
changedTypeDefs.setUpdatedTypeDefs(typeDefs);
break;
case DELETE:
changedTypeDefs.setDeletedTypeDefs(typeDefs);
break;
} }
notifyListeners(changedTypeDefs); private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {
private final AtlasTransientTypeRegistry ttr;
private TypeRegistryUpdateHook(AtlasTransientTypeRegistry ttr) {
super();
this.ttr = ttr;
} }
private void notifyListeners(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { @Override
public void onComplete(boolean isSuccess) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess);
}
if (isSuccess) {
typeRegistry.commitTransientTypeRegistry(ttr);
notifyListeners(ttr);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TypeRegistryUpdateHook.onComplete({})", isSuccess);
}
}
private void notifyListeners(AtlasTransientTypeRegistry ttr) {
if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) { if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) {
ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(ttr.getAddedTypes(),
ttr.getUpdatedTypes(),
ttr.getDeleteedTypes());
for (TypeDefChangeListener changeListener : typeDefChangeListeners) { for (TypeDefChangeListener changeListener : typeDefChangeListeners) {
try { try {
changeListener.onChange(changedTypeDefs); changeListener.onChange(changedTypeDefs);
} catch (AtlasBaseException e) { } catch (Throwable t) {
LOG.error("OnChange failed for listener {}", changeListener.getClass().getName()); LOG.error("OnChange failed for listener {}", changeListener.getClass().getName(), t);
throw e;
} }
} }
} }
} }
private enum TypeDefChangeType {
CREATE, UPDATE, DELETE
} }
} }
...@@ -110,9 +110,9 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -110,9 +110,9 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
} }
public AtlasGraph getAtlasGraph() { return atlasGraph; } AtlasGraph getAtlasGraph() { return atlasGraph; }
public AtlasVertex findTypeVertexByName(String typeName) { AtlasVertex findTypeVertexByName(String typeName) {
Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.TYPENAME_PROPERTY_KEY, typeName) .has(Constants.TYPENAME_PROPERTY_KEY, typeName)
.vertices().iterator(); .vertices().iterator();
...@@ -122,7 +122,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -122,7 +122,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) { AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) {
Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.TYPENAME_PROPERTY_KEY, typeName) .has(Constants.TYPENAME_PROPERTY_KEY, typeName)
.has(TYPE_CATEGORY_PROPERTY_KEY, category) .has(TYPE_CATEGORY_PROPERTY_KEY, category)
...@@ -133,7 +133,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -133,7 +133,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public AtlasVertex findTypeVertexByGuid(String typeGuid) { AtlasVertex findTypeVertexByGuid(String typeGuid) {
Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.GUID_PROPERTY_KEY, typeGuid) .has(Constants.GUID_PROPERTY_KEY, typeGuid)
.vertices().iterator(); .vertices().iterator();
...@@ -143,7 +143,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -143,7 +143,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) { AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) {
Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(Constants.GUID_PROPERTY_KEY, typeGuid) .has(Constants.GUID_PROPERTY_KEY, typeGuid)
.has(TYPE_CATEGORY_PROPERTY_KEY, category) .has(TYPE_CATEGORY_PROPERTY_KEY, category)
...@@ -154,7 +154,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -154,7 +154,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) { Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) {
Iterator<AtlasVertex> ret = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) Iterator<AtlasVertex> ret = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE)
.has(TYPE_CATEGORY_PROPERTY_KEY, category) .has(TYPE_CATEGORY_PROPERTY_KEY, category)
.vertices().iterator(); .vertices().iterator();
...@@ -162,7 +162,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -162,7 +162,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) { AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) {
// Validate all the required checks // Validate all the required checks
Preconditions.checkArgument(StringUtils.isNotBlank(typeDef.getName()), "Type name can't be null/empty"); Preconditions.checkArgument(StringUtils.isNotBlank(typeDef.getName()), "Type name can't be null/empty");
...@@ -203,7 +203,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -203,7 +203,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) { void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) {
if (!isTypeVertex(vertex)) { if (!isTypeVertex(vertex)) {
LOG.warn("updateTypeVertex(): not a type-vertex - {}", vertex); LOG.warn("updateTypeVertex(): not a type-vertex - {}", vertex);
...@@ -223,7 +223,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -223,7 +223,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
markVertexUpdated(vertex); markVertexUpdated(vertex);
} }
public void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException { void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException {
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT); Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT);
for (AtlasEdge edge : edges) { for (AtlasEdge edge : edges) {
...@@ -231,7 +231,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -231,7 +231,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
} }
} }
public void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException { void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException {
Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator(); Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator();
if (inEdges.hasNext()) { if (inEdges.hasNext()) {
...@@ -247,7 +247,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -247,7 +247,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
atlasGraph.removeVertex(vertex); atlasGraph.removeVertex(vertex);
} }
public void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) { void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) {
String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class);
String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class); String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class);
...@@ -274,7 +274,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -274,7 +274,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
} }
} }
public boolean isTypeVertex(AtlasVertex vertex) { boolean isTypeVertex(AtlasVertex vertex) {
String vertexType = vertex.getProperty(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class); String vertexType = vertex.getProperty(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class);
boolean ret = VERTEX_TYPE.equals(vertexType); boolean ret = VERTEX_TYPE.equals(vertexType);
...@@ -282,7 +282,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -282,7 +282,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) { boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) {
boolean ret = false; boolean ret = false;
if (isTypeVertex(vertex)) { if (isTypeVertex(vertex)) {
...@@ -294,7 +294,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -294,7 +294,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) { boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) {
boolean ret = false; boolean ret = false;
if (isTypeVertex(vertex)) { if (isTypeVertex(vertex)) {
...@@ -312,7 +312,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -312,7 +312,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
AtlasEdge ret = null; AtlasEdge ret = null;
Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel);
...@@ -330,13 +330,13 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -330,13 +330,13 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
public AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) {
AtlasEdge ret = atlasGraph.addEdge(outVertex, inVertex, edgeLabel); AtlasEdge ret = atlasGraph.addEdge(outVertex, inVertex, edgeLabel);
return ret; return ret;
} }
public void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory) void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory)
throws AtlasBaseException { throws AtlasBaseException {
Set<String> currentSuperTypes = getSuperTypeNames(vertex); Set<String> currentSuperTypes = getSuperTypeNames(vertex);
...@@ -355,7 +355,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -355,7 +355,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
} }
} }
public Set<String> getSuperTypeNames(AtlasVertex vertex) { Set<String> getSuperTypeNames(AtlasVertex vertex) {
Set<String> ret = new HashSet<>(); Set<String> ret = new HashSet<>();
Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, AtlasGraphUtilsV1.SUPERTYPE_EDGE_LABEL); Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, AtlasGraphUtilsV1.SUPERTYPE_EDGE_LABEL);
...@@ -366,7 +366,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -366,7 +366,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
return ret; return ret;
} }
private TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) { TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) {
TypeCategory ret = null; TypeCategory ret = null;
if (typeDef instanceof AtlasEntityDef) { if (typeDef instanceof AtlasEntityDef) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment