Commit 5d99c40a by Madhan Neethiraj

ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef - #2

parent 5651f934
{
"patches": [
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_instance_databases",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_instance.databases",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_db_tables",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_db.tables",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_table_columns",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_table.columns",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_table_indexes",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_table.indexes",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_table_foreign_key",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_table.foreign_keys",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_index_columns",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_index.columns"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_foreign_key_key_columns",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_foreign_key.key_columns"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_foreign_key_table_references",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_foreign_key.references_table"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"typeName": "rdbms_foreign_key_column_references",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": {
"relationshipLabel": "__rdbms_foreign_key.references_columns"
}
}
]
}
......@@ -147,12 +147,10 @@ public class AtlasStruct implements Serializable {
}
}
public void removeAttribute(String name) {
public Object removeAttribute(String name) {
Map<String, Object> a = this.attributes;
if (a != null && a.containsKey(name)) {
a.remove(name);
}
return a != null ? a.remove(name) : null;
}
public StringBuilder toString(StringBuilder sb) {
......
......@@ -35,6 +35,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
......@@ -77,8 +78,9 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@Service
public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
public static final String PATCHES_FOLDER_NAME = "patches";
public static final String RELATIONSHIP_LABEL = "relationshipLabel";
public static final String PATCHES_FOLDER_NAME = "patches";
public static final String RELATIONSHIP_LABEL = "relationshipLabel";
public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
private final AtlasTypeDefStore atlasTypeDefStore;
private final AtlasTypeRegistry atlasTypeRegistry;
......@@ -730,13 +732,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
AtlasEntityType end1Type = typeRegistry.getEntityTypeByName(end1Def.getType());
AtlasEntityType end2Type = typeRegistry.getEntityTypeByName(end2Def.getType());
String newRelationshipLabel = null;
String newRelationshipLabel = null;
RelationshipCategory newRelationshipCategory = null;
if (patch.getParams() != null) {
Object val = patch.getParams().get(RELATIONSHIP_LABEL);
Object relLabel = patch.getParams().get(RELATIONSHIP_LABEL);
Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY);
if (val != null) {
newRelationshipLabel = val.toString();
if (relLabel != null) {
newRelationshipLabel = relLabel.toString();
}
if (relCategory != null) {
newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString());
}
}
......@@ -763,6 +771,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
updatedDef.setRelationshipLabel(newRelationshipLabel);
if (newRelationshipCategory != null) {
updatedDef.setRelationshipCategory(newRelationshipCategory);
}
updatedDef.getEndDef1().setIsLegacyAttribute(false);
updatedDef.getEndDef2().setIsLegacyAttribute(false);
updatedDef.setTypeVersion(patch.getUpdateToVersion());
......
......@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
......@@ -425,9 +426,13 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
RelationshipCategory newRelationshipCategory = newRelationshipDef.getRelationshipCategory();
if ( !existingRelationshipCategory.equals(newRelationshipCategory)){
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
newRelationshipDef.getName(),newRelationshipCategory.name(),
existingRelationshipCategory.name() );
if (!RequestContext.get().isInTypePatching()) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
newRelationshipDef.getName(), newRelationshipCategory.name(),
existingRelationshipCategory.name());
} else {
LOG.warn("RELATIONSHIP UPDATE: relationship category from {} to {} for {}", existingRelationshipCategory.name(), newRelationshipCategory.name(), newRelationshipDef.getName());
}
}
AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
......
......@@ -792,7 +792,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs);
if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
ignoreOrPruneHiveTables(context);
......
......@@ -44,15 +44,17 @@ public class PreprocessorContext {
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
this.kafkaMessage = kafkaMessage;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
final HookNotification message = kafkaMessage.getMessage();
......@@ -83,6 +85,8 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public List<AtlasEntity> getEntities() {
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null;
}
......@@ -95,6 +99,12 @@ public class PreprocessorContext {
return entitiesWithExtInfo != null && guid != null ? entitiesWithExtInfo.getEntity(guid) : null;
}
public AtlasEntity removeReferredEntity(String guid) {
Map<String, AtlasEntity> referredEntities = getReferredEntities();
return referredEntities != null && guid != null ? referredEntities.remove(guid) : null;
}
public Set<String> getIgnoredEntities() { return ignoredEntities; }
public Set<String> getPrunedEntities() { return prunedEntities; }
......@@ -165,6 +175,14 @@ public class PreprocessorContext {
}
}
public void addToReferredEntitiesToMove(Collection<String> guids) {
if (guids != null) {
for (String guid : guids) {
addToReferredEntitiesToMove(guid);
}
}
}
public void addToIgnoredEntities(Object obj) {
collectGuids(obj, ignoredEntities);
}
......@@ -173,6 +191,14 @@ public class PreprocessorContext {
collectGuids(obj, prunedEntities);
}
public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) {
Set<String> guidsToMove = new HashSet<>();
collectGuids(entity.removeAttribute(attrName), guidsToMove);
addToReferredEntitiesToMove(guidsToMove);
}
public void moveRegisteredReferredEntities() {
List<AtlasEntity> entities = getEntities();
Map<String, AtlasEntity> referredEntities = getReferredEntities();
......@@ -202,38 +228,39 @@ public class PreprocessorContext {
}
}
public String getGuid(Object obj) {
public String getTypeName(Object obj) {
Object ret = null;
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getGuid();
ret = ((AtlasObjectId) obj).getTypeName();
} else if (obj instanceof Map) {
ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getGuid();
ret = ((AtlasEntity) obj).getTypeName();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid();
ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getTypeName();
}
return ret != null ? ret.toString() : null;
}
public String getGuid(Object obj) {
Object ret = null;
private boolean isMatch(String name, List<Pattern> patterns) {
boolean ret = false;
for (Pattern p : patterns) {
if (p.matcher(name).matches()) {
ret = true;
break;
}
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getGuid();
} else if (obj instanceof Map) {
ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getGuid();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
ret = ((AtlasEntity.AtlasEntityWithExtInfo) obj).getEntity().getGuid();
}
return ret;
return ret != null ? ret.toString() : null;
}
private void collectGuids(Object obj, Set<String> guids) {
public void collectGuids(Object obj, Set<String> guids) {
if (obj != null) {
if (obj instanceof Collection) {
Collection objList = (Collection) obj;
......@@ -247,11 +274,26 @@ public class PreprocessorContext {
}
}
private void collectGuid(Object obj, Set<String> guids) {
public void collectGuid(Object obj, Set<String> guids) {
String guid = getGuid(obj);
if (guid != null) {
guids.add(guid);
}
}
private boolean isMatch(String name, List<Pattern> patterns) {
boolean ret = false;
for (Pattern p : patterns) {
if (p.matcher(name).matches()) {
ret = true;
break;
}
}
return ret;
}
}
......@@ -103,35 +103,35 @@ public class RdbmsPreprocessor {
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
clearRefAttributes(entity, context);
if (context.getRdbmsTypesRemoveOwnedRefAttrs()) {
clearRefAttributesAndMove(entity, context);
Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
if (MapUtils.isNotEmpty(referredEntities)) {
for (AtlasEntity referredEntity : referredEntities.values()) {
if (entityTypesToMove.contains(referredEntity.getTypeName())) {
clearRefAttributes(referredEntity, context);
context.addToReferredEntitiesToMove(referredEntity.getGuid());
if (MapUtils.isNotEmpty(referredEntities)) {
for (AtlasEntity referredEntity : referredEntities.values()) {
if (entityTypesToMove.contains(referredEntity.getTypeName())) {
clearRefAttributesAndMove(referredEntity, context);
}
}
}
}
}
private void clearRefAttributes(AtlasEntity entity, PreprocessorContext context) {
private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) {
switch (entity.getTypeName()) {
case TYPE_RDBMS_INSTANCE:
entity.removeAttribute(ATTRIBUTE_DATABASES);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES);
break;
case TYPE_RDBMS_DB:
entity.removeAttribute(ATTRIBUTE_TABLES);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES);
break;
case TYPE_RDBMS_TABLE:
entity.removeAttribute(ATTRIBUTE_COLUMNS);
entity.removeAttribute(ATTRIBUTE_INDEXES);
entity.removeAttribute(ATTRIBUTE_FOREIGN_KEYS);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES);
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS);
break;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment