Commit 10bcaa80 by Madhan Neethiraj

ATLAS-3621: updated HiveHook to not save query-string in multiple attributes - queryText and name

parent 5e7f8949
......@@ -646,7 +646,6 @@ public abstract class BaseHiveEvent {
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
String qualifiedName = getQualifiedName(inputs, outputs);
......@@ -661,6 +660,7 @@ public abstract class BaseHiveEvent {
}
}
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, qualifiedName);
ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
......@@ -697,7 +697,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
QNAME_SEP_PROCESS + getQueryStartTime().toString() +
QNAME_SEP_PROCESS + endTime.toString());
ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS + getQueryStartTime().toString());
ret.setAttribute(ATTRIBUTE_NAME, ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime());
ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
......
......@@ -146,6 +146,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
......@@ -165,6 +166,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final int commitBatchSize;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
private final boolean updateHiveProcessNameWithQualifiedName;
private final int largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
......@@ -212,6 +214,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
updateHiveProcessNameWithQualifiedName = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, true);
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
......@@ -294,9 +297,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTablePrefixesToIgnore = Collections.emptyList();
}
LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, updateHiveProcessNameWithQualifiedName);
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
preprocessEnabled = skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
......@@ -584,6 +589,33 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
// covert V1 messages to V2 to enable preProcess
try {
switch (message.getType()) {
case ENTITY_CREATE: {
final EntityCreateRequest createRequest = (EntityCreateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
final EntityCreateRequestV2 v2Request = new EntityCreateRequestV2(message.getUser(), entities);
kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
message = kafkaMsg.getMessage();
}
break;
case ENTITY_FULL_UPDATE: {
final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
final EntityUpdateRequestV2 v2Request = new EntityUpdateRequestV2(messageUser, entities);
kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
message = kafkaMsg.getMessage();
}
break;
}
} catch (AtlasBaseException excp) {
LOG.error("handleMessage(): failed to convert V1 message to V2", message.getType().name());
}
PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);
if (isEmptyMessage(kafkaMsg)) {
......@@ -934,7 +966,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
PreprocessorContext context = null;
if (preprocessEnabled) {
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
......@@ -950,7 +982,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
context.moveRegisteredReferredEntities();
if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
// move hive_process and hive_column_lineage entities to end of the list
List<AtlasEntity> entities = context.getEntities();
int count = entities.size();
......
......@@ -162,6 +162,14 @@ public class HivePreprocessor {
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
if (context.updateHiveProcessNameWithQualifiedName()) {
if (LOG.isDebugEnabled()) {
LOG.debug("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), context.getKafkaMessageOffset(), context.getKafkaPartition());
}
entity.setAttribute(ATTRIBUTE_NAME, entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
if (context.isIgnoredEntity(entity.getGuid())) {
context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
} else {
......
......@@ -59,6 +59,7 @@ public class PreprocessorContext {
private final List<String> hiveDummyDatabasesToIgnore;
private final List<String> hiveDummyTablesToIgnore;
private final List<String> hiveTablePrefixesToIgnore;
private final boolean updateHiveProcessNameWithQualifiedName;
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean isHivePreProcessEnabled;
......@@ -70,17 +71,18 @@ public class PreprocessorContext {
private final Map<String, String> guidAssignments = new HashMap<>();
private List<AtlasEntity> postUpdateEntities = null;
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName) {
this.kafkaMessage = kafkaMessage;
this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
this.updateHiveProcessNameWithQualifiedName = updateHiveProcessNameWithQualifiedName;
final HookNotification message = kafkaMessage.getMessage();
......@@ -98,7 +100,7 @@ public class PreprocessorContext {
break;
}
this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
......@@ -113,6 +115,8 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
public boolean updateHiveProcessNameWithQualifiedName() { return updateHiveProcessNameWithQualifiedName; }
public boolean getHiveTypesRemoveOwnedRefAttrs() { return hiveTypesRemoveOwnedRefAttrs; }
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.*;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.util.AtlasMetricsUtil;
......@@ -89,7 +90,7 @@ public class NotificationHookConsumerKafkaTest {
MockitoAnnotations.initMocks(this);
AtlasType mockType = mock(AtlasType.class);
AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
AtlasEntitiesWithExtInfo mockEntity = new AtlasEntitiesWithExtInfo(createV2Entity());
when(typeRegistry.getType(anyString())).thenReturn(mockType);
......@@ -220,6 +221,16 @@ public class NotificationHookConsumerKafkaTest {
return entity;
}
AtlasEntity createV2Entity() {
final AtlasEntity entity = new AtlasEntity(AtlasClient.DATA_SET_SUPER_TYPE);
entity.setAttribute(NAME, "db" + randomString());
entity.setAttribute(DESCRIPTION, randomString());
entity.setAttribute(QUALIFIED_NAME, randomString());
return entity;
}
protected String randomString() {
return RandomStringUtils.randomAlphanumeric(10);
}
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
......@@ -85,7 +86,7 @@ public class NotificationHookConsumerTest {
MockitoAnnotations.initMocks(this);
AtlasType mockType = mock(AtlasType.class);
AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class);
AtlasEntitiesWithExtInfo mockEntity = new AtlasEntitiesWithExtInfo(mock(AtlasEntity.class));
when(typeRegistry.getType(anyString())).thenReturn(mockType);
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment