Commit 4b8b9e22 by Madhan Neethiraj

ATLAS-1472: updated type-registry to handle simultaneous updates from multiple threads

parent 57f4f79d
......@@ -76,6 +76,7 @@ public enum AtlasErrorCode {
INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"),
INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"),
INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(400, "ATLAS40018E", "Instance {0} with unique attribute {1} does not exist"),
......
......@@ -38,6 +38,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
......@@ -51,15 +53,20 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUF
@Singleton
public class AtlasTypeRegistry {
private static final Logger LOG = LoggerFactory.getLogger(AtlasStructType.class);
private static final int DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS = 15;
protected RegistryData registryData;
private final TypeRegistryUpdateSynchronizer updateSynchronizer;
public AtlasTypeRegistry() {
registryData = new RegistryData();
updateSynchronizer = new TypeRegistryUpdateSynchronizer(this);
}
// used only by AtlasTransientTypeRegistry
protected AtlasTypeRegistry(AtlasTypeRegistry other) {
registryData = new RegistryData(other.registryData);
updateSynchronizer = other.updateSynchronizer;
}
public Collection<String> getAllTypeNames() { return registryData.allTypes.getAllTypeNames(); }
......@@ -195,14 +202,19 @@ public class AtlasTypeRegistry {
public AtlasEntityType getEntityTypeByName(String name) { return registryData.entityDefs.getTypeByName(name); }
public AtlasTransientTypeRegistry createTransientTypeRegistry() {
return new AtlasTransientTypeRegistry(this);
public AtlasTransientTypeRegistry lockTypeRegistryForUpdate() throws AtlasBaseException {
return lockTypeRegistryForUpdate(DEFAULT_LOCK_MAX_WAIT_TIME_IN_SECONDS);
}
public void commitTransientTypeRegistry(AtlasTransientTypeRegistry transientTypeRegistry) {
this.registryData = transientTypeRegistry.registryData;
public AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException {
return updateSynchronizer.lockTypeRegistryForUpdate(lockMaxWaitTimeInSeconds);
}
public void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry transientTypeRegistry, boolean commitUpdates) {
updateSynchronizer.releaseTypeRegistryForUpdate(transientTypeRegistry, commitUpdates);
}
static class RegistryData {
final TypeCache allTypes;
final TypeDefCache<AtlasEnumDef, AtlasEnumType> enumDefs;
......@@ -519,12 +531,16 @@ public class AtlasTypeRegistry {
public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; }
private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) {
private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) throws AtlasBaseException{
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeRegistry.addTypeWithNoRefResolve({})", typeDef);
}
if (typeDef != null) {
if (this.isRegisteredType(typeDef.getName())) {
throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, typeDef.getName());
}
if (typeDef.getClass().equals(AtlasEnumDef.class)) {
AtlasEnumDef enumDef = (AtlasEnumDef) typeDef;
......@@ -552,7 +568,7 @@ public class AtlasTypeRegistry {
}
}
private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) {
private void addTypesWithNoRefResolve(Collection<? extends AtlasBaseTypeDef> typeDefs) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeRegistry.addTypesWithNoRefResolve(length={})",
(typeDefs == null ? 0 : typeDefs.size()));
......@@ -681,6 +697,89 @@ public class AtlasTypeRegistry {
}
}
}
static class TypeRegistryUpdateSynchronizer {
private final AtlasTypeRegistry typeRegistry;
private final ReentrantLock typeRegistryUpdateLock;
private AtlasTransientTypeRegistry typeRegistryUnderUpdate = null;
private String lockedByThread = null;
TypeRegistryUpdateSynchronizer(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
this.typeRegistryUpdateLock = new ReentrantLock();
}
AtlasTransientTypeRegistry lockTypeRegistryForUpdate(int lockMaxWaitTimeInSeconds) throws AtlasBaseException {
LOG.debug("==> lockTypeRegistryForUpdate()");
boolean alreadyLockedByCurrentThread = typeRegistryUpdateLock.isHeldByCurrentThread();
if (!alreadyLockedByCurrentThread) {
if (LOG.isDebugEnabled()) {
LOG.debug("lockTypeRegistryForUpdate(): waiting for lock to be released by thread {}", lockedByThread);
}
} else {
LOG.warn("lockTypeRegistryForUpdate(): already locked. currentLockCount={}",
typeRegistryUpdateLock.getHoldCount());
}
try {
boolean isLocked = typeRegistryUpdateLock.tryLock(lockMaxWaitTimeInSeconds, TimeUnit.SECONDS);
if (!isLocked) {
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK);
}
} catch (InterruptedException excp) {
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK, excp);
}
if (!alreadyLockedByCurrentThread) {
if (LOG.isDebugEnabled()) {
LOG.debug("lockTypeRegistryForUpdate(): wait over..got the lock");
}
typeRegistryUnderUpdate = new AtlasTransientTypeRegistry(typeRegistry);
lockedByThread = Thread.currentThread().getName();
}
LOG.debug("<== lockTypeRegistryForUpdate()");
return typeRegistryUnderUpdate;
}
void releaseTypeRegistryForUpdate(AtlasTransientTypeRegistry ttr, boolean commitUpdates) {
LOG.debug("==> releaseTypeRegistryForUpdate()");
if (typeRegistryUpdateLock.isHeldByCurrentThread()) {
try {
if (typeRegistryUnderUpdate != ttr) {
LOG.error("releaseTypeRegistryForUpdate(): incorrect typeRegistry returned for release" +
": found=" + ttr + "; expected=" + typeRegistryUnderUpdate,
new Exception().fillInStackTrace());
} else if (typeRegistryUpdateLock.getHoldCount() == 1) {
if (ttr != null && commitUpdates) {
typeRegistry.registryData = ttr.registryData;
}
}
if (typeRegistryUpdateLock.getHoldCount() == 1) {
lockedByThread = null;
typeRegistryUnderUpdate = null;
} else {
LOG.warn("releaseTypeRegistryForUpdate(): pendingReleaseCount={}", typeRegistryUpdateLock.getHoldCount() - 1);
}
} finally {
typeRegistryUpdateLock.unlock();
}
} else {
LOG.error("releaseTypeRegistryForUpdate(): current thread does not hold the lock",
new Exception().fillInStackTrace());
}
LOG.debug("<== releaseTypeRegistryForUpdate()");
}
}
}
class TypeCache {
......
......@@ -17,7 +17,6 @@
*/
package org.apache.atlas.type;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasErrorCode;
......@@ -36,7 +35,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
......@@ -52,8 +50,6 @@ import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_S
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_REF_ATTRIBUTE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_MAPPED_FROM_REF;
/**
* Utility methods for AtlasType/AtlasTypeDef.
......@@ -65,8 +61,8 @@ public class AtlasTypeUtil {
private static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX);
private static final Pattern TRAIT_NAME_PATTERN = Pattern.compile(TRAIT_NAME_REGEX);
private static final String InvalidTypeNameErrorMessage = "Names must consist of a letter followed by a sequence of letter, number, or '_' characters.";
private static final String InvalidTraitTypeNameErrorMessage = "Names must consist of a leter followed by a sequence of letters, numbers, '.', or '_' characters.";
private static final String InvalidTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_' ] characters.";
private static final String InvalidTraitTypeNameErrorMessage = "Name must consist of a letter followed by a sequence of [ letter, number, '_', '.' ] characters.";
static {
Collections.addAll(ATLAS_BUILTIN_TYPENAMES, AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES);
......
......@@ -158,16 +158,21 @@ public final class ModelTestUtil {
ret.setDefaultValue(ret.getElementDefs().get(idxDefault).getValue());
}
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
try {
AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
typesRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create enum-def", excp);
ret = null;
} finally {
typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
......@@ -186,16 +191,21 @@ public final class ModelTestUtil {
ret.setDescription(ret.getName());
ret.setAttributeDefs(newAttributeDefsWithAllBuiltInTypes(PREFIX_ATTRIBUTE_NAME));
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
try {
AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
typesRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create struct-def", excp);
ret = null;
} finally {
typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
......@@ -228,16 +238,21 @@ public final class ModelTestUtil {
}
}
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
try {
AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
typesRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create entity-def", excp);
ret = null;
} finally {
typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
......@@ -279,16 +294,21 @@ public final class ModelTestUtil {
}
}
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
try {
AtlasTransientTypeRegistry ttr = typesRegistry.createTransientTypeRegistry();
ttr = typesRegistry.lockTypeRegistryForUpdate();
ttr.addType(ret);
typesRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
LOG.error("failed to create classification-def", excp);
ret = null;
} finally {
typesRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
return ret;
......
......@@ -125,6 +125,8 @@ public class TestAtlasEntityType {
@Test
public void testForeignKeyConstraintValid() {
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
List<AtlasEntityDef> entityDefs = new ArrayList<>();
String failureMsg = null;
......@@ -132,13 +134,15 @@ public class TestAtlasEntityType {
entityDefs.add(createColumnEntityDef());
try {
AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
typeRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
} finally {
typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNull(failureMsg, "failed to create types my_table and my_column");
}
......@@ -151,14 +155,19 @@ public class TestAtlasEntityType {
entityDefs.add(createTableEntityDef());
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
try {
AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
typeRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
} finally {
typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in mappedFromRef");
}
......@@ -166,6 +175,8 @@ public class TestAtlasEntityType {
@Test
public void testForeignKeyConstraintInValidMappedFromRef2() {
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
List<AtlasEntityDef> entityDefs = new ArrayList<>();
String failureMsg = null;
......@@ -173,13 +184,15 @@ public class TestAtlasEntityType {
entityDefs.add(createColumnEntityDef());
try {
AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
typeRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
} finally {
typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid constraint failure - missing refAttribute in mappedFromRef");
}
......@@ -187,19 +200,23 @@ public class TestAtlasEntityType {
@Test
public void testForeignKeyConstraintInValidForeignKey() {
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
AtlasTransientTypeRegistry ttr = null;
boolean commit = false;
List<AtlasEntityDef> entityDefs = new ArrayList<>();
String failureMsg = null;
entityDefs.add(createColumnEntityDef());
try {
AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry();
ttr = typeRegistry.lockTypeRegistryForUpdate();
ttr.addTypes(entityDefs);
typeRegistry.commitTransientTypeRegistry(ttr);
commit = true;
} catch (AtlasBaseException excp) {
failureMsg = excp.getMessage();
} finally {
typeRegistry.releaseTypeRegistryForUpdate(ttr, commit);
}
assertNotNull(failureMsg, "expected invalid constraint failure - unknown attribute in foreignKey");
}
......
......@@ -68,7 +68,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
Set<TypeDefChangeListener> typeDefChangeListeners) {
super(typeRegistry, typeDefChangeListeners);
LOG.info("==> AtlasTypeDefGraphStoreV1()");
LOG.debug("==> AtlasTypeDefGraphStoreV1()");
try {
init();
......@@ -76,7 +76,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
LOG.error("failed to initialize types from graph store", excp);
}
LOG.info("<== AtlasTypeDefGraphStoreV1()");
LOG.debug("<== AtlasTypeDefGraphStoreV1()");
}
@Override
......@@ -101,11 +101,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
@Override
public void init() throws AtlasBaseException {
LOG.info("==> AtlasTypeDefGraphStoreV1.init()");
LOG.debug("==> AtlasTypeDefGraphStoreV1.init()");
super.init();
LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
LOG.debug("<== AtlasTypeDefGraphStoreV1.init()");
}
AtlasGraph getAtlasGraph() { return atlasGraph; }
......
......@@ -48,6 +48,10 @@ public class AtlasRepositoryConfiguration {
private static List<String> skippedOperations = null;
public static final String SEPARATOR = ":";
private static final String CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = "atlas.server.type.update.lock.max.wait.time.seconds";
private static final Integer DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS = Integer.valueOf(15);
private static Integer typeUpdateLockMaxWaitTimeInSeconds = null;
@SuppressWarnings("unchecked")
public static Class<? extends TypeCache> getTypeCache() {
// Get the type cache implementation class from Atlas configuration.
......@@ -155,4 +159,21 @@ public class AtlasRepositoryConfiguration {
skippedOperations = null;
}
public static int getTypeUpdateLockMaxWaitTimeInSeconds() {
Integer ret = typeUpdateLockMaxWaitTimeInSeconds;
if (ret == null) {
try {
Configuration config = ApplicationProperties.get();
ret = config.getInteger(CONFIG_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS, DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS);
typeUpdateLockMaxWaitTimeInSeconds = ret;
} catch (AtlasException e) {
// ignore
}
}
return ret == null ? DEFAULT_TYPE_UPDATE_LOCK_MAX_WAIT_TIME_IN_SECONDS : ret;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment