Commit d5a5238b by Sarath Subramanian Committed by Madhan Neethiraj

ATLAS-1687: hbase_column_family and hbase_column are not loaded after upgrade

Change-Id: Ica0339cbcbb000b1785fed92ef82f09c41af32a8 Signed-off-by: 's avatarMadhan Neethiraj <madhan@apache.org>
parent a9f3da62
......@@ -8,7 +8,7 @@
"superTypes": [
"DataSet"
],
"typeVersion": "1.0",
"typeVersion": "1.1",
"attributeDefs": [
{
"name": "uri",
......
......@@ -83,6 +83,8 @@ public interface AtlasTypeDefStore {
AtlasTypesDef updateTypesDef(AtlasTypesDef atlasTypesDef) throws AtlasBaseException;
AtlasTypesDef createUpdateTypesDef(AtlasTypesDef typesToCreate, AtlasTypesDef typesToUpdate) throws AtlasBaseException;
void deleteTypesDef(AtlasTypesDef atlasTypesDef) throws AtlasBaseException;
AtlasTypesDef searchTypesDef(SearchFilter searchFilter) throws AtlasBaseException;
......
......@@ -485,16 +485,30 @@ public class AtlasTypeRegistry {
}
if (typesDef != null) {
updateTypesWithNoRefResolve(typesDef);
resolveReferences();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeRegistry.updateTypes({})", typesDef);
}
}
public void updateTypesWithNoRefResolve(AtlasTypesDef typesDef) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeRegistry.updateTypesWithNoRefResolve({})", typesDef);
}
if (typesDef != null) {
updateTypesWithNoRefResolve(typesDef.getEnumDefs());
updateTypesWithNoRefResolve(typesDef.getStructDefs());
updateTypesWithNoRefResolve(typesDef.getClassificationDefs());
updateTypesWithNoRefResolve(typesDef.getEntityDefs());
resolveReferences();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeRegistry.updateTypes({})", typesDef);
LOG.debug("<== AtlasTypeRegistry.updateTypesWithNoRefResolve({})", typesDef);
}
}
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
......@@ -31,6 +32,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.ObjectUtils;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
......@@ -87,16 +89,16 @@ public class AtlasTypeDefStoreInitializer {
}
AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, typeRegistry);
AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, typeRegistry);
if (typesToCreate.isEmpty()) {
LOG.info("No new type in file {}", typeDefFile.getAbsolutePath());
if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) {
typeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);
continue;
LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath());
} else {
LOG.info("No new type in file {}", typeDefFile.getAbsolutePath());
}
LOG.info("Loading types defined in file {}", typeDefFile.getAbsolutePath());
typeDefStore.createTypesDef(typesToCreate);
} catch (Throwable t) {
LOG.error("error while registering types in file {}", typeDefFile.getAbsolutePath(), t);
}
......@@ -143,6 +145,100 @@ public class AtlasTypeDefStoreInitializer {
return typesToCreate;
}
public static AtlasTypesDef getTypesToUpdate(AtlasTypesDef typesDef, AtlasTypeRegistry typeRegistry) {
AtlasTypesDef typesToUpdate = new AtlasTypesDef();
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef newStructDef : typesDef.getStructDefs()) {
AtlasStructDef oldStructDef = typeRegistry.getStructDefByName(newStructDef.getName());
if (oldStructDef == null) {
continue;
}
if (updateTypeAttributes(oldStructDef, newStructDef)) {
typesToUpdate.getStructDefs().add(newStructDef);
}
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef newClassifDef : typesDef.getClassificationDefs()) {
AtlasClassificationDef oldClassifDef = typeRegistry.getClassificationDefByName(newClassifDef.getName());
if (oldClassifDef == null) {
continue;
}
if (updateTypeAttributes(oldClassifDef, newClassifDef)) {
typesToUpdate.getClassificationDefs().add(newClassifDef);
}
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef newEntityDef : typesDef.getEntityDefs()) {
AtlasEntityDef oldEntityDef = typeRegistry.getEntityDefByName(newEntityDef.getName());
if (oldEntityDef == null) {
continue;
}
if (updateTypeAttributes(oldEntityDef, newEntityDef)) {
typesToUpdate.getEntityDefs().add(newEntityDef);
}
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef newEnumDef : typesDef.getEnumDefs()) {
AtlasEnumDef oldEnumDef = typeRegistry.getEnumDefByName(newEnumDef.getName());
if (oldEnumDef == null) {
continue;
}
if (isTypeUpdateApplicable(oldEnumDef, newEnumDef)) {
if (CollectionUtils.isNotEmpty(oldEnumDef.getElementDefs())) {
for (AtlasEnumElementDef oldEnumElem : oldEnumDef.getElementDefs()) {
if (!newEnumDef.hasElement(oldEnumElem.getValue())) {
newEnumDef.addElement(oldEnumElem);
}
}
}
typesToUpdate.getEnumDefs().add(newEnumDef);
}
}
}
return typesToUpdate;
}
private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef) {
boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef);
if (ret) {
// make sure that all attributes in oldDef are in newDef as well
if (CollectionUtils.isNotEmpty(oldStructDef.getAttributeDefs())){
for (AtlasAttributeDef oldAttrDef : oldStructDef.getAttributeDefs()) {
if (!newStructDef.hasAttribute(oldAttrDef.getName())) {
newStructDef.addAttribute(oldAttrDef);
}
}
}
}
return ret;
}
private static boolean isTypeUpdateApplicable(AtlasBaseTypeDef oldTypeDef, AtlasBaseTypeDef newTypeDef) {
String oldTypeVersion = oldTypeDef.getTypeVersion();
String newTypeVersion = newTypeDef.getTypeVersion();
return ObjectUtils.compare(newTypeVersion, oldTypeVersion) > 0;
}
private void applyTypePatches(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, String typesDirName) {
String typePatchesDirName = typesDirName + File.separator + "patches";
File typePatchesDir = new File(typePatchesDirName);
......
......@@ -297,91 +297,70 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
CollectionUtils.size(typesDef.getEntityDefs()));
}
AtlasTypesDef ret = new AtlasTypesDef();
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.addTypes(typesDef);
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
List<Object> preCreateStructDefs = new ArrayList<>();
List<Object> preCreateClassifiDefs = new ArrayList<>();
List<Object> preCreateEntityDefs = new ArrayList<>();
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
AtlasEnumDef createdDef = enumDefStore.create(enumDef);
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
AtlasTypesDef ret = addToGraphStore(typesDef, ttr);
ret.getEnumDefs().add(createdDef);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
preCreateStructDefs.add(structDefStore.preCreate(structDef));
}
return ret;
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
preCreateClassifiDefs.add(classifiDefStore.preCreate(classifiDef));
}
@Override
@GraphTransaction
public AtlasTypesDef createUpdateTypesDef(AtlasTypesDef typesToCreate, AtlasTypesDef typesToUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AtlasTypeDefGraphStore.createUpdateTypesDef({}, {})", typesToCreate, typesToUpdate);
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
preCreateEntityDefs.add(entityDefStore.preCreate(entityDef));
}
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
if (!typesToUpdate.isEmpty()) {
ttr.updateTypesWithNoRefResolve(typesToUpdate);
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
int i = 0;
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
AtlasStructDef createdDef = structDefStore.create(structDef, preCreateStructDefs.get(i));
ttr.addTypes(typesToCreate);
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
AtlasTypesDef ret = addToGraphStore(typesToCreate, ttr);
ret.getStructDefs().add(createdDef);
i++;
if (!typesToUpdate.isEmpty()) {
AtlasTypesDef updatedTypes = updateGraphStore(typesToUpdate, ttr);
if (CollectionUtils.isNotEmpty(updatedTypes.getEnumDefs())) {
for (AtlasEnumDef enumDef : updatedTypes.getEnumDefs()) {
ret.getEnumDefs().add(enumDef);
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
int i = 0;
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
AtlasClassificationDef createdDef = classifiDefStore.create(classifiDef, preCreateClassifiDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getClassificationDefs().add(createdDef);
i++;
if (CollectionUtils.isNotEmpty(updatedTypes.getStructDefs())) {
for (AtlasStructDef structDef : updatedTypes.getStructDefs()) {
ret.getStructDefs().add(structDef);
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
int i = 0;
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
AtlasEntityDef createdDef = entityDefStore.create(entityDef, preCreateEntityDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
if (CollectionUtils.isNotEmpty(updatedTypes.getClassificationDefs())) {
for (AtlasClassificationDef classificationDef : updatedTypes.getClassificationDefs()) {
ret.getClassificationDefs().add(classificationDef);
}
}
ret.getEntityDefs().add(createdDef);
i++;
if (CollectionUtils.isNotEmpty(updatedTypes.getEntityDefs())) {
for (AtlasEntityDef entityDef : updatedTypes.getEntityDefs()) {
ret.getEntityDefs().add(entityDef);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})",
CollectionUtils.size(typesDef.getEnumDefs()),
CollectionUtils.size(typesDef.getStructDefs()),
CollectionUtils.size(typesDef.getClassificationDefs()),
CollectionUtils.size(typesDef.getEntityDefs()));
LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret);
}
return ret;
......@@ -398,40 +377,11 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
CollectionUtils.size(typesDef.getEntityDefs()));
}
AtlasTypesDef ret = new AtlasTypesDef();
AtlasTransientTypeRegistry ttr = lockTypeRegistryAndReleasePostCommit();
ttr.updateTypes(typesDef);
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
ret.getEnumDefs().add(enumDefStore.update(enumDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
ret.getStructDefs().add(structDefStore.update(structDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
ret.getClassificationDefs().add(classifiDefStore.update(classifiDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
ret.getEntityDefs().add(entityDefStore.update(entityDef));
}
}
AtlasTypesDef ret = updateGraphStore(typesDef, ttr);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})",
......@@ -745,6 +695,120 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
private AtlasTypesDef addToGraphStore(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
AtlasTypesDef ret = new AtlasTypesDef();
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
List<Object> preCreateStructDefs = new ArrayList<>();
List<Object> preCreateClassifiDefs = new ArrayList<>();
List<Object> preCreateEntityDefs = new ArrayList<>();
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
AtlasEnumDef createdDef = enumDefStore.create(enumDef);
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getEnumDefs().add(createdDef);
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
preCreateStructDefs.add(structDefStore.preCreate(structDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
preCreateClassifiDefs.add(classifiDefStore.preCreate(classifiDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
preCreateEntityDefs.add(entityDefStore.preCreate(entityDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
int i = 0;
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
AtlasStructDef createdDef = structDefStore.create(structDef, preCreateStructDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getStructDefs().add(createdDef);
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
int i = 0;
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
AtlasClassificationDef createdDef = classifiDefStore.create(classifiDef, preCreateClassifiDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getClassificationDefs().add(createdDef);
i++;
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
int i = 0;
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
AtlasEntityDef createdDef = entityDefStore.create(entityDef, preCreateEntityDefs.get(i));
ttr.updateGuid(createdDef.getName(), createdDef.getGuid());
ret.getEntityDefs().add(createdDef);
i++;
}
}
return ret;
}
private AtlasTypesDef updateGraphStore(AtlasTypesDef typesDef, AtlasTransientTypeRegistry ttr) throws AtlasBaseException {
AtlasTypesDef ret = new AtlasTypesDef();
AtlasEnumDefStore enumDefStore = getEnumDefStore(ttr);
AtlasStructDefStore structDefStore = getStructDefStore(ttr);
AtlasClassificationDefStore classifiDefStore = getClassificationDefStore(ttr);
AtlasEntityDefStore entityDefStore = getEntityDefStore(ttr);
if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) {
for (AtlasEnumDef enumDef : typesDef.getEnumDefs()) {
ret.getEnumDefs().add(enumDefStore.update(enumDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) {
for (AtlasStructDef structDef : typesDef.getStructDefs()) {
ret.getStructDefs().add(structDefStore.update(structDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) {
for (AtlasClassificationDef classifiDef : typesDef.getClassificationDefs()) {
ret.getClassificationDefs().add(classifiDefStore.update(classifiDef));
}
}
if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) {
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
ret.getEntityDefs().add(entityDefStore.update(entityDef));
}
}
return ret;
}
private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook {
private final AtlasTransientTypeRegistry ttr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment