Commit 892df242 by Mandar Ambawane Committed by nixonrodrigues

ATLAS-3583 Use Audit framework to generate audit entries for TypeDefs CREATE, UPDATE and DELETE

parent 41e1039b
......@@ -25,6 +25,18 @@
{
"ordinal": 4,
"value": "IMPORT_DELETE_REPL"
},
{
"ordinal": 5,
"value": "TYPE_DEF_CREATE"
},
{
"ordinal": 6,
"value": "TYPE_DEF_UPDATE"
},
{
"ordinal": 7,
"value": "TYPE_DEF_DELETE"
}
]
}
......
{
"patches": [
{
"id": "TYPEDEF_PATCH_0006_001",
"description": "Add additional operations in Atlas",
"action": "UPDATE_ENUMDEF",
"typeName": "atlas_operation",
"applyToVersion": "1.0",
"updateToVersion": "1.1",
"params": null,
"elementDefs": [
{
"ordinal": 5,
"value": "TYPE_DEF_CREATE"
},
{
"ordinal": 6,
"value": "TYPE_DEF_UPDATE"
},
{
"ordinal": 7,
"value": "TYPE_DEF_DELETE"
}
]
}
]
}
......@@ -40,7 +40,10 @@ public class AtlasAuditEntry extends AtlasBaseModelObject implements Serializabl
PURGE("PURGE"),
EXPORT("EXPORT"),
IMPORT("IMPORT"),
IMPORT_DELETE_REPL("IMPORT_DELETE_REPL");
IMPORT_DELETE_REPL("IMPORT_DELETE_REPL"),
TYPE_DEF_CREATE("TYPE_DEF_CREATE"),
TYPE_DEF_UPDATE("TYPE_DEF_UPDATE"),
TYPE_DEF_DELETE("TYPE_DEF_DELETE");
private final String type;
......
......@@ -46,7 +46,7 @@ import java.util.Set;
@AtlasService
public class AtlasAuditService {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuditService.class);
private static final String ENTITY_TYPE_AUDIT_ENTRY = "__AtlasAuditEntry";
public static final String ENTITY_TYPE_AUDIT_ENTRY = "__AtlasAuditEntry";
private final DataAccess dataAccess;
private final AtlasDiscoveryService discoveryService;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ChangedTypeDefs;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.utils.AtlasJson;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@Component
@Order(2)
public class TypeDefAuditListener implements TypeDefChangeListener {
AtlasAuditService auditService;
@Inject
TypeDefAuditListener(AtlasAuditService auditService) {
this.auditService = auditService;
}
@Override
public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
createAuditEntry(changedTypeDefs);
}
@Override
public void onLoadCompletion() throws AtlasBaseException {
}
private void createAuditEntry(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
List<AtlasBaseTypeDef> createdTypes = (List<AtlasBaseTypeDef>) changedTypeDefs.getCreatedTypeDefs();
List<AtlasBaseTypeDef> updatedTypes = (List<AtlasBaseTypeDef>) changedTypeDefs.getUpdatedTypeDefs();
List<AtlasBaseTypeDef> deletedTypes = (List<AtlasBaseTypeDef>) changedTypeDefs.getDeletedTypeDefs();
updatedTypes = removeDuplicateEntries(createdTypes, updatedTypes);
createAuditEntry(createdTypes, AtlasAuditEntry.AuditOperation.TYPE_DEF_CREATE);
createAuditEntry(updatedTypes, AtlasAuditEntry.AuditOperation.TYPE_DEF_UPDATE);
createAuditEntry(deletedTypes, AtlasAuditEntry.AuditOperation.TYPE_DEF_DELETE);
}
private List<AtlasBaseTypeDef> removeDuplicateEntries(List<AtlasBaseTypeDef> createdTypes, List<AtlasBaseTypeDef> updatedTypes) {
if (CollectionUtils.isNotEmpty(createdTypes)) {
List<String> createdTypeNames = createdTypes.stream()
.map(obj -> obj.getName()).collect(Collectors.toList());
updatedTypes.removeIf(obj -> createdTypeNames.contains(obj.getName()));
}
if (CollectionUtils.isNotEmpty(updatedTypes)) {
Set<AtlasBaseTypeDef> baseTypeDefs = updatedTypes.stream()
.collect(Collectors.toCollection(() ->
new TreeSet<>(Comparator.comparing(AtlasBaseTypeDef::getName))));
updatedTypes = new ArrayList<>(baseTypeDefs);
}
return updatedTypes;
}
private void createAuditEntry(List<AtlasBaseTypeDef> baseTypeDefList, AtlasAuditEntry.AuditOperation auditOperation) throws AtlasBaseException {
if (CollectionUtils.isEmpty(baseTypeDefList)) {
return;
}
final String clientIp = RequestContext.get().getClientIPAddress();
final Date startTime = new Date(RequestContext.get().getRequestTime());
final Date endTime = new Date();
Map<TypeCategory, List<AtlasBaseTypeDef>> groupByCategoryMap =
baseTypeDefList.stream().collect(Collectors.groupingBy(AtlasBaseTypeDef::getCategory));
List<String> categories = new ArrayList<>();
for (TypeCategory category : groupByCategoryMap.keySet()) {
categories.add(category.name());
}
String typeDefJson = AtlasJson.toJson(groupByCategoryMap);
auditService.add(RequestContext.get().getUser() == null ? "" : RequestContext.get().getUser(), auditOperation,
clientIp != null ? clientIp : "", startTime, endTime, String.join(",", categories),
typeDefJson, baseTypeDefList.size());
}
}
......@@ -443,6 +443,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
Arrays.sort(typePatchFiles);
PatchHandler[] patchHandlers = new PatchHandler[] {
new UpdateEnumDefPatchHandler(typeDefStore, typeRegistry),
new AddAttributePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributePatchHandler(typeDefStore, typeRegistry),
new RemoveLegacyRefAttributesPatchHandler(typeDefStore, typeRegistry),
......@@ -527,6 +528,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private String updateToVersion;
private Map<String, Object> params;
private List<AtlasAttributeDef> attributeDefs;
private List<AtlasEnumElementDef> elementDefs;
private Map<String, String> typeDefOptions;
private String serviceType;
private String attributeName;
......@@ -595,6 +597,14 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
this.attributeDefs = attributeDefs;
}
public List<AtlasEnumElementDef> getElementDefs() {
return elementDefs;
}
public void setElementDefs(List<AtlasEnumElementDef> elementDefs) {
this.elementDefs = elementDefs;
}
public Map<String, String> getTypeDefOptions() {
return typeDefOptions;
}
......@@ -661,13 +671,48 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
class UpdateEnumDefPatchHandler extends PatchHandler {
public UpdateEnumDefPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[]{"UPDATE_ENUMDEF"});
}
@Override
public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
String typeName = patch.getTypeName();
AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName);
PatchStatus ret;
if (typeDef == null) {
throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
}
if (isPatchApplicable(patch, typeDef)) {
if (typeDef.getClass().equals(AtlasEnumDef.class)) {
AtlasEnumDef updatedDef = new AtlasEnumDef((AtlasEnumDef) typeDef);
for (AtlasEnumElementDef elementDef : patch.getElementDefs()) {
updatedDef.addElement(elementDef);
}
updatedDef.setTypeVersion(patch.getUpdateToVersion());
typeDefStore.updateEnumDefByName(typeName, updatedDef);
ret = APPLIED;
} else {
throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName());
}
} else {
LOG.info("patch skipped: typeName={}; applyToVersion={}; updateToVersion={}",
patch.getTypeName(), patch.getApplyToVersion(), patch.getUpdateToVersion());
ret = SKIPPED;
}
return ret;
}
}
class AddAttributePatchHandler extends PatchHandler {
public AddAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] { "ADD_ATTRIBUTE" });
}
@Override
public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
@Override
public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
String typeName = patch.getTypeName();
AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName);
PatchStatus ret;
......
......@@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.repository.audit.AtlasAuditService;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
......@@ -59,6 +60,8 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
......@@ -402,10 +405,19 @@ public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
return listener.getClass().getSimpleName();
}
private static final Predicate<AtlasEntityHeader> PRED_IS_NOT_TYPE_AUDIT_ENTITY = obj -> !obj.getTypeName().equals(AtlasAuditService.ENTITY_TYPE_AUDIT_ENTRY);
private boolean skipAuditEntries(List<AtlasEntityHeader> entityHeaders) {
return CollectionUtils.isEmpty(entityHeaders) || !entityHeaders.stream().anyMatch(PRED_IS_NOT_TYPE_AUDIT_ENTITY);
}
private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityHeaders)) {
return;
}
if (skipAuditEntries(entityHeaders)) {
return;
}
MetricRecorder metric = RequestContext.get().startMetricRecord("notifyListeners");
......
......@@ -434,7 +434,7 @@
-->
<lst name="defaults">
<str name="defType">edismax</str>
<str name="qf">3hmt_t 35x_t f0l_t i6d_l 7f2d_t 7gn9_t 3oqt_s jr9_t 3rwl_t lc5_t mx1_t 7dhh_t iyt_l 3j7p_t 7klh_t 7hfp_t 7i85_t ohx_t 7bwl_l 7cp1_l</str>
<str name="qf">3k05_t 35x_t f0l_t i6d_l 7f2d_t 7gn9_t 3r45_s jr9_t 3u9x_t lc5_t mx1_t 7dhh_t iyt_l 3j7p_t 7klh_t 7hfp_t 7i85_t ohx_t 7bwl_l 7cp1_l</str>
<str name="hl.fl">*</str>
<bool name="hl.requireFieldMatch">true</bool>
<bool name="lowercaseOperators">true</bool>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment