diff --git a/pom.xml b/pom.xml index b2506e7..eb2689f 100644 --- a/pom.xml +++ b/pom.xml @@ -698,6 +698,7 @@ <commons-conf.version>1.10</commons-conf.version> <commons-conf2.version>2.2</commons-conf2.version> <commons-collections.version>3.2.2</commons-collections.version> + <commons-collections4.version>4.4</commons-collections4.version> <commons-logging.version>1.1.3</commons-logging.version> <commons-lang.version>2.6</commons-lang.version> <commons-validator.version>1.6</commons-validator.version> @@ -1095,6 +1096,12 @@ <version>${commons-collections.version}</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + <version>${commons-collections4.version}</version> + </dependency> + <!--Javax inject--> <dependency> <groupId>javax.inject</groupId> diff --git a/webapp/pom.xml b/webapp/pom.xml index 57cab62..3c55b4d 100755 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -229,6 +229,11 @@ </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </dependency> + + <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> </dependency> diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 41a6c2e..14cae42 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -64,12 +64,19 @@ import org.apache.atlas.web.filters.AuditFilter.AuditLog; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.collections4.map.PassiveExpiringMap; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.DependsOn; import org.springframework.core.annotation.Order; +import org.springframework.security.authentication.UsernamePasswordAuthenticationToken; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.GrantedAuthority; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.security.core.userdetails.User; +import org.springframework.security.core.userdetails.UserDetails; import org.springframework.stereotype.Component; import javax.inject.Inject; @@ -90,6 +97,7 @@ import java.util.regex.Pattern; import static org.apache.atlas.model.instance.AtlasObjectId.*; import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS; +import static org.apache.atlas.web.security.AtlasAbstractAuthenticationProvider.getAuthoritiesFromUGI; /** @@ -140,6 +148,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes"; public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs"; public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs"; + public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user"; + public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; @@ -167,6 +177,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final boolean preprocessEnabled; private final boolean createShellEntityForNonExistingReference; + private final boolean authorizeUsingMessageUser; + private final Map<String, Authentication> authnCache; + private final NotificationInterface notificationInterface; private final Configuration applicationProperties; private ExecutorService executors; @@ -202,6 +215,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumerDisabled = applicationProperties.getBoolean(CONSUMER_DISABLED, false); largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean(); + authorizeUsingMessageUser = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false); + + int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300); + + authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null; String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN); String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN); @@ -552,6 +570,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl NotificationStat stats = new NotificationStat(); AuditLog auditLog = null; + if (authorizeUsingMessageUser) { + setCurrentUser(messageUser); + } + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name()); } @@ -1225,6 +1247,41 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private void setCurrentUser(String userName) { + Authentication authentication = getAuthenticationForUser(userName); + + if (LOG.isDebugEnabled()) { + if (authentication != null) { + LOG.debug("setCurrentUser(): notification processing will be authorized as user '{}'", userName); + } else { + LOG.debug("setCurrentUser(): Failed to get authentication for user '{}'.", userName); + } + } + + SecurityContextHolder.getContext().setAuthentication(authentication); + } + + private Authentication getAuthenticationForUser(String userName) { + Authentication ret = null; + + if (StringUtils.isNotBlank(userName)) { + ret = authnCache != null ? authnCache.get(userName) : null; + + if (ret == null) { + List<GrantedAuthority> grantedAuths = getAuthoritiesFromUGI(userName); + UserDetails principal = new User(userName, "", grantedAuths); + + ret = new UsernamePasswordAuthenticationToken(principal, ""); + + if (authnCache != null) { + authnCache.put(userName, ret); + } + } + } + + return ret; + } + static class FailedCommitOffsetRecorder { private Long currentOffset;