Commit 3841a1bf by Madhan Neethiraj

ATLAS-3054: updated notification processing to support batch-commits

parent 74b32a17
......@@ -30,27 +30,27 @@ public class AtlasEntityStream implements EntityStream {
// Single-entity convenience constructor: wraps the entity in an AtlasEntitiesWithExtInfo and delegates to another constructor.
// NOTE(review): two delegation lines appear below because this view dropped the diff markers;
// presumably the one-argument call is the removed line and the two-argument call (null entity stream) is the added one - confirm against the commit.
public AtlasEntityStream(AtlasEntity entity) {
this(new AtlasEntitiesWithExtInfo(entity));
this(new AtlasEntitiesWithExtInfo(entity), null);
}
// Convenience constructor for one entity with its ext-info: wraps it in an AtlasEntitiesWithExtInfo and delegates to another constructor.
// NOTE(review): two delegation lines appear below because this view dropped the diff markers;
// presumably the one-argument call is the removed line and the two-argument call (null entity stream) is the added one - confirm against the commit.
public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
this(new AtlasEntitiesWithExtInfo(entityWithExtInfo));
this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null);
}
// Constructor for a set of entities with no accompanying entity stream.
// NOTE(review): this hunk shows both bodies because the diff markers were dropped from this view;
// presumably the three field assignments are the removed lines and the delegating this(entitiesWithExtInfo, null) call is the added one - confirm against the commit.
public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
this.entitiesWithExtInfo = entitiesWithExtInfo;
this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
this.entityStream = null;
this(entitiesWithExtInfo, null);
}
// Constructor for a single entity together with an accompanying entity stream.
// NOTE(review): this hunk shows both bodies because the diff markers were dropped from this view;
// presumably the three field assignments are the removed lines and the delegating this(..., entityStream) call is the added one - confirm against the commit.
public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) {
this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity);
this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
this.entityStream = entityStream;
this(new AtlasEntitiesWithExtInfo(entity), entityStream);
}
// Constructor for one entity with its ext-info together with an accompanying entity stream.
// NOTE(review): this hunk mixes pre- and post-change lines because the diff markers were dropped from this view;
// presumably the field assignment is a removed line and the delegating this(..., entityStream) call is the added one - confirm against the commit.
public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entityWithExtInfo);
this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), entityStream);
}
/**
 * Creates a stream over the given entities and keeps a reference to the supplied entity stream.
 *
 * @param entitiesWithExtInfo entities to iterate over; its entity list supplies the iterator
 * @param entityStream        entity stream stored alongside this one (other constructors in this class pass {@code null})
 */
public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo, EntityStream entityStream) {
    this.entityStream        = entityStream;
    this.entitiesWithExtInfo = entitiesWithExtInfo;
    this.iterator            = entitiesWithExtInfo.getEntities().iterator();
}
......
......@@ -27,6 +27,7 @@ import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
......@@ -113,8 +114,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
public static final String CONSUMER_COMMIT_BATCH_SIZE = "atlas.notification.consumer.commit.batch.size";
public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
......@@ -131,6 +134,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final int failedMsgCacheSize;
private final int minWaitDuration;
private final int maxWaitDuration;
private final int commitBatchSize;
private final boolean skipHiveColumnLineageHive20633;
private final int skipHiveColumnLineageHive20633InputsThreshold;
private final int largeMessageProcessingTimeThresholdMs;
......@@ -166,10 +170,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
......@@ -500,7 +505,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
createOrUpdate(entities, false);
}
break;
......@@ -521,7 +526,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
createOrUpdate(entities, true);
}
break;
......@@ -554,7 +559,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
createOrUpdate(entities, false);
}
break;
......@@ -568,7 +573,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
createOrUpdate(entities, false);
}
break;
......@@ -597,7 +602,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
createOrUpdate(entities, false);
}
break;
......@@ -682,6 +687,32 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
/**
 * Creates or updates the given entities in the entity store, splitting the work into
 * batches of at most {@code commitBatchSize} entities when the list is larger than that.
 *
 * <p>When {@code commitBatchSize} is zero or negative, batching is treated as disabled and
 * all entities are submitted in a single call. Without this guard a batch size of 0 would
 * make the batching loop spin forever, and a negative one would request an invalid sub-list.
 *
 * @param entities        entities (with ext-info) to create or update
 * @param isPartialUpdate passed through unchanged to the entity store
 * @throws AtlasBaseException if the entity store rejects any of the calls
 */
private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException {
    List<AtlasEntity> entitiesList = entities.getEntities();
    AtlasEntityStream entityStream = new AtlasEntityStream(entities);

    // non-positive batch size => batching disabled; small lists need no batching either
    if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
        atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);

        return;
    }

    for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
        // the last batch may be shorter than commitBatchSize
        int toIndex = Math.min(fromIdx + commitBatchSize, entitiesList.size());

        // copy the sub-list so the batch does not hold a view onto the full list
        AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(new ArrayList<>(entitiesList.subList(fromIdx, toIndex)));

        // the full stream is handed to the batch stream as its companion entity stream
        // NOTE(review): presumably so entities outside the current batch can still be resolved - confirm
        AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream);

        atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);

        // reset per-request state between batches
        RequestContext.get().resetEntityGuidUpdates();
        RequestContext.get().clearCache();
    }
}
private void recordFailedMessages() {
//logging failed messages
for (String message : failedMessages) {
......@@ -767,6 +798,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (skipHiveColumnLineageHive20633) {
skipHiveColumnLineage(context);
}
context.moveRegisteredReferredEntities();
}
private void ignoreOrPruneHiveTables(PreprocessorContext context) {
......@@ -801,16 +834,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
}
for (String guid : context.getReferredEntitiesToMove()) {
AtlasEntity entity = referredEntities.remove(guid);
if (entity != null) {
entities.add(entity);
LOG.info("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", entity.getTypeName(), EntityPreprocessor.getQualifiedName(entity), context.getKafkaMessageOffset(), context.getKafkaPartition());
}
}
}
int ignoredEntities = context.getIgnoredEntities().size();
......
......@@ -173,6 +173,35 @@ public class PreprocessorContext {
collectGuids(obj, prunedEntities);
}
/**
 * Moves each referred entity whose guid is registered in {@code referredEntitiesToMove}
 * out of the referred-entities map and appends it to the entities list, then clears the
 * registrations.
 *
 * <p>Does nothing (and leaves the registrations untouched) when the entities list or the
 * referred-entities map is {@code null}, or when nothing is registered.
 */
public void moveRegisteredReferredEntities() {
    final List<AtlasEntity>        topLevel = getEntities();
    final Map<String, AtlasEntity> referred = getReferredEntities();

    if (topLevel == null || referred == null || referredEntitiesToMove.isEmpty()) {
        return;
    }

    // capture the head of the list before anything is appended; it is only used in the summary log
    final AtlasEntity head;

    if (topLevel.isEmpty()) {
        head = null;
    } else {
        head = topLevel.get(0);
    }

    for (String guid : referredEntitiesToMove) {
        final AtlasEntity moved = referred.remove(guid);

        if (moved == null) {
            continue; // registered guid is not present among the referred entities
        }

        topLevel.add(moved);

        if (LOG.isDebugEnabled()) {
            LOG.debug("moved referred entity: typeName={}, qualifiedName={}. topic-offset={}, partition={}", moved.getTypeName(), EntityPreprocessor.getQualifiedName(moved), kafkaMessage.getOffset(), kafkaMessage.getPartition());
        }
    }

    // the count logged is the number of registered guids, which may exceed the number actually moved
    if (head == null) {
        LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition());
    } else {
        LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), head.getTypeName(), EntityPreprocessor.getQualifiedName(head), kafkaMessage.getOffset(), kafkaMessage.getPartition());
    }

    referredEntitiesToMove.clear();
}
public String getGuid(Object obj) {
Object ret = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment