Commit 2f6d7b24 by Madhan Neethiraj

ATLAS-2085: HA initialization fix

parent 64c9bde6
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"classificationDefs": [
{
"name": "TaxonomyTerm",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "atlas.taxonomy",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
],
"entityDefs": [
{
"name": "Referenceable",
......
......@@ -26,7 +26,6 @@ import org.apache.atlas.catalog.definition.ResourceDefinition;
import org.apache.atlas.catalog.exception.CatalogRuntimeException;
import org.apache.atlas.catalog.exception.ResourceAlreadyExistsException;
import org.apache.atlas.catalog.exception.ResourceNotFoundException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
......@@ -36,7 +35,6 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.converters.TypeConverterUtil;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
......@@ -70,34 +68,6 @@ public class DefaultTypeSystem implements AtlasTypeSystem {
public DefaultTypeSystem(MetadataService metadataService, AtlasTypeDefStore typeDefStore) throws AtlasBaseException {
this.metadataService = metadataService;
this.typeDefStore = typeDefStore;
//Create namespace
createSuperTypes();
}
@InterfaceAudience.Private
private void createSuperTypes() throws AtlasBaseException {
AtlasClassificationDef termClassification = AtlasTypeUtil.createTraitTypeDef(TaxonomyResourceProvider.TAXONOMY_TERM_TYPE, TaxonomyResourceProvider.TAXONOMY_TERM_TYPE,
ImmutableSet.<String>of(), AtlasTypeUtil.createOptionalAttrDef(TaxonomyResourceProvider.NAMESPACE_ATTRIBUTE_NAME, "string"));
createTraitType(termClassification);
}
private void createTraitType(AtlasClassificationDef classificationDef) throws AtlasBaseException {
try {
typeDefStore.getClassificationDefByName(classificationDef.getName());
} catch (AtlasBaseException tne) {
//Type not found . Create
if (tne.getAtlasErrorCode() == AtlasErrorCode.TYPE_NAME_NOT_FOUND) {
AtlasTypesDef typesDef = new AtlasTypesDef(ImmutableList.<AtlasEnumDef>of(), ImmutableList.<AtlasStructDef>of(),
ImmutableList.of(classificationDef),
ImmutableList.<AtlasEntityDef>of());
typeDefStore.createTypesDef(typesDef);
} else {
throw tne;
}
}
}
@Override
......
......@@ -132,19 +132,20 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
AtlasGraphManagement management = graph.getManagementSystem();
try {
if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) {
LOG.info("Global indexes already exist for graph");
management.commit();
LOG.info("Creating indexes for graph.");
return;
if (management.getGraphIndex(Constants.VERTEX_INDEX) == null) {
management.createVertexIndex(Constants.VERTEX_INDEX, Constants.BACKING_INDEX, Collections.<AtlasPropertyKey>emptyList());
LOG.info("Created index {}", Constants.VERTEX_INDEX);
}
/* This is called only once, which is the first time Atlas types are made indexable .*/
LOG.info("Indexes do not exist, Creating indexes for graph.");
if (management.getGraphIndex(Constants.EDGE_INDEX) == null) {
management.createEdgeIndex(Constants.EDGE_INDEX, Constants.BACKING_INDEX);
LOG.info("Created index {}", Constants.EDGE_INDEX);
}
management.createVertexIndex(Constants.VERTEX_INDEX, Constants.BACKING_INDEX, Collections.<AtlasPropertyKey>emptyList());
management.createEdgeIndex(Constants.EDGE_INDEX, Constants.BACKING_INDEX);
// create a composite index for guid as its unique
createIndexes(management, Constants.GUID_PROPERTY_KEY, String.class, true,
......@@ -200,11 +201,14 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
}
private void createFullTextIndex(AtlasGraphManagement management) {
AtlasPropertyKey fullText =
management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY, String.class, AtlasCardinality.SINGLE);
if (!management.containsPropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY)) {
AtlasPropertyKey fullText =
management.makePropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY, String.class, AtlasCardinality.SINGLE);
management.createFullTextIndex(Constants.FULLTEXT_INDEX, fullText, Constants.BACKING_INDEX);
management.createFullTextIndex(Constants.FULLTEXT_INDEX, fullText, Constants.BACKING_INDEX);
LOG.info("Created index {}", Constants.ENTITY_TEXT_PROPERTY_KEY);
}
}
private void createTypeStoreIndexes(AtlasGraphManagement management) {
......@@ -516,17 +520,21 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
propertyKey = management.makePropertyKey(propertyName, propertyClass, cardinality);
updateVertexIndex(management, propertyName, propertyClass, cardinality, propertyKey);
}
if (createCompositeForAttribute) {
createExactMatchIndex(management, propertyClass, propertyKey, isUnique);
} else if (createCompositeWithTypeandSuperTypes) {
// Index with typename since typename+property key queries need to
// speed up
createExactMatchIndexWithTypeName(management, propertyClass, propertyKey);
createExactMatchIndexWithSuperTypeName(management, propertyClass, propertyKey);
if (propertyKey != null) {
if (createCompositeForAttribute) {
createExactMatchIndex(management, propertyClass, propertyKey, isUnique);
} else if (createCompositeWithTypeandSuperTypes) {
// Index with typename since typename+property key queries need to
// speed up
createExactMatchIndexWithTypeName(management, propertyClass, propertyKey);
createExactMatchIndexWithSuperTypeName(management, propertyClass, propertyKey);
}
} else {
LOG.warn("Index not created for {}: propertyKey is null", propertyName);
}
return propertyKey;
}
......@@ -542,9 +550,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
AtlasGraphIndex existingIndex = management.getGraphIndex(propertyName);
if (existingIndex == null) {
management.createExactMatchIndex(propertyName, enforceUniqueness, Collections.singletonList(propertyKey));
}
LOG.info("Created composite index for property {} of type {}; isUnique={} ", propertyName, propertyClass.getName(), enforceUniqueness);
LOG.info("Created composite index for property {} of type {}; isUnique={} ", propertyName, propertyClass.getName(), enforceUniqueness);
}
}
......@@ -578,7 +586,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
AtlasGraphIndex existingIndex = management.getGraphIndex(indexName);
if (existingIndex == null) {
List<AtlasPropertyKey> keys = new ArrayList<>(2);
keys.add(propertyKey);
keys.add(typePropertyKey);
......
......@@ -18,7 +18,10 @@
package org.apache.atlas.repository.store.bootstrap;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
......@@ -33,6 +36,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.annotate.JsonAutoDetect;
......@@ -63,27 +67,41 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL
* Class that handles initial loading of models and patches into typedef store
*/
@Service
public class AtlasTypeDefStoreInitializer {
public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
private final AtlasTypeDefStore atlasTypeDefStore;
private final AtlasTypeRegistry atlasTypeRegistry;
private final Configuration conf;
@Inject
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore atlasTypeDefStore, AtlasTypeRegistry atlasTypeRegistry) {
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore atlasTypeDefStore, AtlasTypeRegistry atlasTypeRegistry, Configuration conf) {
this.atlasTypeDefStore = atlasTypeDefStore;
this.atlasTypeRegistry = atlasTypeRegistry;
this.conf = conf;
}
@PostConstruct
public void init() {
String atlasHomeDir = System.getProperty("atlas.home");
String typesDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
public void init() throws AtlasBaseException {
LOG.info("==> AtlasTypeDefStoreInitializer.init()");
if (!HAConfiguration.isHAEnabled(conf)) {
atlasTypeDefStore.init();
loadBootstrapTypeDefs();
} else {
LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation");
}
initializeStore(typesDirName);
LOG.info("<== AtlasTypeDefStoreInitializer.init()");
}
private void initializeStore(String typesDirName) {
private void loadBootstrapTypeDefs() {
LOG.info("==> AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
String atlasHomeDir = System.getProperty("atlas.home");
String typesDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
File typesDir = new File(typesDirName);
File[] typeDefFiles = typesDir.exists() ? typesDir.listFiles() : null;
......@@ -128,6 +146,8 @@ public class AtlasTypeDefStoreInitializer {
}
applyTypePatches(typesDirName);
LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
}
public static AtlasTypesDef getTypesToCreate(AtlasTypesDef typesDef, AtlasTypeRegistry typeRegistry) {
......@@ -260,6 +280,28 @@ public class AtlasTypeDefStoreInitializer {
return typesToUpdate;
}
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsActive()");
try {
atlasTypeDefStore.init();
loadBootstrapTypeDefs();
} catch (AtlasBaseException e) {
LOG.error("Failed to init after becoming active", e);
}
LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsActive()");
}
@Override
public void instanceIsPassive() throws AtlasException {
LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsPassive()");
LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()");
}
private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef) {
boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef);
......
......@@ -18,11 +18,9 @@
package org.apache.atlas.repository.store.graph;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.SearchFilter;
......@@ -52,7 +50,7 @@ import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreIniti
/**
* Abstract class for graph persistence store for TypeDef
*/
public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, ActiveStateChangeHandler {
public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class);
......@@ -79,6 +77,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
@Override
public void init() throws AtlasBaseException {
LOG.info("==> AtlasTypeDefGraphStore.init()");
AtlasTransientTypeRegistry ttr = null;
boolean commitUpdates = false;
......@@ -100,6 +100,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
commitUpdates = true;
} finally {
typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates);
LOG.info("<== AtlasTypeDefGraphStore.init()");
}
}
......@@ -651,20 +653,6 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
return getTypeDefFromType(type);
}
@Override
public void instanceIsActive() throws AtlasException {
try {
init();
} catch (AtlasBaseException e) {
LOG.error("Failed to init after becoming active", e);
}
}
@Override
public void instanceIsPassive() throws AtlasException {
LOG.info("Not reacting to a Passive state change");
}
private AtlasBaseTypeDef getTypeDefFromType(AtlasType type) throws AtlasBaseException {
AtlasBaseTypeDef ret;
switch (type.getTypeCategory()) {
......
......@@ -77,18 +77,6 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
super(typeRegistry, typeDefChangeListeners);
this.atlasGraph = atlasGraph;
LOG.debug("==> AtlasTypeDefGraphStoreV1()");
try {
init();
// commit/close the transaction after successful type store initialization.
atlasGraph.commit();
} catch (AtlasBaseException excp) {
atlasGraph.rollback();
LOG.error("failed to initialize types from graph store", excp);
}
LOG.debug("<== AtlasTypeDefGraphStoreV1()");
}
......@@ -121,11 +109,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
@Override
@PostConstruct
public void init() throws AtlasBaseException {
LOG.debug("==> AtlasTypeDefGraphStoreV1.init()");
LOG.info("==> AtlasTypeDefGraphStoreV1.init()");
super.init();
LOG.debug("<== AtlasTypeDefGraphStoreV1.init()");
LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
}
AtlasGraph getAtlasGraph() { return atlasGraph; }
......
......@@ -32,7 +32,6 @@ import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
......@@ -59,11 +58,8 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider {
public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws IndexException, RepositoryException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(graph.getManagementSystem()).thenReturn(management);
when(management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(this, configuration, typeRegistry);
verify(management).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
}
@Test
......@@ -75,7 +71,6 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider {
new GraphBackedSearchIndexer(this, configuration, typeRegistry);
verifyZeroInteractions(management);
}
@Test
......@@ -83,12 +78,9 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(graph.getManagementSystem()).thenReturn(management);
when(management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(this, configuration, typeRegistry);
graphBackedSearchIndexer.instanceIsActive();
verify(management).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment