Commit 2c0dc406 by Shwetha GS

ATLAS-690 Read timed out exceptions when tables are imported into Atlas (yhemanth via shwethags)

parent 1e3029bc
...@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -18,6 +18,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-690 Read timed out exceptions when tables are imported into Atlas (yhemanth via shwethags)
ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags) ATLAS-585 NotificationHookConsumer creates new AtlasClient for every message (shwethags)
ATLAS-682 Set HBase root dir to be relative to test target directory for HBaseBasedAuditRepositoryTest (shwethags via yhemanth) ATLAS-682 Set HBase root dir to be relative to test target directory for HBaseBasedAuditRepositoryTest (shwethags via yhemanth)
ATLAS-742 Avoid downloading hbase multiple times (shwethags via yhemanth) ATLAS-742 Avoid downloading hbase multiple times (shwethags via yhemanth)
......
...@@ -27,26 +27,41 @@ import org.apache.atlas.typesystem.types.DataTypes; ...@@ -27,26 +27,41 @@ import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumValue; import org.apache.atlas.typesystem.types.EnumValue;
import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.IDataType;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class FullTextMapper { public class FullTextMapper {
private static final Logger LOG = LoggerFactory.getLogger(FullTextMapper.class);
private final GraphToTypedInstanceMapper graphToTypedInstanceMapper; private final GraphToTypedInstanceMapper graphToTypedInstanceMapper;
private static final GraphHelper graphHelper = GraphHelper.getInstance(); private static final GraphHelper graphHelper = GraphHelper.getInstance();
private static final String FULL_TEXT_DELIMITER = " "; private static final String FULL_TEXT_DELIMITER = " ";
private final Map<String, ITypedReferenceableInstance> instanceCache;
FullTextMapper(GraphToTypedInstanceMapper graphToTypedInstanceMapper) { FullTextMapper(GraphToTypedInstanceMapper graphToTypedInstanceMapper) {
this.graphToTypedInstanceMapper = graphToTypedInstanceMapper; this.graphToTypedInstanceMapper = graphToTypedInstanceMapper;
instanceCache = new HashMap<>();
} }
public String mapRecursive(Vertex instanceVertex, boolean followReferences) throws AtlasException { public String mapRecursive(Vertex instanceVertex, boolean followReferences) throws AtlasException {
String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY); String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
ITypedReferenceableInstance typedReference = ITypedReferenceableInstance typedReference;
graphToTypedInstanceMapper.mapGraphToTypedInstance(guid, instanceVertex); if (instanceCache.containsKey(guid)) {
typedReference = instanceCache.get(guid);
LOG.debug("Cache hit: guid = {}, entityId = {}", guid, typedReference.getId()._getId());
} else {
typedReference =
graphToTypedInstanceMapper.mapGraphToTypedInstance(guid, instanceVertex);
instanceCache.put(guid, typedReference);
LOG.debug("Cache miss: guid = {}, entityId = {}", guid, typedReference.getId().getId());
}
String fullText = forInstance(typedReference, followReferences); String fullText = forInstance(typedReference, followReferences);
StringBuilder fullTextBuilder = StringBuilder fullTextBuilder =
new StringBuilder(typedReference.getTypeName()).append(FULL_TEXT_DELIMITER).append(fullText); new StringBuilder(typedReference.getTypeName()).append(FULL_TEXT_DELIMITER).append(fullText);
......
...@@ -390,12 +390,20 @@ public final class GraphHelper { ...@@ -390,12 +390,20 @@ public final class GraphHelper {
} }
public static String string(Vertex vertex) { public static String string(Vertex vertex) {
return String.format("vertex[id=%s type=%s guid=%s]", vertex.getId().toString(), getTypeName(vertex), if (LOG.isDebugEnabled()) {
getIdFromVertex(vertex)); return String.format("vertex[id=%s type=%s guid=%s]", vertex.getId().toString(), getTypeName(vertex),
getIdFromVertex(vertex));
} else {
return String.format("vertex[id=%s]", vertex.getId().toString());
}
} }
public static String string(Edge edge) { public static String string(Edge edge) {
return String.format("edge[id=%s label=%s from %s -> to %s]", edge.getId().toString(), edge.getLabel(), if (LOG.isDebugEnabled()) {
string(edge.getVertex(Direction.OUT)), string(edge.getVertex(Direction.IN))); return String.format("edge[id=%s label=%s from %s -> to %s]", edge.getId().toString(), edge.getLabel(),
string(edge.getVertex(Direction.OUT)), string(edge.getVertex(Direction.IN)));
} else {
return String.format("edge[id=%s]", edge.getId().toString());
}
} }
} }
\ No newline at end of file
...@@ -96,11 +96,11 @@ public final class TypedInstanceToGraphMapper { ...@@ -96,11 +96,11 @@ public final class TypedInstanceToGraphMapper {
createVerticesAndDiscoverInstances(newInstances); createVerticesAndDiscoverInstances(newInstances);
List<ITypedReferenceableInstance> entitiesToCreate = instancesPair.left; List<ITypedReferenceableInstance> entitiesToCreate = instancesPair.left;
List<ITypedReferenceableInstance> entitiesToUpdate = instancesPair.right; List<ITypedReferenceableInstance> entitiesToUpdate = instancesPair.right;
FullTextMapper fulltextMapper = new FullTextMapper(graphToTypedInstanceMapper);
switch (operation) { switch (operation) {
case CREATE: case CREATE:
List<String> ids = addOrUpdateAttributesAndTraits(operation, entitiesToCreate); List<String> ids = addOrUpdateAttributesAndTraits(operation, entitiesToCreate);
addFullTextProperty(entitiesToCreate); addFullTextProperty(entitiesToCreate, fulltextMapper);
requestContext.recordCreatedEntities(ids); requestContext.recordCreatedEntities(ids);
break; break;
...@@ -111,8 +111,8 @@ public final class TypedInstanceToGraphMapper { ...@@ -111,8 +111,8 @@ public final class TypedInstanceToGraphMapper {
ids = addOrUpdateAttributesAndTraits(operation, entitiesToUpdate); ids = addOrUpdateAttributesAndTraits(operation, entitiesToUpdate);
requestContext.recordUpdatedEntities(ids); requestContext.recordUpdatedEntities(ids);
addFullTextProperty(entitiesToCreate); addFullTextProperty(entitiesToCreate, fulltextMapper);
addFullTextProperty(entitiesToUpdate); addFullTextProperty(entitiesToUpdate, fulltextMapper);
break; break;
default: default:
...@@ -289,8 +289,7 @@ public final class TypedInstanceToGraphMapper { ...@@ -289,8 +289,7 @@ public final class TypedInstanceToGraphMapper {
return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate); return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate);
} }
private void addFullTextProperty(List<ITypedReferenceableInstance> instances) throws AtlasException { private void addFullTextProperty(List<ITypedReferenceableInstance> instances, FullTextMapper fulltextMapper) throws AtlasException {
FullTextMapper fulltextMapper = new FullTextMapper(graphToTypedInstanceMapper);
for (ITypedReferenceableInstance typedInstance : instances) { // Traverse for (ITypedReferenceableInstance typedInstance : instances) { // Traverse
Vertex instanceVertex = getClassVertex(typedInstance); Vertex instanceVertex = getClassVertex(typedInstance);
String fullText = fulltextMapper.mapRecursive(instanceVertex, true); String fullText = fulltextMapper.mapRecursive(instanceVertex, true);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.RepositoryException;
import org.apache.commons.configuration.Configuration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class GraphBackedSearchIndexerMockTest {
@Mock
private Configuration configuration;
@Mock
private GraphProvider<TitanGraph> graphProvider;
@Mock
private TitanGraph titanGraph;
@Mock
private TitanManagement titanManagement;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws IndexException, RepositoryException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(graphProvider.get()).thenReturn(titanGraph);
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
}
@Test
public void testSearchIndicesAreNotInitializedOnConstructionWhenHAIsEnabled() throws IndexException, RepositoryException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(graphProvider.get()).thenReturn(titanGraph);
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
verifyZeroInteractions(titanManagement);
}
@Test
public void testIndicesAreReinitializedWhenServerBecomesActive() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(graphProvider.get()).thenReturn(titanGraph);
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration);
graphBackedSearchIndexer.instanceIsActive();
verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY);
}
}
...@@ -18,77 +18,140 @@ ...@@ -18,77 +18,140 @@
package org.apache.atlas.repository.graph; package org.apache.atlas.repository.graph;
import com.google.inject.Inject;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.thinkaurelius.titan.core.schema.TitanManagement; import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException; import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.commons.configuration.Configuration; import org.apache.atlas.typesystem.types.EnumType;
import org.mockito.Mock; import org.apache.atlas.typesystem.types.EnumValue;
import org.mockito.MockitoAnnotations; import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.testng.annotations.BeforeMethod; import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.mockito.Mockito.verify; import java.util.Arrays;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class GraphBackedSearchIndexerTest { import static junit.framework.Assert.assertTrue;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@Mock @Guice(modules = RepositoryMetadataModule.class)
private Configuration configuration; public class GraphBackedSearchIndexerTest {
@Mock @Inject
private GraphProvider<TitanGraph> graphProvider; private GraphProvider<TitanGraph> graphProvider;
@Mock @Inject
private TitanGraph titanGraph; private GraphBackedSearchIndexer graphBackedSearchIndexer;
@Mock
private TitanManagement titanManagement;
@BeforeMethod @Test
public void setup() { public void verifySystemMixedIndexes() {
MockitoAnnotations.initMocks(this); TitanGraph titanGraph = graphProvider.get();
TitanManagement managementSystem = titanGraph.getManagementSystem();
TitanGraphIndex vertexIndex = managementSystem.getGraphIndex(Constants.VERTEX_INDEX);
assertNotNull(vertexIndex);
assertTrue(vertexIndex.isMixedIndex());
assertTrue(Vertex.class.isAssignableFrom(vertexIndex.getIndexedElement()));
TitanGraphIndex edgeIndex = managementSystem.getGraphIndex(Constants.EDGE_INDEX);
assertNotNull(edgeIndex);
assertTrue(edgeIndex.isMixedIndex());
assertTrue(Edge.class.isAssignableFrom(edgeIndex.getIndexedElement()));
} }
@Test @Test
public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws IndexException, RepositoryException { public void verifySystemCompositeIndexes() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); TitanGraph titanGraph = graphProvider.get();
when(graphProvider.get()).thenReturn(titanGraph); TitanManagement managementSystem = titanGraph.getManagementSystem();
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); verifySystemCompositeIndex(managementSystem, Constants.GUID_PROPERTY_KEY, true);
verifyVertexIndexContains(managementSystem, Constants.GUID_PROPERTY_KEY);
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); verifySystemCompositeIndex(managementSystem, Constants.ENTITY_TYPE_PROPERTY_KEY, false);
verifyVertexIndexContains(managementSystem, Constants.ENTITY_TYPE_PROPERTY_KEY);
verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); verifySystemCompositeIndex(managementSystem, Constants.SUPER_TYPES_PROPERTY_KEY, false);
verifyVertexIndexContains(managementSystem, Constants.SUPER_TYPES_PROPERTY_KEY);
verifySystemCompositeIndex(managementSystem, Constants.TRAIT_NAMES_PROPERTY_KEY, false);
verifyVertexIndexContains(managementSystem, Constants.TRAIT_NAMES_PROPERTY_KEY);
} }
@Test @Test
public void testSearchIndicesAreNotInitializedOnConstructionWhenHAIsEnabled() throws IndexException, RepositoryException { public void verifyFullTextIndex() {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); TitanGraph titanGraph = graphProvider.get();
when(graphProvider.get()).thenReturn(titanGraph); TitanManagement managementSystem = titanGraph.getManagementSystem();
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); TitanGraphIndex fullTextIndex = managementSystem.getGraphIndex(Constants.FULLTEXT_INDEX);
assertTrue(fullTextIndex.isMixedIndex());
Arrays.asList(fullTextIndex.getFieldKeys()).contains(
managementSystem.getPropertyKey(Constants.ENTITY_TEXT_PROPERTY_KEY));
}
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); @Test
public void verifyTypeStoreIndexes() {
TitanGraph titanGraph = graphProvider.get();
TitanManagement managementSystem = titanGraph.getManagementSystem();
verifyZeroInteractions(titanManagement); verifySystemCompositeIndex(managementSystem, Constants.TYPENAME_PROPERTY_KEY, true);
verifyVertexIndexContains(managementSystem, Constants.TYPENAME_PROPERTY_KEY);
verifySystemCompositeIndex(managementSystem, Constants.VERTEX_TYPE_PROPERTY_KEY, false);
verifyVertexIndexContains(managementSystem, Constants.VERTEX_TYPE_PROPERTY_KEY);
} }
@Test @Test
public void testIndicesAreReinitializedWhenServerBecomesActive() throws AtlasException { public void verifyUserDefinedTypeIndex() throws AtlasException {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); TitanGraph titanGraph = graphProvider.get();
when(graphProvider.get()).thenReturn(titanGraph); TitanManagement managementSystem = titanGraph.getManagementSystem();
when(titanGraph.getManagementSystem()).thenReturn(titanManagement);
when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); TypeSystem typeSystem = TypeSystem.getInstance();
String enumName = "randomEnum" + RandomStringUtils.randomAlphanumeric(10);
EnumType managedType = typeSystem.defineEnumType(enumName, new EnumValue("randomEnumValue", 0));
HierarchicalTypeDefinition<ClassType> databaseTypeDefinition =
createClassTypeDef("Database", "Database type description", null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createUniqueRequiredAttrDef("managedType", managedType));
ClassType databaseType = typeSystem.defineClassType(databaseTypeDefinition);
graphBackedSearchIndexer.onAdd(Arrays.asList(databaseType));
GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); verifySystemCompositeIndex(managementSystem, "Database.name", false);
graphBackedSearchIndexer.instanceIsActive(); verifyVertexIndexContains(managementSystem, "Database.name");
verifySystemCompositeIndex(managementSystem, "Database.managedType", false);
verifyVertexIndexContains(managementSystem, "Database.managedType");
}
private void verifyVertexIndexContains(TitanManagement managementSystem, String indexName) {
TitanGraphIndex vertexIndex = managementSystem.getGraphIndex(Constants.VERTEX_INDEX);
PropertyKey[] fieldKeys = vertexIndex.getFieldKeys();
Arrays.asList(fieldKeys).contains(managementSystem.getPropertyKey(indexName));
}
verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); private void verifySystemCompositeIndex(TitanManagement managementSystem, String indexName, boolean isUnique) {
TitanGraphIndex guidIndex = managementSystem.getGraphIndex(indexName);
assertNotNull(guidIndex);
assertTrue(guidIndex.isCompositeIndex());
if (isUnique) {
assertTrue(guidIndex.isUnique());
} else {
assertFalse(guidIndex.isUnique());
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment