Commit 24715256 by Madhan Neethiraj

ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes

parent 5d99c40a
{
"patches": [
{
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "hive_table_db",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__hive_table.db"
}
},
{
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "hive_table_columns",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__hive_table.columns"
}
},
{
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "hive_table_partitionkeys",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__hive_table.partitionkeys"
}
},
{
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "hive_table_storagedesc",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__hive_table.sd"
}
},
{
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "hive_process_column_lineage",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__hive_column_lineage.query",
"swapEnds": "true"
}
}
]
}
{
"patches": [
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_instance_databases",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_instance.databases",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_db_tables",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_db.tables",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_table_columns",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_table.columns",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_table_indexes",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_table.indexes",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_table_foreign_key",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_table.foreign_keys",
"relationshipCategory": "COMPOSITION"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_index_columns",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_index.columns"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_foreign_key_key_columns",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_foreign_key.key_columns"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_foreign_key_table_references",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_foreign_key.references_table"
}
},
{
"action": "REMOVE_LEGACY_ATTRIBUTES",
"action": "REMOVE_LEGACY_REF_ATTRIBUTES",
"typeName": "rdbms_foreign_key_column_references",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"applyToVersion": "1.1",
"updateToVersion": "1.2",
"params": {
"relationshipLabel": "__rdbms_foreign_key.references_columns"
}
......
......@@ -78,9 +78,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@Service
public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
public static final String PATCHES_FOLDER_NAME = "patches";
public static final String RELATIONSHIP_LABEL = "relationshipLabel";
public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
public static final String PATCHES_FOLDER_NAME = "patches";
public static final String RELATIONSHIP_LABEL = "relationshipLabel";
public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds";
private final AtlasTypeDefStore atlasTypeDefStore;
private final AtlasTypeRegistry atlasTypeRegistry;
......@@ -414,7 +415,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
PatchHandler[] patchHandlers = new PatchHandler[] {
new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new RemoveLegacyAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry)
};
......@@ -710,9 +711,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
class RemoveLegacyAttributesPatchHandler extends PatchHandler {
public RemoveLegacyAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_ATTRIBUTES" });
class RemoveLegacyRefAttributesPatchHandler extends PatchHandler {
public RemoveLegacyRefAttributesPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] { "REMOVE_LEGACY_REF_ATTRIBUTES" });
}
@Override
......@@ -734,10 +735,12 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
String newRelationshipLabel = null;
RelationshipCategory newRelationshipCategory = null;
boolean swapEnds = false;
if (patch.getParams() != null) {
Object relLabel = patch.getParams().get(RELATIONSHIP_LABEL);
Object relCategory = patch.getParams().get(RELATIONSHIP_CATEGORY);
Object relSwapEnds = patch.getParams().get(RELATIONSHIP_SWAP_ENDS);
if (relLabel != null) {
newRelationshipLabel = relLabel.toString();
......@@ -746,6 +749,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
if (relCategory != null) {
newRelationshipCategory = RelationshipCategory.valueOf(relCategory.toString());
}
if (relSwapEnds != null) {
swapEnds = Boolean.valueOf(relSwapEnds.toString());
}
}
if (StringUtils.isEmpty(newRelationshipLabel)) {
......@@ -766,9 +773,19 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef);
AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef());
AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
AtlasRelationshipDef updatedDef = new AtlasRelationshipDef(relationshipDef);
if (swapEnds) {
AtlasRelationshipEndDef tmp = updatedDef.getEndDef1();
updatedDef.setEndDef1(updatedDef.getEndDef2());
updatedDef.setEndDef2(tmp);
}
end1Def = updatedDef.getEndDef1();
end2Def = updatedDef.getEndDef2();
end1Type = typeRegistry.getEntityTypeByName(end1Def.getType());
end2Type = typeRegistry.getEntityTypeByName(end2Def.getType());
updatedDef.setRelationshipLabel(newRelationshipLabel);
......@@ -776,10 +793,13 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
updatedDef.setRelationshipCategory(newRelationshipCategory);
}
updatedDef.getEndDef1().setIsLegacyAttribute(false);
updatedDef.getEndDef2().setIsLegacyAttribute(false);
end1Def.setIsLegacyAttribute(false);
end2Def.setIsLegacyAttribute(false);
updatedDef.setTypeVersion(patch.getUpdateToVersion());
AtlasEntityDef updatedEntityDef1 = new AtlasEntityDef(end1Type.getEntityDef());
AtlasEntityDef updatedEntityDef2 = new AtlasEntityDef(end2Type.getEntityDef());
updatedEntityDef1.removeAttribute(end1Def.getName());
updatedEntityDef2.removeAttribute(end2Def.getName());
......
......@@ -436,19 +436,25 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
}
AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
AtlasRelationshipEndDef newEnd1 = newRelationshipDef.getEndDef1();
AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2();
boolean endsSwaped = false;
if ( !isValidUpdate(existingEnd1, newEnd1) ) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
if (RequestContext.get().isInTypePatching() && isValidUpdate(existingEnd1, newEnd2)) { // allow swap of ends during type-patch
endsSwaped = true;
} else {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
}
}
AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2();
AtlasRelationshipEndDef newEndToCompareWith = endsSwaped ? newEnd1 : newEnd2;
if ( !isValidUpdate(existingEnd2, newEnd2) ) {
if ( !isValidUpdate(existingEnd2, newEndToCompareWith) ) {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString());
newRelationshipDef.getName(), newEndToCompareWith.toString(), existingEnd2.toString());
}
}
......@@ -520,7 +526,7 @@ public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRe
}
private static boolean isValidUpdate(AtlasRelationshipEndDef currentDef, AtlasRelationshipEndDef updatedDef) {
// permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_ATTRIBUTES)
// permit updates to description and isLegacyAttribute (ref type-patch REMOVE_LEGACY_REF_ATTRIBUTES)
return Objects.equals(currentDef.getType(), updatedDef.getType()) &&
Objects.equals(currentDef.getName(), updatedDef.getName()) &&
Objects.equals(currentDef.getIsContainer(), updatedDef.getIsContainer()) &&
......
......@@ -78,7 +78,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
......@@ -87,6 +86,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/**
* Consumer of notifications from hooks e.g., hive hook etc.
*/
......@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
......@@ -143,6 +144,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
......@@ -172,7 +174,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
......@@ -214,11 +216,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTablesCache = Collections.emptyMap();
}
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || rdbmsTypesRemoveOwnedRefAttrs;
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false);
preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs);
LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs);
LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
}
@Override
......@@ -694,7 +701,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities);
if (entitiesList.size() <= commitBatchSize) {
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
} else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
......@@ -792,10 +799,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, rdbmsTypesRemoveOwnedRefAttrs);
PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
ignoreOrPruneHiveTables(context);
if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) {
preprocessHiveTypes(context);
}
if (skipHiveColumnLineageHive20633) {
......@@ -813,8 +820,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
if (preprocessor != null) {
......@@ -824,19 +831,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
private void ignoreOrPruneHiveTables(PreprocessorContext context) {
private void preprocessHiveTypes(PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
if (preprocessor != null) {
preprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
iter.remove();
entities.remove(i--);
}
}
}
......@@ -877,8 +884,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
Set<String> lineageQNames = new HashSet<>();
// find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
final Object qName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
......@@ -887,7 +894,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
final String qualifiedName = qName.toString();
if (lineageQNames.contains(qualifiedName)) {
iter.remove();
entities.remove(i--);
LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
......@@ -914,11 +921,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0;
if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) {
for (ListIterator<AtlasEntity> iter = entities.listIterator(); iter.hasNext(); ) {
AtlasEntity entity = iter.next();
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
iter.remove();
entities.remove(i--);
numRemovedEntities++;
}
......
......@@ -48,6 +48,8 @@ public abstract class EntityPreprocessor {
public static final String ATTRIBUTE_SD = "sd";
public static final String ATTRIBUTE_DB = "db";
public static final String ATTRIBUTE_DATABASES = "databases";
public static final String ATTRIBUTE_QUERY = "query";
public static final String ATTRIBUTE_TABLE = "table";
public static final String ATTRIBUTE_TABLES = "tables";
public static final String ATTRIBUTE_INDEXES = "indexes";
public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys";
......
......@@ -18,14 +18,26 @@
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HivePreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionKeys";
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
super(TYPE_HIVE_TABLE);
......@@ -54,9 +66,64 @@ public class HivePreprocessor {
entity.setAttribute(ATTRIBUTE_SD, null);
entity.setAttribute(ATTRIBUTE_COLUMNS, null);
entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
} else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD);
removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context);
removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
}
}
}
private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
Object attrVal = entity.getAttribute(attrName);
if (attrVal != null) {
Set<String> guids = new HashSet<>();
context.collectGuids(attrVal, guids);
for (String guid : guids) {
AtlasEntity colEntity = context.getEntity(guid);
if (colEntity != null) {
Object attrTable = null;
if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) {
attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
} else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE);
}
attrTable = setRelationshipType(attrTable, relationshipType);
if (attrTable != null) {
colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable);
}
context.addToReferredEntitiesToMove(guid);
}
}
}
}
private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
AtlasRelatedObjectId ret = null;
if (attr instanceof AtlasRelatedObjectId) {
ret = (AtlasRelatedObjectId) attr;
} else if (attr instanceof AtlasObjectId) {
ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
} else if (attr instanceof Map) {
ret = new AtlasRelatedObjectId((Map) attr);
}
if (ret != null) {
ret.setRelationshipType(relationshipType);
}
return ret;
}
}
......
......@@ -44,16 +44,18 @@ public class PreprocessorContext {
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
private final Map<String, PreprocessAction> hiveTablesCache;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean rdbmsTypesRemoveOwnedRefAttrs) {
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
final HookNotification message = kafkaMessage.getMessage();
......@@ -85,6 +87,8 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
public boolean getHiveTypesRemoveOwnedRefAttrs() { return hiveTypesRemoveOwnedRefAttrs; }
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
public List<AtlasEntity> getEntities() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment