Commit cb2421bc by Ashutosh Mestry

ATLAS-3020: Audit APIs for classification updates.

parent 96bdc802
...@@ -27,7 +27,9 @@ import org.apache.atlas.model.instance.AtlasClassification; ...@@ -27,7 +27,9 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications; import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
...@@ -72,6 +74,8 @@ public class AtlasClientV2 extends AtlasBaseClient { ...@@ -72,6 +74,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Relationships APIs // Relationships APIs
private static final String RELATIONSHIPS_URI = BASE_URI + "v2/relationship/"; private static final String RELATIONSHIPS_URI = BASE_URI + "v2/relationship/";
private static final String BULK_HEADERS = "bulk/headers";
private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications";
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) { public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
super(baseUrl, basicAuthUserNamePassword); super(baseUrl, basicAuthUserNamePassword);
...@@ -326,13 +330,26 @@ public class AtlasClientV2 extends AtlasBaseClient { ...@@ -326,13 +330,26 @@ public class AtlasClientV2 extends AtlasBaseClient {
} }
public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException { public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException {
callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, classifications); for (AtlasClassification c : classifications) {
callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, c.getTypeName()), AtlasClassifications.class, classifications);
}
} }
public void deleteClassification(String guid, String classificationName) throws AtlasServiceException { public void deleteClassification(String guid, String classificationName) throws AtlasServiceException {
callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, classificationName), null, null); callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, classificationName), null, null);
} }
public AtlasEntityHeaders getEntityHeaders(long tagUpdateStartTime) throws AtlasServiceException {
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
queryParams.add("tagUpdateStartTime", Long.toString(tagUpdateStartTime));
return callAPI(API_V2.GET_BULK_HEADERS, AtlasEntityHeaders.class, queryParams);
}
public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasServiceException {
return callAPI(API_V2.UPDATE_BULK_SET_CLASSIFICATIONS, String.class, entityHeaders);
}
/* Discovery calls */ /* Discovery calls */
public AtlasSearchResult dslSearch(final String query) throws AtlasServiceException { public AtlasSearchResult dslSearch(final String query) throws AtlasServiceException {
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl(); MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
...@@ -480,6 +497,8 @@ public class AtlasClientV2 extends AtlasBaseClient { ...@@ -480,6 +497,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 DELETE_RELATIONSHIP_BY_GUID = new API_V2(RELATIONSHIPS_URI + "guid/", HttpMethod.DELETE, Response.Status.NO_CONTENT); public static final API_V2 DELETE_RELATIONSHIP_BY_GUID = new API_V2(RELATIONSHIPS_URI + "guid/", HttpMethod.DELETE, Response.Status.NO_CONTENT);
public static final API_V2 CREATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.POST, Response.Status.OK); public static final API_V2 CREATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.POST, Response.Status.OK);
public static final API_V2 UPDATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.PUT, Response.Status.OK); public static final API_V2 UPDATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.PUT, Response.Status.OK);
public static final API_V2 GET_BULK_HEADERS = new API_V2(ENTITY_API + BULK_HEADERS, HttpMethod.GET, Response.Status.OK);
public static final API_V2 UPDATE_BULK_SET_CLASSIFICATIONS = new API_V2(ENTITY_API + AtlasClientV2.BULK_SET_CLASSIFICATIONS, HttpMethod.POST, Response.Status.OK);
private API_V2(String path, String method, Response.Status status) { private API_V2(String path, String method, Response.Status status) {
super(path, method, status); super(path, method, status);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.instance;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Map;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasEntityHeaders {
Map<String, AtlasEntityHeader> guidHeaderMap;
public AtlasEntityHeaders() {
}
public AtlasEntityHeaders(Map<String, AtlasEntityHeader> guidEntityHeaderMap) {
guidHeaderMap = guidEntityHeaderMap;
}
public void setGuidHeaderMap(Map<String, AtlasEntityHeader> guidHeaderMap) {
this.guidHeaderMap = guidHeaderMap;
}
public Map<String, AtlasEntityHeader> getGuidHeaderMap() {
return guidHeaderMap;
}
}
...@@ -30,6 +30,7 @@ import org.apache.atlas.EntityAuditEvent; ...@@ -30,6 +30,7 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -39,6 +40,7 @@ import java.util.ArrayList; ...@@ -39,6 +40,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import javax.inject.Singleton; import javax.inject.Singleton;
...@@ -188,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo ...@@ -188,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo
} }
@Override @Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
throw new NotImplementedException();
}
@Override
public void start() throws AtlasException { public void start() throws AtlasException {
initApplicationProperties(); initApplicationProperties();
initializeSettings(); initializeSettings();
......
...@@ -24,6 +24,7 @@ import org.apache.atlas.model.audit.EntityAuditEventV2; ...@@ -24,6 +24,7 @@ import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Interface for repository for storing entity audit events * Interface for repository for storing entity audit events
...@@ -77,6 +78,14 @@ public interface EntityAuditRepository { ...@@ -77,6 +78,14 @@ public interface EntityAuditRepository {
*/ */
List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException; List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException;
/***
* List events for given time range where classifications have been added, deleted or updated.
* @param fromTimestamp from timestamp
* @param toTimestamp to timestamp
* @return events that match the range
* @throws AtlasBaseException
*/
Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException;
/** /**
* List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
......
...@@ -50,7 +50,10 @@ import org.apache.hadoop.hbase.client.Result; ...@@ -50,7 +50,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
...@@ -64,9 +67,10 @@ import javax.inject.Singleton; ...@@ -64,9 +67,10 @@ import javax.inject.Singleton;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE;
...@@ -546,6 +550,52 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito ...@@ -546,6 +550,52 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
} }
@Override @Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
final String classificationUpdatesAction = "CLASSIFICATION_";
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for fromTimestamp {}, toTimestamp {}, action {}", fromTimestamp, toTimestamp);
}
Table table = null;
ResultScanner scanner = null;
try {
Set<String> guids = new HashSet<>();
table = connection.getTable(tableName);
byte[] filterValue = Bytes.toBytes(classificationUpdatesAction);
BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator(filterValue);
SingleColumnValueFilter filter = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
Scan scan = new Scan().setFilter(filter).setTimeRange(fromTimestamp, toTimestamp);
Result result;
scanner = table.getScanner(scan);
while ((result = scanner.next()) != null) {
EntityAuditEvent event = fromKey(result.getRow());
if (event == null) {
continue;
}
guids.add(event.getEntityId());
}
return guids;
} catch (IOException e) {
throw new AtlasBaseException(e);
} finally {
try {
close(scanner);
close(table);
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
}
}
@Override
public void start() throws AtlasException { public void start() throws AtlasException {
Configuration configuration = ApplicationProperties.get(); Configuration configuration = ApplicationProperties.get();
startInternal(configuration, getHBaseConfiguration(configuration)); startInternal(configuration, getHBaseConfiguration(configuration));
......
...@@ -21,6 +21,7 @@ package org.apache.atlas.repository.audit; ...@@ -21,6 +21,7 @@ package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -28,7 +29,9 @@ import org.springframework.stereotype.Component; ...@@ -28,7 +29,9 @@ import org.springframework.stereotype.Component;
import javax.inject.Singleton; import javax.inject.Singleton;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
...@@ -119,6 +122,20 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { ...@@ -119,6 +122,20 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
} }
@Override @Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
Set<String> events = new HashSet<>();
for (EntityAuditEventV2 event : auditEventsV2.values()) {
long timestamp = event.getTimestamp();
if (timestamp > fromTimestamp && timestamp <= toTimestamp) {
events.add(event.getEntityId());
}
}
return events;
}
@Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) { public List<Object> listEvents(String entityId, String startKey, short maxResults) {
List events = listEventsV2(entityId, startKey, maxResults); List events = listEventsV2(entityId, startKey, maxResults);
......
...@@ -20,12 +20,14 @@ package org.apache.atlas.repository.audit; ...@@ -20,12 +20,14 @@ package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Singleton; import javax.inject.Singleton;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Implementation that completely disables the audit repository. * Implementation that completely disables the audit repository.
...@@ -66,6 +68,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository { ...@@ -66,6 +68,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
} }
@Override @Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
return Collections.emptySet();
}
@Override
public List<Object> listEvents(String entityId, String startKey, short n) { public List<Object> listEvents(String entityId, String startKey, short n) {
return Collections.emptyList(); return Collections.emptyList();
} }
......
...@@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasClassification; ...@@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityStream; import org.apache.atlas.repository.store.graph.v2.EntityStream;
...@@ -216,4 +217,6 @@ public interface AtlasEntityStore { ...@@ -216,4 +217,6 @@ public interface AtlasEntityStore {
List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException; List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException;
AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException; AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException;
String setClassifications(AtlasEntityHeaders entityHeaders);
} }
...@@ -657,6 +657,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -657,6 +657,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
return ret; return ret;
} }
@Override
@GraphTransaction
public String setClassifications(AtlasEntityHeaders entityHeaders) {
ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, this);
return associator.setClassifications(entityHeaders.getGuidHeaderMap());
}
private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException { private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("==> createOrUpdate()"); LOG.debug("==> createOrUpdate()");
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@Component
public class ClassificationAssociator {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationAssociator.class);
public static class Retriever {
private final EntityAuditRepository auditRepository;
private final EntityGraphRetriever entityRetriever;
public Retriever(AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) {
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.auditRepository = auditRepository;
}
Retriever(EntityGraphRetriever entityGraphRetriever, EntityAuditRepository auditRepository) {
this.entityRetriever = entityGraphRetriever;
this.auditRepository = auditRepository;
}
public AtlasEntityHeaders get(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
toTimestamp = incrementTimestamp(toTimestamp);
Set<String> guids = auditRepository.getEntitiesWithTagChanges(fromTimestamp, toTimestamp);
Map<String, AtlasEntityHeader> guidEntityHeaderMap = new HashMap<>();
for (String guid : guids) {
AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
guidEntityHeaderMap.put(guid, entityHeader);
}
guids.clear();
return new AtlasEntityHeaders(guidEntityHeaderMap);
}
private long incrementTimestamp(long t) {
return t + 1;
}
}
public static class Updater {
static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
static final String STATUS_DONE = "(Done)";
static final String STATUS_SKIPPED = "(Skipped)";
static final String STATUS_PARTIAL = "(Partial)";
private static final String PROCESS_FORMAT = "%s:%s:%s:%s -> %s:%s";
static final String PROCESS_ADD = "Add";
static final String PROCESS_UPDATE = "Update";
static final String PROCESS_DELETE = "Delete";
static final String JSONIFY_STRING_FORMAT = "\"%s\",";
private final AtlasTypeRegistry typeRegistry;
private final AtlasEntityStore entitiesStore;
private final EntityGraphRetriever entityRetriever;
private StringBuilder actionSummary = new StringBuilder();
public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
this.typeRegistry = typeRegistry;
this.entitiesStore = entitiesStore;
entityRetriever = new EntityGraphRetriever(typeRegistry);
}
public String setClassifications(Map<String, AtlasEntityHeader> map) {
for (AtlasEntityHeader incomingEntityHeader : map.values()) {
String typeName = incomingEntityHeader.getTypeName();
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
if (entityType == null) {
LOG.warn("Entity type: {}: Not found: {}!", typeName, STATUS_SKIPPED);
summarizeFormat("%s: %s", typeName, STATUS_SKIPPED);
continue;
}
String qualifiedName = getQualifiedName(incomingEntityHeader);
AtlasEntityHeader entityToBeChanged = getByUniqueAttributes(entityType, qualifiedName, incomingEntityHeader.getAttributes());
if (entityToBeChanged == null) {
summarizeFormat("Entity:%s:%s:[Not found]:%s", entityType.getTypeName(), qualifiedName, STATUS_SKIPPED);
continue;
}
String guid = entityToBeChanged.getGuid();
Map<String, List<AtlasClassification>> operationListMap = computeChanges(incomingEntityHeader, entityToBeChanged);
commitChanges(guid, typeName, qualifiedName, operationListMap);
}
return getJsonArray(actionSummary);
}
private void commitChanges(String entityGuid, String typeName, String qualifiedName,
Map<String, List<AtlasClassification>> operationListMap) {
if (MapUtils.isEmpty(operationListMap)) {
return;
}
deleteClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_DELETE));
updateClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_UPDATE));
addClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_ADD));
operationListMap.clear();
}
private Map<String, List<AtlasClassification>> computeChanges(AtlasEntityHeader incomingEntityHeader, AtlasEntityHeader entityToBeUpdated) {
if (incomingEntityHeader == null || entityToBeUpdated == null) {
return null;
}
ListOps<AtlasClassification> listOps = new ListOps<>();
List<AtlasClassification> incomingClassifications = listOps.filter(incomingEntityHeader.getGuid(), incomingEntityHeader.getClassifications());
List<AtlasClassification> entityClassifications = listOps.filter(entityToBeUpdated.getGuid(), entityToBeUpdated.getClassifications());
if (CollectionUtils.isEmpty(incomingClassifications) && CollectionUtils.isEmpty(entityClassifications)) {
return null;
}
Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();
bucket(PROCESS_DELETE, operationListMap, listOps.subtract(entityClassifications, incomingClassifications));
bucket(PROCESS_UPDATE, operationListMap, listOps.intersect(incomingClassifications, entityClassifications));
bucket(PROCESS_ADD, operationListMap, listOps.subtract(incomingClassifications, entityClassifications));
return operationListMap;
}
private void bucket(String op, Map<String, List<AtlasClassification>> operationListMap, List<AtlasClassification> results) {
if (CollectionUtils.isEmpty(results)) {
return;
}
operationListMap.put(op, results);
}
private void addClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) {
if (CollectionUtils.isEmpty(list)) {
return;
}
String status = STATUS_DONE;
String classificationNames = getClassificationNames(list);
try {
entitiesStore.addClassifications(entityToBeChangedGuid, list);
} catch (AtlasBaseException e) {
status = STATUS_PARTIAL;
LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status);
}
summarize(PROCESS_ADD, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status);
}
private void updateClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) {
if (CollectionUtils.isEmpty(list)) {
return;
}
String status = STATUS_DONE;
String classificationNames = getClassificationNames(list);
try {
entitiesStore.updateClassifications(entityToBeChangedGuid, list);
} catch (AtlasBaseException e) {
status = STATUS_PARTIAL;
LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status);
}
summarize(PROCESS_UPDATE, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status);
}
private void deleteClassifications(String typeName, String entityGuid, String qualifiedName, List<AtlasClassification> list) {
if (CollectionUtils.isEmpty(list)) {
return;
}
String status = STATUS_DONE;
String classificationTypeName = getClassificationNames(list);
for (AtlasClassification c : list) {
try {
entitiesStore.deleteClassification(entityGuid, c.getTypeName());
} catch (AtlasBaseException e) {
status = STATUS_PARTIAL;
LOG.warn("{}:{}:{} -> {}: Skipped!", entityGuid, typeName, qualifiedName, c.getTypeName());
}
}
summarize(PROCESS_DELETE, entityGuid, typeName, qualifiedName, classificationTypeName, status);
}
AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
try {
AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, attrValues);
if (vertex == null) {
return null;
}
return entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
} catch (AtlasBaseException e) {
LOG.warn("{}:{} could not be processed!", entityType, qualifiedName);
return null;
} catch (Exception ex) {
LOG.error("{}:{} could not be processed!", entityType, qualifiedName, ex);
return null;
}
}
private String getClassificationNames(List<AtlasClassification> list) {
return list.stream().map(AtlasClassification::getTypeName).collect(Collectors.joining(", "));
}
private String getQualifiedName(AtlasEntityHeader entityHeader) {
return (String) entityHeader.getAttribute(ATTR_NAME_QUALIFIED_NAME);
}
private void summarize(String... s) {
summarizeFormat(PROCESS_FORMAT, s);
}
private void summarizeFormat(String format, String... s) {
summarize(String.format(format, s));
}
private void summarize(String s) {
actionSummary.append(String.format(JSONIFY_STRING_FORMAT, s));
}
private String getJsonArray(StringBuilder actionSummary) {
return "[" + StringUtils.removeEnd(actionSummary.toString(), ",") + "]";
}
}
private static class ListOps<V extends AtlasClassification> {
public List<V> intersect(List<V> lhs, List<V> rhs) {
if (CollectionUtils.isEmpty(rhs)) {
return null;
}
List<V> result = new ArrayList<>();
for (V c : rhs) {
V found = findFrom(lhs, c);
if (found != null) {
result.add(found);
}
}
return result;
}
public List<V> subtract(List<V> lhs, List<V> rhs) {
if (CollectionUtils.isEmpty(lhs)) {
return null;
}
List<V> result = new ArrayList<>();
for (V c : lhs) {
V found = findFrom(rhs, c);
if (found == null) {
result.add(c);
}
}
return result;
}
private V findFrom(List<V> reference, V check) {
return (V) CollectionUtils.find(reference, ox ->
((V) ox).getTypeName().equals(check.getTypeName()));
}
public List<V> filter(String guid, List<V> list) {
if (CollectionUtils.isEmpty(list)) {
return list;
}
return list.stream().filter(x -> x.getEntityGuid().equals(guid)).collect(Collectors.toList());
}
}
}
...@@ -95,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation ...@@ -95,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
@Component @Component
public final class EntityGraphRetriever { public class EntityGraphRetriever {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class); private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
private static final String TERM_RELATION_NAME = "AtlasGlossarySemanticAssignment"; private static final String TERM_RELATION_NAME = "AtlasGlossarySemanticAssignment";
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.apache.commons.lang.StringUtils;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_ADD;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_DELETE;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_UPDATE;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_DONE;
import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_SKIPPED;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.FileAssert.fail;
public class ClassificationAssociatorTest {
private static final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
private static String TEST_FILES_SUBDIR = "classification-association";
private static String MESSAGE_SEPARATOR = ":";
private static String ENTITY_NAME_SEPARATOR = "->";
private class ClassificationAssociatorUpdaterForSpy extends ClassificationAssociator.Updater {
private final String entityFileName;
public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
super(typeRegistry, entitiesStore);
this.entityFileName = StringUtils.EMPTY;
}
public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, String entityFileName) {
super(typeRegistry, entitiesStore);
this.entityFileName = entityFileName;
}
@Override
AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
try {
if (StringUtils.isEmpty(entityFileName)) {
return null;
}
return getEntityHeaderFromFile(entityFileName);
} catch (IOException e) {
fail(entityFileName + " could not be loaded.");
return null;
}
}
}
@Test
public void auditScanYieldsNothing_EmptyHeadersReturned() {
AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-empty", 0, 0, null);
assertNotNull(actualEntityHeaders);
assertEquals(actualEntityHeaders.getGuidHeaderMap().size(),0);
}
@Test
public void auditScanYieldsOneEntity_EntityHeadersHasOneElementWithClassification() {
AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-Tx", 0, 0, TABLE_GUID);
assertNotNull(actualEntityHeaders);
assertEquals(actualEntityHeaders.getGuidHeaderMap().size(), 1);
assertTrue(actualEntityHeaders.getGuidHeaderMap().containsKey(TABLE_GUID));
assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getGuid(), TABLE_GUID);
assertNotNull(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications());
assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications().size(), 1);
}
private AtlasEntityHeaders setupRetriever(String headersFile, int fromTimestamp, int toTimestamp, final String tableGuid) {
AtlasEntityHeader entityHeaderWithClassification = null;
try {
Set<String> guids = new HashSet<String>();
entityHeaderWithClassification = TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, headersFile, AtlasEntityHeader.class);
if (!StringUtils.isEmpty(tableGuid)) {
guids.add(tableGuid);
}
EntityAuditRepository auditRepository = mock(EntityAuditRepository.class);
when(auditRepository.getEntitiesWithTagChanges(anyLong(), anyLong())).thenReturn(guids);
EntityGraphRetriever entityGraphRetriever = mock(EntityGraphRetriever.class);
when(entityGraphRetriever.toAtlasEntityHeaderWithClassifications(TABLE_GUID)).thenReturn(entityHeaderWithClassification);
ClassificationAssociator.Retriever retriever = new ClassificationAssociator.Retriever(entityGraphRetriever, auditRepository);
return retriever.get(fromTimestamp, toTimestamp);
}
catch (Exception ex) {
fail("Exception!");
return null;
}
}
@Test
public void updaterIncorrectType_ReturnsError() throws IOException {
AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII");
AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(null);
ClassificationAssociator.Updater updater = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
assertTrue(summary.contains("hive_"));
assertTrue(summary.contains(STATUS_SKIPPED));
}
@Test
public void updaterCorrectTypeEntityNotFound_Skipped() throws IOException {
AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII");
AtlasEntityType hiveTable = mock(AtlasEntityType.class);
AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
when(hiveTable.getTypeName()).thenReturn("hive_column");
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore);
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
String[] summaryArray = AtlasJson.fromJson(summary, typeReference);
assertEquals(summaryArray.length, 1);
assertSummaryElement(summaryArray[0], "Entity", STATUS_SKIPPED, "");
}
@Test
public void updaterTests() throws IOException {
updaterAssert("header-PII", "col-entity-None", PROCESS_ADD + ":PII");
updaterAssert("header-PII", "col-entity-PII", new String[]{PROCESS_UPDATE + ":PII"});
updaterAssert("header-None", "col-entity-PII", new String[]{PROCESS_DELETE + ":PII"});
updaterAssert("header-PII-VENDOR_PII", "col-entity-PII-FIN_PII",
PROCESS_DELETE + ":FIN_PII",
PROCESS_UPDATE + ":PII",
PROCESS_ADD + ":VENDOR_PII");
updaterAssert("header-None", "col-entity-None", new String[]{});
updaterAssert("header-FIN_PII", "col-entity-PII",
PROCESS_DELETE + ":PII",
PROCESS_ADD + ":FIN_PII");
}
@Test
public void updater_filterPropagatedClassifications() throws IOException {
updaterAssert("header-Tx-prop-T1", "col-entity-T1-prop-Tn",
PROCESS_DELETE + ":T1",
PROCESS_ADD + ":Tx");
}
private void assertSummaryElement(String summaryElement, String operation, String status, String classificationName) {
String[] splits = StringUtils.split(summaryElement, MESSAGE_SEPARATOR);
String[] nameSplits = StringUtils.split(splits[3], ENTITY_NAME_SEPARATOR);
if (nameSplits.length > 1) {
assertEquals(nameSplits[1].trim(), classificationName);
}
assertEquals(splits[0], operation);
assertEquals(splits[4], status);
}
private String[] setupUpdater(String entityHeaderFileName, String entityFileName, int expectedSummaryLength) throws IOException {
AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile(entityHeaderFileName);
AtlasEntityType hiveTable = mock(AtlasEntityType.class);
AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
when(hiveTable.getTypeName()).thenReturn("hive_column");
ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore, entityFileName);
String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
String[] summaryArray = AtlasJson.fromJson(summary, typeReference);
assertEquals(summaryArray.length, expectedSummaryLength);
return summaryArray;
}
private AtlasEntityHeader getEntityHeaderFromFile(String entityJson) throws IOException {
return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, entityJson, AtlasEntityHeader.class);
}
private AtlasEntityHeaders getEntityHeaderMapFromFile(String filename) throws IOException {
return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, filename, AtlasEntityHeaders.class);
}
private void updaterAssert(String incoming, String entity, String... opNamePair) throws IOException {
String[] summary = setupUpdater(incoming, entity, opNamePair.length);
for (int i = 0; i < opNamePair.length; i++) {
String s = opNamePair[i];
String[] splits = StringUtils.split(s, ":");
assertSummaryElement(summary[i], splits[0], STATUS_DONE, splits[1]);
}
}
}
{
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE"
}
{
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"classifications": [
{
"typeName": "PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
},
{
"typeName": "FIN_PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
}
]
}
{
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"classifications": [
{
"typeName": "PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
}
]
}
{
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"createTime": 1547071410000,
"qualifiedName": "stocks.daily@cl1",
"name": "daily"
},
"guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
"status": "ACTIVE",
"displayText": "daily",
"classifications": [
{
"typeName": "T1",
"attributes": {},
"entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
"entityStatus": "ACTIVE",
"propagate": false,
"validityPeriods": [],
"removePropagationsOnEntityDelete": false
},
{
"typeName": "Tn",
"attributes": {},
"entityGuid": "22222222-5555-40f8-a30f-3090b8a622f8",
"entityStatus": "ACTIVE",
"propagate": false,
"validityPeriods": [],
"removePropagationsOnEntityDelete": false
}
],
"meaningNames": [],
"meanings": []
}
{
"guidHeaderMap": {
"0ce68113-77fe-4ed1-9585-69371202bd74": {
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"displayText": "nationalid",
"classificationNames": [
"FIN_PII"
],
"classifications": [
{
"typeName": "FIN_PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
}
],
"meaningNames": [],
"meanings": []
}
}
}
{
"guidHeaderMap": {
"0ce68113-77fe-4ed1-9585-69371202bd74": {
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"displayText": "nationalid",
"classificationNames": [
],
"classifications": [
],
"meaningNames": [],
"meanings": []
}
}
}
{
"guidHeaderMap": {
"0ce68113-77fe-4ed1-9585-69371202bd74": {
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"displayText": "nationalid",
"classificationNames": [
"PII"
],
"classifications": [
{
"typeName": "PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
},
{
"typeName": "VENDOR_PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
}
],
"meaningNames": [],
"meanings": []
}
}
}
{
"guidHeaderMap": {
"0ce68113-77fe-4ed1-9585-69371202bd74": {
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"displayText": "nationalid",
"classificationNames": [
"PII"
],
"classifications": [
{
"typeName": "PII",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
}
],
"meaningNames": [],
"meanings": []
}
}
}
{
"guidHeaderMap": {
"0ce68113-77fe-4ed1-9585-69371202bd74": {
"typeName": "hive_column",
"attributes": {
"owner": "hive",
"qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
"name": "nationalid"
},
"guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"status": "ACTIVE",
"displayText": "nationalid",
"classificationNames": [
"T1", "Tx"
],
"classifications": [
{
"typeName": "Tx",
"attributes": {
"type": "ssn"
},
"entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
},
{
"typeName": "T1",
"attributes": {
"type": "ssn"
},
"entityGuid": "22222222-77fe-4ed1-9585-69371202bd74",
"entityStatus": "ACTIVE",
"propagate": true,
"removePropagationsOnEntityDelete": false
}
],
"meaningNames": [],
"meanings": []
}
}
}
{
"typeName": "hive_table",
"attributes": {
"owner": "hive",
"createTime": 1547071410000,
"qualifiedName": "stocks.daily@cl1",
"name": "daily"
},
"guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
"status": "ACTIVE",
"displayText": "daily",
"classificationNames": [],
"classifications": [
{
"typeName": "Tx",
"attributes": {},
"entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
"entityStatus": "ACTIVE",
"propagate": false,
"validityPeriods": [],
"removePropagationsOnEntityDelete": false
}
],
"meaningNames": [],
"meanings": []
}
\ No newline at end of file
...@@ -26,10 +26,12 @@ import org.apache.atlas.model.instance.AtlasClassification; ...@@ -26,10 +26,12 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.ClassificationAssociateRequest;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
...@@ -49,7 +51,16 @@ import org.springframework.stereotype.Service; ...@@ -49,7 +51,16 @@ import org.springframework.stereotype.Service;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.*; import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -77,7 +88,6 @@ public class EntityREST { ...@@ -77,7 +88,6 @@ public class EntityREST {
private final EntityAuditRepository auditRepository; private final EntityAuditRepository auditRepository;
private final AtlasInstanceConverter instanceConverter; private final AtlasInstanceConverter instanceConverter;
@Inject @Inject
public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore,
EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) { EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) {
...@@ -601,7 +611,7 @@ public class EntityREST { ...@@ -601,7 +611,7 @@ public class EntityREST {
} }
/** /**
* Bulk API to create new entities or update existing entities in Atlas. * Bulk API to create new entities or updates existing entities in Atlas.
* Existing entity is matched using its unique guid if supplied or by its unique attributes eg: qualifiedName * Existing entity is matched using its unique guid if supplied or by its unique attributes eg: qualifiedName
*/ */
@POST @POST
...@@ -708,6 +718,49 @@ public class EntityREST { ...@@ -708,6 +718,49 @@ public class EntityREST {
} }
} }
@GET
@Path("bulk/headers")
@Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasEntityHeaders getEntityHeaders(@QueryParam("tagUpdateStartTime") long tagUpdateStartTime) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
long tagUpdateEndTime = System.currentTimeMillis();
if (tagUpdateStartTime > tagUpdateEndTime) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "fromTimestamp should be less than toTimestamp");
}
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getEntityHeaders(" + tagUpdateStartTime + ", " + tagUpdateEndTime + ")");
}
ClassificationAssociator.Retriever associator = new ClassificationAssociator.Retriever(typeRegistry, auditRepository);
return associator.get(tagUpdateStartTime, tagUpdateEndTime);
} finally {
AtlasPerfTracer.log(perf);
}
}
@POST
@Path("bulk/setClassifications")
@Produces(Servlets.JSON_MEDIA_TYPE)
@Consumes(Servlets.JSON_MEDIA_TYPE)
public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.setClassifications()");
}
ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
return associator.setClassifications(entityHeaders.getGuidHeaderMap());
} finally {
AtlasPerfTracer.log(perf);
}
}
private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException { private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName); AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment