Commit 96a11675 by Shwetha GS

ATLAS-1026 StoreBackedTypeCache issues (dkantor via shwethags)

parent 1c9b8b41
...@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ...@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES: ALL CHANGES:
ATLAS-1026 StoreBackedTypeCache issues (dkantor via shwethags)
ATLAS-861 1 table out of 50,000 tables is left unimported throwing exception during deserialization (sumasai via shwethags) ATLAS-861 1 table out of 50,000 tables is left unimported throwing exception during deserialization (sumasai via shwethags)
ATLAS-1065 UI: Full text search view same as DSL's (kevalbhat18 via shwethags) ATLAS-1065 UI: Full text search view same as DSL's (kevalbhat18 via shwethags)
ATLAS-1066 Falcon fails to post entity to Atlas due to kafka exception (mneethiraj via shwethags) ATLAS-1066 Falcon fails to post entity to Atlas due to kafka exception (mneethiraj via shwethags)
......
...@@ -106,12 +106,6 @@ public class StoreBackedTypeCache extends DefaultTypeCache { ...@@ -106,12 +106,6 @@ public class StoreBackedTypeCache extends DefaultTypeCache {
} }
} }
@Override
public boolean has(String typeName) throws AtlasException {
return (get(typeName) != null);
}
/** /**
* Checks whether the specified type is cached in memory and does *not* * Checks whether the specified type is cached in memory and does *not*
* access the type store. Used for testing. * access the type store. Used for testing.
...@@ -124,21 +118,12 @@ public class StoreBackedTypeCache extends DefaultTypeCache { ...@@ -124,21 +118,12 @@ public class StoreBackedTypeCache extends DefaultTypeCache {
} }
/** /**
* Gets the requested type from the cache. * Check the type store for the requested type.
* This implementation will check the type store if the type is * If found in the type store, the type and any required super and attribute types
* not already cached. If found in the type store, the type and * are loaded from the type store, and added to the cache.
* any required super and attribute types are loaded from the type store, and
* added to the cache.
*
* @see org.apache.atlas.typesystem.types.cache.DefaultTypeCache#get(java.lang.String)
*/ */
@Override @Override
public IDataType get(String typeName) throws AtlasException { public IDataType onTypeFault(String typeName) throws AtlasException {
IDataType type = super.get(typeName);
if (type != null) {
return type;
}
// Type is not cached - check the type store. // Type is not cached - check the type store.
// Any super and attribute types needed by the requested type // Any super and attribute types needed by the requested type
......
...@@ -49,7 +49,6 @@ import java.util.Map; ...@@ -49,7 +49,6 @@ import java.util.Map;
* Unit test for {@link StoreBackedTypeCache} * Unit test for {@link StoreBackedTypeCache}
*/ */
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
@Test(enabled = false)
public class StoreBackedTypeCacheTest { public class StoreBackedTypeCacheTest {
@Inject @Inject
...@@ -106,6 +105,7 @@ public class StoreBackedTypeCacheTest { ...@@ -106,6 +105,7 @@ public class StoreBackedTypeCacheTest {
ts.reset(); ts.reset();
} }
@Test
public void testGetClassType() throws Exception { public void testGetClassType() throws Exception {
for (Map.Entry<String, ClassType> typeEntry : classTypesToTest.entrySet()) { for (Map.Entry<String, ClassType> typeEntry : classTypesToTest.entrySet()) {
// Not cached yet // Not cached yet
...@@ -122,20 +122,7 @@ public class StoreBackedTypeCacheTest { ...@@ -122,20 +122,7 @@ public class StoreBackedTypeCacheTest {
} }
} }
public void testHasClassType() throws Exception { @Test
for (Map.Entry<String, ClassType> typeEntry : classTypesToTest.entrySet()) {
// Not cached yet
Assert.assertFalse(typeCache.isCachedInMemory(typeEntry.getKey()));
// Calling has() should result in type and its dependencies
// loaded from the type store and added to the cache.
Assert.assertTrue(typeCache.has(typeEntry.getKey()));
// Verify the type is now cached in memory.
Assert.assertTrue(typeCache.isCachedInMemory(typeEntry.getKey()));
}
}
public void testGetTraitType() throws Exception { public void testGetTraitType() throws Exception {
ImmutableList<String> traitNames = ts.getTypeNamesByCategory(TypeCategory.TRAIT); ImmutableList<String> traitNames = ts.getTypeNamesByCategory(TypeCategory.TRAIT);
for (String traitTypeName : traitNames) { for (String traitTypeName : traitNames) {
...@@ -153,21 +140,6 @@ public class StoreBackedTypeCacheTest { ...@@ -153,21 +140,6 @@ public class StoreBackedTypeCacheTest {
} }
} }
public void testHasTraitType() throws Exception {
ImmutableList<String> traitNames = ts.getTypeNamesByCategory(TypeCategory.TRAIT);
for (String traitTypeName : traitNames) {
// Not cached yet
Assert.assertFalse(typeCache.isCachedInMemory(traitTypeName));
// Calling has() should result in type and its dependencies
// loaded from the type store and added to the cache.
Assert.assertTrue(typeCache.has(traitTypeName));
// Verify the type is now cached.
Assert.assertTrue(typeCache.isCachedInMemory(traitTypeName));
}
}
private <T extends HierarchicalType> void verifyHierarchicalType(T dataType, T expectedDataType) throws AtlasException { private <T extends HierarchicalType> void verifyHierarchicalType(T dataType, T expectedDataType) throws AtlasException {
Assert.assertEquals(dataType.numFields, expectedDataType.numFields); Assert.assertEquals(dataType.numFields, expectedDataType.numFields);
Assert.assertEquals(dataType.immediateAttrs.size(), expectedDataType.immediateAttrs.size()); Assert.assertEquals(dataType.immediateAttrs.size(), expectedDataType.immediateAttrs.size());
......
...@@ -23,27 +23,37 @@ import org.apache.atlas.repository.typestore.ITypeStore; ...@@ -23,27 +23,37 @@ import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.repository.typestore.StoreBackedTypeCache; import org.apache.atlas.repository.typestore.StoreBackedTypeCache;
import org.apache.atlas.repository.typestore.StoreBackedTypeCacheTestModule; import org.apache.atlas.repository.typestore.StoreBackedTypeCacheTestModule;
import org.apache.atlas.services.MetadataService; import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUpdateException;
import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.atlas.typesystem.types.cache.TypeCache;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup; import com.thinkaurelius.titan.core.util.TitanCleanup;
/** /**
* Verify MetadataService type lookup triggers StoreBackedTypeCache to load type from the store. * Verify MetadataService type operations trigger StoreBackedTypeCache to load non-cached types from the store.
* StoreBackedTypeCacheTestModule Guice module uses Atlas configuration * StoreBackedTypeCacheTestModule Guice module sets Atlas configuration
* which has type cache implementation class set to {@link StoreBackedTypeCache}. * to use {@link StoreBackedTypeCache} as the TypeCache implementation class.
*/ */
@Guice(modules = StoreBackedTypeCacheTestModule.class) @Guice(modules = StoreBackedTypeCacheTestModule.class)
@Test(enabled = false)
public class StoreBackedTypeCacheMetadataServiceTest public class StoreBackedTypeCacheMetadataServiceTest
{ {
@Inject @Inject
...@@ -55,13 +65,18 @@ public class StoreBackedTypeCacheMetadataServiceTest ...@@ -55,13 +65,18 @@ public class StoreBackedTypeCacheMetadataServiceTest
@Inject @Inject
TypeCache typeCache; TypeCache typeCache;
private StoreBackedTypeCache storeBackedTypeCache;
@Inject @Inject
private GraphProvider<TitanGraph> graphProvider; private GraphProvider<TitanGraph> graphProvider;
private TypeSystem ts; private TypeSystem ts;
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void oneTimeSetup() throws Exception {
Assert.assertTrue(typeCache instanceof StoreBackedTypeCache);
storeBackedTypeCache = (StoreBackedTypeCache) typeCache;
ts = TypeSystem.getInstance(); ts = TypeSystem.getInstance();
ts.reset(); ts.reset();
...@@ -70,6 +85,10 @@ public class StoreBackedTypeCacheMetadataServiceTest ...@@ -70,6 +85,10 @@ public class StoreBackedTypeCacheMetadataServiceTest
TestUtils.createHiveTypes(ts); TestUtils.createHiveTypes(ts);
ImmutableList<String> typeNames = ts.getTypeNames(); ImmutableList<String> typeNames = ts.getTypeNames();
typeStore.store(ts, typeNames); typeStore.store(ts, typeNames);
}
@BeforeMethod
public void setUp() throws Exception {
ts.reset(); ts.reset();
} }
...@@ -91,16 +110,51 @@ public class StoreBackedTypeCacheMetadataServiceTest ...@@ -91,16 +110,51 @@ public class StoreBackedTypeCacheMetadataServiceTest
} }
} }
public void testIt() throws Exception { @Test
Assert.assertTrue(typeCache instanceof StoreBackedTypeCache); public void testGetTypeDefinition() throws Exception {
StoreBackedTypeCache storeBackedCache = (StoreBackedTypeCache) typeCache;
// Cache should be empty // Cache should be empty
Assert.assertFalse(storeBackedCache.isCachedInMemory("Manager")); Assert.assertFalse(storeBackedTypeCache.isCachedInMemory("Manager"));
// Type lookup on MetadataService should cause Manager type to be loaded from the type store // Type lookup on MetadataService should cause Manager type to be loaded from the type store
// and cached. // and cached.
Assert.assertNotNull(metadataService.getTypeDefinition("Manager")); Assert.assertNotNull(metadataService.getTypeDefinition("Manager"));
Assert.assertTrue(storeBackedCache.isCachedInMemory("Manager")); Assert.assertTrue(storeBackedTypeCache.isCachedInMemory("Manager"));
}
@Test
public void testValidUpdateType() throws Exception {
// Cache should be empty
Assert.assertFalse(storeBackedTypeCache.isCachedInMemory(TestUtils.TABLE_TYPE));
TypesDef typesDef = TestUtils.defineHiveTypes();
String json = TypesSerialization.toJson(typesDef);
// Update types with same definition, which should succeed.
metadataService.updateType(json);
// hive_table type should now be cached.
Assert.assertTrue(storeBackedTypeCache.isCachedInMemory(TestUtils.TABLE_TYPE));
}
@Test
public void testInvalidUpdateType() throws Exception {
// Cache should be empty
Assert.assertFalse(storeBackedTypeCache.isCachedInMemory(TestUtils.TABLE_TYPE));
HierarchicalTypeDefinition<ClassType> classTypeDef = TypesUtil.createClassTypeDef(TestUtils.TABLE_TYPE, ImmutableSet.<String>of(),
new AttributeDefinition("attr1", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null));
String json = TypesSerialization.toJson(classTypeDef, false);
// Try to update the type with disallowed changes. Should fail with TypeUpdateException.
try {
metadataService.updateType(json);
Assert.fail(TypeUpdateException.class.getSimpleName() + " was expected but none thrown");
}
catch(TypeUpdateException e) {
// good
}
// hive_table type should now be cached.
Assert.assertTrue(storeBackedTypeCache.isCachedInMemory(TestUtils.TABLE_TYPE));
} }
} }
...@@ -148,7 +148,6 @@ public class TypeSystem { ...@@ -148,7 +148,6 @@ public class TypeSystem {
} }
public <T> T getDataType(Class<T> cls, String name) throws AtlasException { public <T> T getDataType(Class<T> cls, String name) throws AtlasException {
if (isCoreType(name)) { if (isCoreType(name)) {
return cls.cast(coreTypes.get(name)); return cls.cast(coreTypes.get(name));
} }
...@@ -180,6 +179,14 @@ public class TypeSystem { ...@@ -180,6 +179,14 @@ public class TypeSystem {
return cls.cast(dT); return cls.cast(dT);
} }
/*
* Invoke cache callback to possibly obtain type from other storage.
*/
IDataType dT = typeCache.onTypeFault(name);
if (dT != null) {
return cls.cast(dT);
}
throw new TypeNotFoundException(String.format("Unknown datatype: %s", name)); throw new TypeNotFoundException(String.format("Unknown datatype: %s", name));
} }
...@@ -599,8 +606,13 @@ public class TypeSystem { ...@@ -599,8 +606,13 @@ public class TypeSystem {
private void validateUpdateIsPossible() throws TypeUpdateException, AtlasException { private void validateUpdateIsPossible() throws TypeUpdateException, AtlasException {
//If the type is modified, validate that update can be done //If the type is modified, validate that update can be done
for (IDataType newType : transientTypes.values()) { for (IDataType newType : transientTypes.values()) {
if (TypeSystem.this.isRegistered(newType.getName())) { IDataType oldType = null;
IDataType oldType = TypeSystem.this.typeCache.get(newType.getName()); try {
oldType = TypeSystem.this.getDataType(IDataType.class, newType.getName());
} catch (TypeNotFoundException e) {
LOG.debug("No existing type %s found - update OK", newType.getName());
}
if (oldType != null) {
oldType.validateUpdate(newType); oldType.validateUpdate(newType);
} }
} }
......
...@@ -288,4 +288,10 @@ public class DefaultTypeCache implements TypeCache { ...@@ -288,4 +288,10 @@ public class DefaultTypeCache implements TypeCache {
types_.clear(); types_.clear();
} }
@Override
public IDataType onTypeFault(String typeName) throws AtlasException {
return null;
}
} }
...@@ -21,6 +21,7 @@ package org.apache.atlas.typesystem.types.cache; ...@@ -21,6 +21,7 @@ package org.apache.atlas.typesystem.types.cache;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
...@@ -137,4 +138,16 @@ public interface TypeCache { ...@@ -137,4 +138,16 @@ public interface TypeCache {
* *
*/ */
void clear(); void clear();
/**
* Called when a type lookup request on {@link TypeSystem}
* fails because the type is not present in the runtime type information.
* Implementations can take action such as retrieving the requested type
* from some persistent storage.
* @param typeName
* @throws AtlasException
*/
IDataType onTypeFault(String typeName) throws AtlasException;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment