Commit 48df6544 by arempter Committed by Madhan Neethiraj

ATLAS-3261: added option to authorize notifications using username given in the message

parent d9ebc242
......@@ -698,6 +698,7 @@
<commons-conf.version>1.10</commons-conf.version>
<commons-conf2.version>2.2</commons-conf2.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-logging.version>1.1.3</commons-logging.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-validator.version>1.6</commons-validator.version>
......@@ -1095,6 +1096,12 @@
<version>${commons-collections.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
<!--Javax inject-->
<dependency>
<groupId>javax.inject</groupId>
......
......@@ -229,6 +229,11 @@
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
......
......@@ -64,12 +64,19 @@ import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
......@@ -90,6 +97,7 @@ import java.util.regex.Pattern;
import static org.apache.atlas.model.instance.AtlasObjectId.*;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
import static org.apache.atlas.web.security.AtlasAbstractAuthenticationProvider.getAuthoritiesFromUGI;
/**
......@@ -140,6 +148,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
......@@ -167,6 +177,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
private final boolean createShellEntityForNonExistingReference;
private final boolean authorizeUsingMessageUser;
private final Map<String, Authentication> authnCache;
private final NotificationInterface notificationInterface;
private final Configuration applicationProperties;
private ExecutorService executors;
......@@ -202,6 +215,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
authorizeUsingMessageUser = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
......@@ -552,6 +570,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
NotificationStat stats = new NotificationStat();
AuditLog auditLog = null;
if (authorizeUsingMessageUser) {
setCurrentUser(messageUser);
}
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
}
......@@ -1225,6 +1247,41 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
private void setCurrentUser(String userName) {
Authentication authentication = getAuthenticationForUser(userName);
if (LOG.isDebugEnabled()) {
if (authentication != null) {
LOG.debug("setCurrentUser(): notification processing will be authorized as user '{}'", userName);
} else {
LOG.debug("setCurrentUser(): Failed to get authentication for user '{}'.", userName);
}
}
SecurityContextHolder.getContext().setAuthentication(authentication);
}
private Authentication getAuthenticationForUser(String userName) {
Authentication ret = null;
if (StringUtils.isNotBlank(userName)) {
ret = authnCache != null ? authnCache.get(userName) : null;
if (ret == null) {
List<GrantedAuthority> grantedAuths = getAuthoritiesFromUGI(userName);
UserDetails principal = new User(userName, "", grantedAuths);
ret = new UsernamePasswordAuthenticationToken(principal, "");
if (authnCache != null) {
authnCache.put(userName, ret);
}
}
}
return ret;
}
static class FailedCommitOffsetRecorder {
private Long currentOffset;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment