Commit 03f2754d by Ashutosh Mestry

ATLAS-2814: Cluster stores replication details.

parent 561cdc91
...@@ -37,6 +37,7 @@ import com.sun.jersey.multipart.MultiPart; ...@@ -37,6 +37,7 @@ import com.sun.jersey.multipart.MultiPart;
import com.sun.jersey.multipart.file.FileDataBodyPart; import com.sun.jersey.multipart.file.FileDataBodyPart;
import com.sun.jersey.multipart.file.StreamDataBodyPart; import com.sun.jersey.multipart.file.StreamDataBodyPart;
import com.sun.jersey.multipart.impl.MultiPartWriter; import com.sun.jersey.multipart.impl.MultiPartWriter;
import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
...@@ -79,7 +80,7 @@ public abstract class AtlasBaseClient { ...@@ -79,7 +80,7 @@ public abstract class AtlasBaseClient {
public static final String ADMIN_METRICS = "admin/metrics"; public static final String ADMIN_METRICS = "admin/metrics";
public static final String ADMIN_IMPORT = "admin/import"; public static final String ADMIN_IMPORT = "admin/import";
public static final String ADMIN_EXPORT = "admin/export"; public static final String ADMIN_EXPORT = "admin/export";
public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; public static final String ADMIN_CLUSTER_TEMPLATE = "%sadmin/cluster/%s";
public static final String QUERY = "query"; public static final String QUERY = "query";
public static final String LIMIT = "limit"; public static final String LIMIT = "limit";
...@@ -519,6 +520,11 @@ public abstract class AtlasBaseClient { ...@@ -519,6 +520,11 @@ public abstract class AtlasBaseClient {
return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE); return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
} }
public AtlasCluster getCluster(String clusterName) throws AtlasServiceException {
API api = new API(String.format(ADMIN_CLUSTER_TEMPLATE, BASE_URI, clusterName), HttpMethod.GET, Response.Status.OK);
return callAPI(api, AtlasCluster.class, null);
}
boolean isRetryableException(ClientHandlerException che) { boolean isRetryableException(ClientHandlerException che) {
return che.getCause().getClass().equals(IOException.class) return che.getCause().getClass().equals(IOException.class)
|| che.getCause().getClass().equals(ConnectException.class); || che.getCause().getClass().equals(ConnectException.class);
......
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.model.clusterinfo; package org.apache.atlas.model.impexp;
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.AtlasBaseModelObject; import org.apache.atlas.model.AtlasBaseModelObject;
import org.apache.atlas.type.AtlasType;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -37,9 +38,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ ...@@ -37,9 +38,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class AtlasCluster extends AtlasBaseModelObject implements Serializable { public class AtlasCluster extends AtlasBaseModelObject implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public static final String SYNC_INFO_KEY = "syncInfo"; public static final String KEY_REPLICATION_DETAILS = "REPL_DETAILS";
public static final String OPERATION = "operation";
public static final String NEXT_MODIFIED_TIMESTAMP = "nextModifiedTimestamp";
private String name; private String name;
private String qualifiedName; private String qualifiedName;
...@@ -48,9 +47,11 @@ public class AtlasCluster extends AtlasBaseModelObject implements Serializable { ...@@ -48,9 +47,11 @@ public class AtlasCluster extends AtlasBaseModelObject implements Serializable {
public AtlasCluster() { public AtlasCluster() {
urls = new ArrayList<>(); urls = new ArrayList<>();
additionalInfo = new HashMap<>();
} }
public AtlasCluster(String name, String qualifiedName) { public AtlasCluster(String name, String qualifiedName) {
this();
this.name = name; this.name = name;
this.qualifiedName = qualifiedName; this.qualifiedName = qualifiedName;
} }
...@@ -64,21 +65,59 @@ public class AtlasCluster extends AtlasBaseModelObject implements Serializable { ...@@ -64,21 +65,59 @@ public class AtlasCluster extends AtlasBaseModelObject implements Serializable {
} }
public void setAdditionalInfo(Map<String, String> additionalInfo) { public void setAdditionalInfo(Map<String, String> additionalInfo) {
if(this.additionalInfo == null) {
this.additionalInfo = new HashMap<>();
}
this.additionalInfo = additionalInfo; this.additionalInfo = additionalInfo;
} }
public void setAdditionalInfo(String key, String value) { public void setAdditionalInfo(String key, String value) {
if(this.additionalInfo == null) { if(additionalInfo == null) {
this.additionalInfo = new HashMap<>(); additionalInfo = new HashMap<>();
} }
additionalInfo.put(key, value); additionalInfo.put(key, value);
} }
public void setAdditionalInfoRepl(String guid, long modifiedTimestamp) {
Map<String, Object> replicationDetailsMap = null;
if(additionalInfo != null && additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) {
replicationDetailsMap = AtlasType.fromJson(getAdditionalInfo().get(KEY_REPLICATION_DETAILS), Map.class);
}
if(replicationDetailsMap == null) {
replicationDetailsMap = new HashMap<>();
}
if(modifiedTimestamp == 0) {
replicationDetailsMap.remove(guid);
} else {
replicationDetailsMap.put(guid, modifiedTimestamp);
}
updateReplicationMap(replicationDetailsMap);
}
private void updateReplicationMap(Map<String, Object> replicationDetailsMap) {
String json = AtlasType.toJson(replicationDetailsMap);
setAdditionalInfo(KEY_REPLICATION_DETAILS, json);
}
public Object getAdditionalInfoRepl(String guid) {
if(additionalInfo == null || !additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) {
return null;
}
String key = guid;
String mapJson = additionalInfo.get(KEY_REPLICATION_DETAILS);
Map<String, String> replicationDetailsMap = AtlasType.fromJson(mapJson, Map.class);
if(!replicationDetailsMap.containsKey(key)) {
return null;
}
return replicationDetailsMap.get(key);
}
public Map<String, String> getAdditionalInfo() { public Map<String, String> getAdditionalInfo() {
return this.additionalInfo; return this.additionalInfo;
} }
......
...@@ -173,6 +173,22 @@ public class AtlasExportResult implements Serializable { ...@@ -173,6 +173,22 @@ public class AtlasExportResult implements Serializable {
metrics.put(key, currentValue + incrementBy); metrics.put(key, currentValue + incrementBy);
} }
public AtlasExportResult shallowCopy() {
AtlasExportResult result = new AtlasExportResult();
result.setRequest(getRequest());
result.setUserName(getUserName());
result.setClientIpAddress(getClientIpAddress());
result.setHostName(getHostName());
result.setTimeStamp(getTimeStamp());
result.setMetrics(getMetrics());
result.setOperationStatus(getOperationStatus());
result.setSourceClusterName(getSourceClusterName());
result.setLastModifiedTimestamp(getLastModifiedTimestamp());
return result;
}
public StringBuilder toString(StringBuilder sb) { public StringBuilder toString(StringBuilder sb) {
if (sb == null) { if (sb == null) {
sb = new StringBuilder(); sb = new StringBuilder();
......
...@@ -56,6 +56,7 @@ public class AtlasImportResult { ...@@ -56,6 +56,7 @@ public class AtlasImportResult {
private Map<String, Integer> metrics; private Map<String, Integer> metrics;
private List<String> processedEntities; private List<String> processedEntities;
private OperationStatus operationStatus; private OperationStatus operationStatus;
private AtlasExportResult exportResultWithoutData;
public AtlasImportResult() { public AtlasImportResult() {
this(null, null, null, null, System.currentTimeMillis()); this(null, null, null, null, System.currentTimeMillis());
...@@ -143,6 +144,14 @@ public class AtlasImportResult { ...@@ -143,6 +144,14 @@ public class AtlasImportResult {
public List<String> getProcessedEntities() { return this.processedEntities; } public List<String> getProcessedEntities() { return this.processedEntities; }
public AtlasExportResult getExportResult() {
return exportResultWithoutData;
}
public void setExportResult(AtlasExportResult exportResult) {
this.exportResultWithoutData = exportResult.shallowCopy();
}
public StringBuilder toString(StringBuilder sb) { public StringBuilder toString(StringBuilder sb) {
if (sb == null) { if (sb == null) {
sb = new StringBuilder(); sb = new StringBuilder();
......
...@@ -22,19 +22,24 @@ import org.apache.atlas.ApplicationProperties; ...@@ -22,19 +22,24 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -55,7 +60,9 @@ public class AuditsWriter { ...@@ -55,7 +60,9 @@ public class AuditsWriter {
this.auditService = auditService; this.auditService = auditService;
} }
public void write(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityCreationOrder) throws AtlasBaseException { public void write(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityCreationOrder) throws AtlasBaseException {
auditForExport.add(userName, result, startTime, endTime, entityCreationOrder); auditForExport.add(userName, result, startTime, endTime, entityCreationOrder);
} }
...@@ -67,15 +74,17 @@ public class AuditsWriter { ...@@ -67,15 +74,17 @@ public class AuditsWriter {
return options.containsKey(replicatedKey); return options.containsKey(replicatedKey);
} }
private void updateReplicationAttribute(boolean isReplicationSet, String clusterName, private void updateReplicationAttribute(boolean isReplicationSet,
String clusterName,
List<String> exportedGuids, List<String> exportedGuids,
String attrNameReplicated) throws AtlasBaseException { String attrNameReplicated,
if (!isReplicationSet) { long lastModifiedTimestamp) throws AtlasBaseException {
if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) {
return; return;
} }
AtlasCluster cluster = saveCluster(clusterName); AtlasCluster cluster = saveCluster(clusterName, exportedGuids.get(0), lastModifiedTimestamp);
clusterService.updateEntityWithCluster(cluster, exportedGuids, attrNameReplicated); clusterService.updateEntitiesWithCluster(cluster, exportedGuids, attrNameReplicated);
} }
private String getClusterNameFromOptions(Map options, String key) { private String getClusterNameFromOptions(Map options, String key) {
...@@ -84,27 +93,14 @@ public class AuditsWriter { ...@@ -84,27 +93,14 @@ public class AuditsWriter {
: ""; : "";
} }
private void addAuditEntry(String userName, String sourceCluster, String targetCluster, String operation, private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException {
String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException { AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
if(!hasData) return; return clusterService.save(cluster);
ExportImportAuditEntry entry = new ExportImportAuditEntry();
entry.setUserName(userName);
entry.setSourceClusterName(sourceCluster);
entry.setTargetClusterName(targetCluster);
entry.setOperation(operation);
entry.setResultSummary(result);
entry.setStartTime(startTime);
entry.setEndTime(endTime);
auditService.save(entry);
LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(),
entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation());
} }
private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException { private AtlasCluster saveCluster(String clusterName, String entityGuid, long lastModifiedTimestamp) throws AtlasBaseException {
AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp);
return clusterService.save(cluster); return clusterService.save(cluster);
} }
...@@ -128,22 +124,25 @@ public class AuditsWriter { ...@@ -128,22 +124,25 @@ public class AuditsWriter {
public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException { public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException {
optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO; optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
request = result.getRequest(); request = result.getRequest();
cluster = saveCluster(getCurrentClusterName());
replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo); replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo);
targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
addAuditEntry(userName, cluster = saveCluster(getCurrentClusterName());
cluster.getName(), targetClusterName,
auditService.add(userName, getCurrentClusterName(), targetClusterName,
ExportImportAuditEntry.OPERATION_EXPORT, ExportImportAuditEntry.OPERATION_EXPORT,
AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
updateReplicationAttributeForExport(entitityGuids, request); updateReplicationAttributeForExport(request, entitityGuids);
} }
private void updateReplicationAttributeForExport(List<String> entityGuids, AtlasExportRequest request) throws AtlasBaseException { private void updateReplicationAttributeForExport(AtlasExportRequest request, List<String> entityGuids) throws AtlasBaseException {
if(!replicationOptionState) return; if(!replicationOptionState) {
return;
}
updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER); updateReplicationAttribute(replicationOptionState, targetClusterName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 0L);
} }
} }
...@@ -159,12 +158,13 @@ public class AuditsWriter { ...@@ -159,12 +158,13 @@ public class AuditsWriter {
request = result.getRequest(); request = result.getRequest();
optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM; optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom); replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom);
cluster = saveCluster(getClusterNameFromOptionsState());
String sourceCluster = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom); String sourceCluster = getClusterNameFromOptionsState();
addAuditEntry(userName, cluster = saveCluster(sourceCluster);
sourceCluster, cluster.getName(),
ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); auditService.add(userName,
sourceCluster, getCurrentClusterName(),
ExportImportAuditEntry.OPERATION_IMPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
updateReplicationAttributeForImport(entitityGuids); updateReplicationAttributeForImport(entitityGuids);
} }
...@@ -173,13 +173,17 @@ public class AuditsWriter { ...@@ -173,13 +173,17 @@ public class AuditsWriter {
if(!replicationOptionState) return; if(!replicationOptionState) return;
String targetClusterName = cluster.getName(); String targetClusterName = cluster.getName();
updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER);
updateReplicationAttribute(replicationOptionState, targetClusterName,
entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER,
result.getExportResult().getLastModifiedTimestamp());
} }
private String getClusterNameFromOptionsState() { private String getClusterNameFromOptionsState() {
return replicationOptionState return replicationOptionState
? getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom) ? getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom)
: getCurrentClusterName(); : "";
} }
} }
} }
...@@ -21,12 +21,18 @@ package org.apache.atlas.repository.impexp; ...@@ -21,12 +21,18 @@ package org.apache.atlas.repository.impexp;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -41,21 +47,24 @@ public class ClusterService { ...@@ -41,21 +47,24 @@ public class ClusterService {
private final DataAccess dataAccess; private final DataAccess dataAccess;
private final AtlasEntityStore entityStore; private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
@Inject @Inject
public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore) { public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever) {
this.dataAccess = dataAccess; this.dataAccess = dataAccess;
this.entityStore = entityStore; this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = entityGraphRetriever;
} }
public AtlasCluster get(AtlasCluster cluster) { public AtlasCluster get(AtlasCluster cluster) throws AtlasBaseException {
try { try {
return dataAccess.load(cluster); return dataAccess.load(cluster);
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
LOG.error("dataAccess", e); LOG.error("dataAccess", e);
throw e;
} }
return null;
} }
@GraphTransaction @GraphTransaction
...@@ -68,14 +77,15 @@ public class ClusterService { ...@@ -68,14 +77,15 @@ public class ClusterService {
} }
@GraphTransaction @GraphTransaction
public void updateEntityWithCluster(AtlasCluster cluster, List<String> guids, String attributeName) throws AtlasBaseException { public void updateEntitiesWithCluster(AtlasCluster cluster, List<String> entityGuids, String attributeName) throws AtlasBaseException {
if(cluster != null && StringUtils.isEmpty(cluster.getGuid())) return; if (cluster != null && StringUtils.isEmpty(cluster.getGuid())) {
return;
}
AtlasObjectId objectId = getObjectId(cluster); AtlasObjectId objectId = getObjectId(cluster);
for (String guid : guids) { for (String guid : entityGuids) {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(guid); AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(guid);
updateAttribute(entityWithExtInfo, attributeName, objectId); updateAttribute(entityWithExtInfo, attributeName, objectId);
entityStore.createOrUpdate(new AtlasEntityStream(entityWithExtInfo), true);
} }
} }
...@@ -88,33 +98,46 @@ public class ClusterService { ...@@ -88,33 +98,46 @@ public class ClusterService {
* Attribute passed by name is updated with the value passed. * Attribute passed by name is updated with the value passed.
* @param entityWithExtInfo Entity to be updated * @param entityWithExtInfo Entity to be updated
* @param propertyName attribute name * @param propertyName attribute name
* @param value Value to be set for attribute * @param objectId Value to be set for attribute
*/ */
private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String propertyName, Object value) { private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo,
String propertyName,
AtlasObjectId objectId) {
String value = EntityGraphMapper.getSoftRefFormattedValue(objectId);
updateAttribute(entityWithExtInfo.getEntity(), propertyName, value); updateAttribute(entityWithExtInfo.getEntity(), propertyName, value);
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
updateAttribute(e, propertyName, value); updateAttribute(e, propertyName, value);
} }
} }
private void updateAttribute(AtlasEntity e, String propertyName, Object value) { private void updateAttribute(AtlasEntity entity, String attributeName, Object value) {
if(e.hasAttribute(propertyName) == false) return; if(entity.hasAttribute(attributeName) == false) return;
Object oVal = e.getAttribute(propertyName);
if (oVal != null && !(oVal instanceof List)) return;
List list;
if (oVal == null) { try {
list = new ArrayList(); AtlasVertex vertex = entityGraphRetriever.getEntityVertex(entity.getGuid());
} else { if(vertex == null) {
list = (List) oVal; return;
}
String qualifiedFieldName = getVertexPropertyName(entity, attributeName);
List list = vertex.getListProperty(qualifiedFieldName);
if (CollectionUtils.isEmpty(list)) {
list = new ArrayList();
}
if (!list.contains(value)) {
list.add(value);
vertex.setListProperty(qualifiedFieldName, list);
}
} }
catch (AtlasBaseException ex) {
if (!list.contains(value)) { LOG.error("error retrieving vertex from guid: {}", entity.getGuid(), ex);
list.add(value);
} }
}
e.setAttribute(propertyName, list); private String getVertexPropertyName(AtlasEntity entity, String attributeName) throws AtlasBaseException {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
AtlasStructType.AtlasAttribute attribute = type.getAttribute(attributeName);
return attribute.getVertexPropertyName();
} }
} }
...@@ -25,14 +25,19 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -25,14 +25,19 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.DataAccess;
import org.apache.atlas.repository.ogm.ExportImportAuditEntryDTO; import org.apache.atlas.repository.ogm.ExportImportAuditEntryDTO;
import org.apache.cassandra.cql3.statements.Restriction;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@AtlasService @AtlasService
public class ExportImportAuditService { public class ExportImportAuditService {
...@@ -59,17 +64,43 @@ public class ExportImportAuditService { ...@@ -59,17 +64,43 @@ public class ExportImportAuditService {
return dataAccess.load(entry); return dataAccess.load(entry);
} }
public AtlasSearchResult get(String userName, String operation, String sourceCluster, String targetCluster, public List<ExportImportAuditEntry> get(String userName, String operation, String cluster,
String startTime, String endTime, String startTime, String endTime,
int limit, int offset) throws AtlasBaseException { int limit, int offset) throws AtlasBaseException {
SearchParameters.FilterCriteria criteria = new SearchParameters.FilterCriteria(); SearchParameters.FilterCriteria criteria = new SearchParameters.FilterCriteria();
criteria.setCriterion(new ArrayList<SearchParameters.FilterCriteria>()); criteria.setCondition(SearchParameters.FilterCriteria.Condition.AND);
criteria.setCriterion(new ArrayList<>());
addSearchParameters(criteria, userName, operation, sourceCluster, targetCluster, startTime, endTime); addSearchParameters(criteria, userName, operation, cluster, startTime, endTime);
SearchParameters searchParameters = getSearchParameters(limit, offset, criteria); SearchParameters searchParameters = getSearchParameters(limit, offset, criteria);
searchParameters.setAttributes(getAuditEntityAttributes());
return discoveryService.searchWithParameters(searchParameters); AtlasSearchResult result = discoveryService.searchWithParameters(searchParameters);
return toExportImportAuditEntry(result);
}
private Set<String> getAuditEntityAttributes() {
return ExportImportAuditEntryDTO.getAttributes();
}
private List<ExportImportAuditEntry> toExportImportAuditEntry(AtlasSearchResult result) {
List<ExportImportAuditEntry> ret = new ArrayList<>();
if(CollectionUtils.isEmpty(result.getEntities())) {
return ret;
}
for (AtlasEntityHeader entityHeader : result.getEntities()) {
ExportImportAuditEntry entry = ExportImportAuditEntryDTO.from(entityHeader.getGuid(),
entityHeader.getAttributes());
if(entry == null) {
continue;
}
ret.add(entry);
}
return ret;
} }
private SearchParameters getSearchParameters(int limit, int offset, SearchParameters.FilterCriteria criteria) { private SearchParameters getSearchParameters(int limit, int offset, SearchParameters.FilterCriteria criteria) {
...@@ -78,46 +109,64 @@ public class ExportImportAuditService { ...@@ -78,46 +109,64 @@ public class ExportImportAuditService {
searchParameters.setEntityFilters(criteria); searchParameters.setEntityFilters(criteria);
searchParameters.setLimit(limit); searchParameters.setLimit(limit);
searchParameters.setOffset(offset); searchParameters.setOffset(offset);
return searchParameters; return searchParameters;
} }
private void addSearchParameters(SearchParameters.FilterCriteria criteria, private void addSearchParameters(SearchParameters.FilterCriteria criteria, String userName, String operation,
String userName, String operation, String sourceCluster, String targetCluster, String cluster, String startTime, String endTime) {
String startTime, String endTime) {
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_USER_NAME, userName); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_USER_NAME, userName);
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_OPERATION, operation); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_OPERATION, operation);
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, sourceCluster);
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, targetCluster);
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_START_TIME, startTime); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_START_TIME, startTime);
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_END_TIME, endTime); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_END_TIME, endTime);
addClusterFilterCriteria(criteria, cluster);
} }
private void addParameterIfValueNotEmpty(SearchParameters.FilterCriteria criteria, private void addClusterFilterCriteria(SearchParameters.FilterCriteria parentCriteria, String cluster) {
String attributeName, String value) { if (StringUtils.isEmpty(cluster)) {
if(StringUtils.isEmpty(value)) return; return;
}
boolean isFirstCriteria = criteria.getAttributeName() == null; SearchParameters.FilterCriteria criteria = new SearchParameters.FilterCriteria();
SearchParameters.FilterCriteria cx = isFirstCriteria criteria.setCondition(SearchParameters.FilterCriteria.Condition.OR);
? criteria criteria.setCriterion(new ArrayList<>());
: new SearchParameters.FilterCriteria();
setCriteria(cx, attributeName, value); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, cluster);
addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, cluster);
if(isFirstCriteria) { parentCriteria.getCriterion().add(criteria);
cx.setCondition(SearchParameters.FilterCriteria.Condition.AND); }
}
if(!isFirstCriteria) { private void addParameterIfValueNotEmpty(SearchParameters.FilterCriteria criteria, String attributeName, String value) {
criteria.getCriterion().add(cx); if(StringUtils.isEmpty(value)) {
return;
} }
SearchParameters.FilterCriteria filterCriteria = new SearchParameters.FilterCriteria();
filterCriteria.setAttributeName(attributeName);
filterCriteria.setAttributeValue(value);
filterCriteria.setOperator(SearchParameters.Operator.EQ);
criteria.getCriterion().add(filterCriteria);
} }
private SearchParameters.FilterCriteria setCriteria(SearchParameters.FilterCriteria criteria, String attributeName, String value) { public void add(String userName, String sourceCluster, String targetCluster, String operation,
criteria.setAttributeName(attributeName); String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException {
criteria.setAttributeValue(value); if(!hasData) return;
criteria.setOperator(SearchParameters.Operator.EQ);
ExportImportAuditEntry entry = new ExportImportAuditEntry();
entry.setUserName(userName);
entry.setSourceClusterName(sourceCluster);
entry.setTargetClusterName(targetCluster);
entry.setOperation(operation);
entry.setResultSummary(result);
entry.setStartTime(startTime);
entry.setEndTime(endTime);
return criteria; save(entry);
LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(),
entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation());
} }
} }
...@@ -125,11 +125,11 @@ public class ExportService { ...@@ -125,11 +125,11 @@ public class ExportService {
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef()); context.sink.setTypesDef(context.result.getData().getTypesDef());
auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
clearContextData(context);
context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration); context.result.incrementMeticsCounter("duration", duration);
context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder());
clearContextData(context);
context.sink.setResult(context.result); context.sink.setResult(context.result);
} }
...@@ -194,9 +194,7 @@ public class ExportService { ...@@ -194,9 +194,7 @@ public class ExportService {
} }
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) { private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
if (LOG.isDebugEnabled()) { debugLog("==> processObjectId({})", item);
LOG.debug("==> processObjectId({})", item);
}
try { try {
List<String> entityGuids = getStartingEntity(item, context); List<String> entityGuids = getStartingEntity(item, context);
...@@ -225,11 +223,16 @@ public class ExportService { ...@@ -225,11 +223,16 @@ public class ExportService {
return AtlasExportResult.OperationStatus.FAIL; return AtlasExportResult.OperationStatus.FAIL;
} }
if (LOG.isDebugEnabled()) { debugLog("<== processObjectId({})", item);
LOG.debug("<== processObjectId({})", item); return AtlasExportResult.OperationStatus.SUCCESS;
}
private void debugLog(String s, Object... params) {
if (!LOG.isDebugEnabled()) {
return;
} }
return AtlasExportResult.OperationStatus.SUCCESS; LOG.debug(s, params);
} }
private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
...@@ -330,9 +333,7 @@ public class ExportService { ...@@ -330,9 +333,7 @@ public class ExportService {
} }
private void processEntity(String guid, ExportContext context) throws AtlasBaseException { private void processEntity(String guid, ExportContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { debugLog("==> processEntity({})", guid);
LOG.debug("==> processEntity({})", guid);
}
if (!context.guidsProcessed.contains(guid)) { if (!context.guidsProcessed.contains(guid)) {
TraversalDirection direction = context.guidDirection.get(guid); TraversalDirection direction = context.guidDirection.get(guid);
...@@ -358,9 +359,7 @@ public class ExportService { ...@@ -358,9 +359,7 @@ public class ExportService {
} }
} }
if (LOG.isDebugEnabled()) { debugLog("<== processEntity({})", guid);
LOG.debug("<== processEntity({})", guid);
}
} }
private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
...@@ -403,8 +402,8 @@ public class ExportService { ...@@ -403,8 +402,8 @@ public class ExportService {
for (TraversalDirection direction : directions) { for (TraversalDirection direction : directions) {
String query = getQueryForTraversalDirection(direction); String query = getQueryForTraversalDirection(direction);
if (LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
} }
context.bindings.clear(); context.bindings.clear();
...@@ -433,8 +432,8 @@ public class ExportService { ...@@ -433,8 +432,8 @@ public class ExportService {
} }
} }
if (LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
} }
} }
} }
...@@ -451,8 +450,8 @@ public class ExportService { ...@@ -451,8 +450,8 @@ public class ExportService {
} }
private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
if (LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
} }
String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
...@@ -477,8 +476,9 @@ public class ExportService { ...@@ -477,8 +476,9 @@ public class ExportService {
} }
} }
if (LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
entity.getGuid(), result.size(), context.guidsToProcess.size());
} }
} }
......
...@@ -26,6 +26,7 @@ import org.apache.atlas.model.typedef.AtlasTypesDef; ...@@ -26,6 +26,7 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
...@@ -111,6 +112,16 @@ public class ImportService { ...@@ -111,6 +112,16 @@ public class ImportService {
updateTransformsWithSubTypes(importTransform); updateTransformsWithSubTypes(importTransform);
source.setImportTransform(importTransform); source.setImportTransform(importTransform);
if(LOG.isDebugEnabled()) {
debugLog(" => transforms: {}", AtlasType.toJson(importTransform));
}
}
private void debugLog(String s, Object... params) {
if(!LOG.isDebugEnabled()) return;
LOG.debug(s, params);
} }
private void updateTransformsWithSubTypes(ImportTransforms importTransforms) throws AtlasBaseException { private void updateTransformsWithSubTypes(ImportTransforms importTransforms) throws AtlasBaseException {
...@@ -189,6 +200,7 @@ public class ImportService { ...@@ -189,6 +200,7 @@ public class ImportService {
endTimestamp = System.currentTimeMillis(); endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp)); result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
result.setExportResult(importSource.getExportResult());
auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder()); auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
} }
......
...@@ -19,12 +19,25 @@ package org.apache.atlas.repository.impexp; ...@@ -19,12 +19,25 @@ package org.apache.atlas.repository.impexp;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.List;
public abstract class ImportTransformer { public abstract class ImportTransformer {
private static final String TRANSFORMER_PARAMETER_SEPARATOR = "\\:"; private static final String TRANSFORMER_PARAMETER_SEPARATOR = "\\:";
private static final String TRANSFORMER_NAME_ADD = "add";
private static final String TRANSFORMER_NAME_CLEAR_ATTR = "clearAttrValue";
private static final String TRANSFORMER_NAME_LOWERCASE = "lowercase";
private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase";
private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = "removeClassification";
private static final String TRANSFORMER_NAME_REPLACE = "replace";
private static final String TRANSFORMER_SET_DELETED = "setDeleted";
private final String transformType; private final String transformType;
...@@ -36,15 +49,26 @@ public abstract class ImportTransformer { ...@@ -36,15 +49,26 @@ public abstract class ImportTransformer {
if (StringUtils.isEmpty(key)) { if (StringUtils.isEmpty(key)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Invalid transformer-specification: {}.", transformerSpec); throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Invalid transformer-specification: {}.", transformerSpec);
} else if (key.equals("replace")) { } else if (key.equals(TRANSFORMER_NAME_REPLACE)) {
String toFindStr = (params == null || params.length < 2) ? "" : params[1]; String toFindStr = (params == null || params.length < 2) ? "" : params[1];
String replaceStr = (params == null || params.length < 3) ? "" : params[2]; String replaceStr = (params == null || params.length < 3) ? "" : params[2];
ret = new Replace(toFindStr, replaceStr); ret = new Replace(toFindStr, replaceStr);
} else if (key.equals("lowercase")) { } else if (key.equals(TRANSFORMER_NAME_LOWERCASE)) {
ret = new Lowercase(); ret = new Lowercase();
} else if (key.equals("uppercase")) { } else if (key.equals(TRANSFORMER_NAME_UPPERCASE)) {
ret = new Uppercase(); ret = new Uppercase();
} else if (key.equals(TRANSFORMER_NAME_REMOVE_CLASSIFICATION)) {
String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length);
ret = new RemoveClassification(name);
} else if (key.equals(TRANSFORMER_NAME_ADD)) {
String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length);
ret = new AddValueToAttribute(name);
} else if (key.equals(TRANSFORMER_NAME_CLEAR_ATTR)) {
String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length);
ret = new ClearAttributes(name);
} else if (key.equals(TRANSFORMER_SET_DELETED)) {
ret = new SetDeleted();
} else { } else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec); throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
} }
...@@ -66,7 +90,7 @@ public abstract class ImportTransformer { ...@@ -66,7 +90,7 @@ public abstract class ImportTransformer {
private final String replaceStr; private final String replaceStr;
public Replace(String toFindStr, String replaceStr) { public Replace(String toFindStr, String replaceStr) {
super("replace"); super(TRANSFORMER_NAME_REPLACE);
this.toFindStr = toFindStr; this.toFindStr = toFindStr;
this.replaceStr = replaceStr; this.replaceStr = replaceStr;
...@@ -77,7 +101,7 @@ public abstract class ImportTransformer { ...@@ -77,7 +101,7 @@ public abstract class ImportTransformer {
public String getReplaceStr() { return replaceStr; } public String getReplaceStr() { return replaceStr; }
@Override @Override
public Object apply(Object o) throws AtlasBaseException { public Object apply(Object o) {
Object ret = o; Object ret = o;
if(o instanceof String) { if(o instanceof String) {
...@@ -90,7 +114,7 @@ public abstract class ImportTransformer { ...@@ -90,7 +114,7 @@ public abstract class ImportTransformer {
static class Lowercase extends ImportTransformer { static class Lowercase extends ImportTransformer {
public Lowercase() { public Lowercase() {
super("lowercase"); super(TRANSFORMER_NAME_LOWERCASE);
} }
@Override @Override
...@@ -107,7 +131,7 @@ public abstract class ImportTransformer { ...@@ -107,7 +131,7 @@ public abstract class ImportTransformer {
static class Uppercase extends ImportTransformer { static class Uppercase extends ImportTransformer {
public Uppercase() { public Uppercase() {
super("uppercase"); super(TRANSFORMER_NAME_UPPERCASE);
} }
@Override @Override
...@@ -121,4 +145,175 @@ public abstract class ImportTransformer { ...@@ -121,4 +145,175 @@ public abstract class ImportTransformer {
return ret; return ret;
} }
} }
static class RemoveClassification extends ImportTransformer {
private final String classificationToBeRemoved;
public RemoveClassification(String name) {
super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION);
this.classificationToBeRemoved = name;
}
@Override
public Object apply(Object o) {
if (!(o instanceof AtlasEntity)) {
return o;
}
AtlasEntity entity = (AtlasEntity) o;
if(entity.getClassifications().size() == 0) {
return o;
}
List<AtlasClassification> toRemove = null;
for (AtlasClassification classification : entity.getClassifications()) {
if (classification.getTypeName().equals(classificationToBeRemoved)) {
if (toRemove == null) {
toRemove = new ArrayList<AtlasClassification>();
}
toRemove.add(classification);
}
}
if (toRemove != null) {
entity.getClassifications().removeAll(toRemove);
}
return entity;
}
@Override
public String toString() {
return String.format("%s=%s", "RemoveClassification", classificationToBeRemoved);
}
}
static class AddValueToAttribute extends ImportTransformer {
private final String nameValuePair;
private String attrName;
private String attrValueRaw;
private Object attrValue;
protected AddValueToAttribute(String nameValuePair) {
super(TRANSFORMER_NAME_ADD);
this.nameValuePair = nameValuePair;
setAttrNameValue(this.nameValuePair);
}
private void setAttrNameValue(String nameValuePair) {
String SEPARATOR_EQUALS = "=";
if(!nameValuePair.contains(SEPARATOR_EQUALS)) return;
String splits[] = StringUtils.split(nameValuePair, SEPARATOR_EQUALS);
if(splits.length == 0) {
return;
}
if(splits.length >= 1) {
attrName = splits[0];
}
if(splits.length >= 1) {
attrValueRaw = splits[1];
}
setAttrValue(attrValueRaw);
}
private void setAttrValue(String attrValueRaw) {
final String type_prefix = "list:";
if(attrValueRaw.startsWith(type_prefix)) {
final String item = StringUtils.remove(attrValueRaw, type_prefix);
attrValue = new ArrayList<String>() {{
add(item);
}};
} else {
attrValue = attrValueRaw;
}
}
@Override
public Object apply(Object o) {
if(o == null) {
return o;
}
if(!(o instanceof AtlasEntity)) {
return o;
}
AtlasEntity entity = (AtlasEntity) o;
Object attrExistingValue = entity.getAttribute(attrName);
if(attrExistingValue == null) {
entity.setAttribute(attrName, attrValue);
} else if(attrExistingValue instanceof List) {
List list = (List) attrExistingValue;
if(attrValue instanceof List) {
list.addAll((List) attrValue);
} else {
list.add(attrValue);
}
} else {
entity.setAttribute(attrName, attrValueRaw);
}
return entity;
}
}
static class ClearAttributes extends ImportTransformer {
private String[] attrNames;
protected ClearAttributes(String attrNames) {
super(TRANSFORMER_NAME_CLEAR_ATTR);
this.attrNames = StringUtils.split(attrNames, ",");
}
@Override
public Object apply(Object o) {
if (o == null) {
return o;
}
if (!(o instanceof AtlasEntity)) {
return o;
}
AtlasEntity entity = (AtlasEntity) o;
for (String attrName : attrNames) {
entity.setAttribute(attrName, null);
}
return entity;
}
}
static class SetDeleted extends ImportTransformer {
protected SetDeleted() {
super(TRANSFORMER_SET_DELETED);
}
@Override
public Object apply(Object o) {
if (o == null) {
return o;
}
if (!(o instanceof AtlasEntity)) {
return o;
}
AtlasEntity entity = (AtlasEntity) o;
entity.setStatus(AtlasEntity.Status.DELETED);
return entity;
}
}
} }
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.atlas.repository.ogm; package org.apache.atlas.repository.ogm;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -63,6 +63,7 @@ public class AtlasClusterDTO extends AbstractDataTransferObject<AtlasCluster> { ...@@ -63,6 +63,7 @@ public class AtlasClusterDTO extends AbstractDataTransferObject<AtlasCluster> {
entity.setAttribute(PROPERTY_CLUSTER_NAME, obj.getName()); entity.setAttribute(PROPERTY_CLUSTER_NAME, obj.getName());
entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName()); entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName());
entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo()); entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo());
entity.setAttribute(PROPERTY_URLS, obj.getUrls());
return entity; return entity;
} }
......
...@@ -189,6 +189,26 @@ public class DataAccess { ...@@ -189,6 +189,26 @@ public class DataAccess {
} }
public <T extends AtlasBaseModelObject> T load(String guid, Class<? extends AtlasBaseModelObject> clazz) throws AtlasBaseException {
DataTransferObject<T> dto = (DataTransferObject<T>)dtoRegistry.get(clazz);
AtlasEntityWithExtInfo entityWithExtInfo = null;
if (StringUtils.isNotEmpty(guid)) {
entityWithExtInfo = entityStore.getById(guid);
}
if(entityWithExtInfo == null) {
return null;
}
return dto.from(entityWithExtInfo);
}
public void deleteUsingGuid(String guid) throws AtlasBaseException {
entityStore.deleteById(guid);
}
public void delete(String guid) throws AtlasBaseException { public void delete(String guid) throws AtlasBaseException {
Objects.requireNonNull(guid, "guid"); Objects.requireNonNull(guid, "guid");
AtlasPerfTracer perf = null; AtlasPerfTracer perf = null;
......
...@@ -19,14 +19,17 @@ ...@@ -19,14 +19,17 @@
package org.apache.atlas.repository.ogm; package org.apache.atlas.repository.ogm;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashSet;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.Map; import java.util.Map;
import java.util.Set;
@Component @Component
public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<ExportImportAuditEntry> { public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<ExportImportAuditEntry> {
...@@ -40,30 +43,44 @@ public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<Export ...@@ -40,30 +43,44 @@ public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<Export
public static final String PROPERTY_SOURCE_CLUSTER_NAME = "sourceClusterName"; public static final String PROPERTY_SOURCE_CLUSTER_NAME = "sourceClusterName";
public static final String PROPERTY_TARGET_CLUSTER_NAME = "targetClusterName"; public static final String PROPERTY_TARGET_CLUSTER_NAME = "targetClusterName";
private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_USER_NAME,
PROPERTY_OPERATION, PROPERTY_OPERATION_PARAMS,
PROPERTY_START_TIME, PROPERTY_END_TIME,
PROPERTY_RESULT_SUMMARY,
PROPERTY_SOURCE_CLUSTER_NAME, PROPERTY_TARGET_CLUSTER_NAME));
@Inject @Inject
public ExportImportAuditEntryDTO(AtlasTypeRegistry typeRegistry) { public ExportImportAuditEntryDTO(AtlasTypeRegistry typeRegistry) {
super(typeRegistry, ExportImportAuditEntry.class, super(typeRegistry, ExportImportAuditEntry.class,
Constants.INTERNAL_PROPERTY_KEY_PREFIX + ExportImportAuditEntry.class.getSimpleName()); Constants.INTERNAL_PROPERTY_KEY_PREFIX + ExportImportAuditEntry.class.getSimpleName());
} }
@Override public static Set<String> getAttributes() {
public ExportImportAuditEntry from(AtlasEntity entity) { return ATTRIBUTE_NAMES;
}
public static ExportImportAuditEntry from(String guid, Map<String,Object> attributes) {
ExportImportAuditEntry entry = new ExportImportAuditEntry(); ExportImportAuditEntry entry = new ExportImportAuditEntry();
setGuid(entry, entity); entry.setGuid(guid);
entry.setUserName((String) entity.getAttribute(PROPERTY_USER_NAME)); entry.setUserName((String) attributes.get(PROPERTY_USER_NAME));
entry.setOperation((String) entity.getAttribute(PROPERTY_OPERATION)); entry.setOperation((String) attributes.get(PROPERTY_OPERATION));
entry.setOperationParams((String) entity.getAttribute(PROPERTY_OPERATION_PARAMS)); entry.setOperationParams((String) attributes.get(PROPERTY_OPERATION_PARAMS));
entry.setStartTime((long) entity.getAttribute(PROPERTY_START_TIME)); entry.setStartTime((long) attributes.get(PROPERTY_START_TIME));
entry.setEndTime((long) entity.getAttribute(PROPERTY_END_TIME)); entry.setEndTime((long) attributes.get(PROPERTY_END_TIME));
entry.setSourceClusterName((String) entity.getAttribute(PROPERTY_SOURCE_CLUSTER_NAME)); entry.setSourceClusterName((String) attributes.get(PROPERTY_SOURCE_CLUSTER_NAME));
entry.setTargetClusterName((String) entity.getAttribute(PROPERTY_TARGET_CLUSTER_NAME)); entry.setTargetClusterName((String) attributes.get(PROPERTY_TARGET_CLUSTER_NAME));
entry.setResultSummary((String) entity.getAttribute(PROPERTY_RESULT_SUMMARY)); entry.setResultSummary((String) attributes.get(PROPERTY_RESULT_SUMMARY));
return entry; return entry;
} }
@Override @Override
public ExportImportAuditEntry from(AtlasEntity entity) {
return from(entity.getGuid(), entity.getAttributes());
}
@Override
public ExportImportAuditEntry from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { public ExportImportAuditEntry from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
return from(entityWithExtInfo.getEntity()); return from(entityWithExtInfo.getEntity());
} }
......
...@@ -1909,4 +1909,12 @@ public class EntityGraphMapper { ...@@ -1909,4 +1909,12 @@ public class EntityGraphMapper {
type.getNormalizedValueForUpdate(classification); type.getNormalizedValueForUpdate(classification);
} }
public static String getSoftRefFormattedValue(AtlasObjectId objectId) {
return getSoftRefFormattedString(objectId.getTypeName(), objectId.getGuid());
}
private static String getSoftRefFormattedString(String typeName, String resolvedGuid) {
return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid);
}
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2; package org.apache.atlas.repository.store.graph.v2;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import jnr.ffi.annotations.In;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary; import org.apache.atlas.model.TimeBoundary;
...@@ -58,7 +59,9 @@ import org.apache.commons.collections.MapUtils; ...@@ -58,7 +59,9 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -92,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation ...@@ -92,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
@Component
public final class EntityGraphRetriever { public final class EntityGraphRetriever {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class); private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
...@@ -116,6 +119,7 @@ public final class EntityGraphRetriever { ...@@ -116,6 +119,7 @@ public final class EntityGraphRetriever {
private final boolean ignoreRelationshipAttr; private final boolean ignoreRelationshipAttr;
@Inject
public EntityGraphRetriever(AtlasTypeRegistry typeRegistry) { public EntityGraphRetriever(AtlasTypeRegistry typeRegistry) {
this(typeRegistry, false); this(typeRegistry, false);
} }
......
...@@ -20,11 +20,10 @@ package org.apache.atlas.repository.impexp; ...@@ -20,11 +20,10 @@ package org.apache.atlas.repository.impexp;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.impexp.ClusterService; import org.apache.atlas.repository.Constants;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
...@@ -35,18 +34,20 @@ import java.io.IOException; ...@@ -35,18 +34,20 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class ClusterServiceTest { public class ClusterServiceTest {
private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1"; private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1";
private final String CLUSTER_NAME = "testCl1"; private final String CLUSTER_NAME = "testCl1";
private final String TARGET_CLUSTER_NAME = "testCl2"; private final String TARGET_CLUSTER_NAME = "testCl2";
private final String QUALIFIED_NAME_STOCKS = "stocks@cl1";
private final String TYPE_HIVE_DB = "hive_db";
private final String topLevelEntityGuid = "AAA-BBB-CCC";
@Inject @Inject
private AtlasTypeDefStore typeDefStore; private AtlasTypeDefStore typeDefStore;
...@@ -64,8 +65,8 @@ public class ClusterServiceTest { ...@@ -64,8 +65,8 @@ public class ClusterServiceTest {
@Test @Test
public void saveAndRetrieveClusterInfo() throws AtlasBaseException { public void saveAndRetrieveClusterInfo() throws AtlasBaseException {
AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); AtlasCluster expected = getCluster(CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME);
AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME);
AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME); AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME);
AtlasCluster actual = clusterService.save(expected); AtlasCluster actual = clusterService.save(expected);
...@@ -83,36 +84,38 @@ public class ClusterServiceTest { ...@@ -83,36 +84,38 @@ public class ClusterServiceTest {
assertEquals(actual.getName(), expected.getName()); assertEquals(actual.getName(), expected.getName());
assertEquals(actual.getQualifiedName(), expected.getQualifiedName()); assertEquals(actual.getQualifiedName(), expected.getQualifiedName());
assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION),
getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION));
assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP),
getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP));
} }
private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { private AtlasCluster getCluster(String clusterName, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) {
AtlasCluster cluster = new AtlasCluster(name, name); AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
Map<String, String> syncMap = new HashMap<>();
Map<String, Object> syncMap = new HashMap<>(); syncMap.put("topLevelEntity", topLevelEntity);
syncMap.put("operation", operation); syncMap.put("operation", operation);
syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp); syncMap.put("nextModifiedTimestamp", Long.toString(nextModifiedTimestamp));
syncMap.put("targetCluster", targetClusterName); syncMap.put("targetCluster", targetClusterName);
String syncMapJson = AtlasType.toJson(syncMap); cluster.setAdditionalInfo(syncMap);
String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity);
cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson);
return cluster; return cluster;
} }
private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) { @Test
String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName); public void verifyAdditionalInfo() throws AtlasBaseException {
assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey)); final long expectedLastModifiedTimestamp = 200L;
AtlasCluster expectedCluster = new AtlasCluster(CLUSTER_NAME, CLUSTER_NAME);
String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey); String qualifiedNameAttr = Constants.QUALIFIED_NAME.replace(ATTR_NAME_REFERENCEABLE, "");
return AtlasType.fromJson(json, Map.class); AtlasObjectId objectId = new AtlasObjectId(TYPE_HIVE_DB, qualifiedNameAttr, QUALIFIED_NAME_STOCKS);
} expectedCluster.setAdditionalInfoRepl(topLevelEntityGuid, expectedLastModifiedTimestamp);
AtlasCluster actualCluster = clusterService.save(expectedCluster);
assertEquals(actualCluster.getName(), expectedCluster.getName());
int actualModifiedTimestamp = (int) actualCluster.getAdditionalInfoRepl(topLevelEntityGuid);
private String getTopLevelEntitySpecificKey(String topLevelEntity) { assertEquals(actualModifiedTimestamp, expectedLastModifiedTimestamp);
return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity);
} }
} }
...@@ -20,7 +20,6 @@ package org.apache.atlas.repository.impexp; ...@@ -20,7 +20,6 @@ package org.apache.atlas.repository.impexp;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
...@@ -31,14 +30,16 @@ import org.testng.annotations.Test; ...@@ -31,14 +30,16 @@ import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class ExportImportAuditServiceTest { public class ExportImportAuditServiceTest extends ExportImportTestBase {
@Inject @Inject
AtlasTypeRegistry typeRegistry; AtlasTypeRegistry typeRegistry;
...@@ -69,7 +70,7 @@ public class ExportImportAuditServiceTest { ...@@ -69,7 +70,7 @@ public class ExportImportAuditServiceTest {
String target2 = "clx1"; String target2 = "clx1";
ExportImportAuditEntry entry2 = saveAndGet(source2, ExportImportAuditEntry.OPERATION_EXPORT, target2); ExportImportAuditEntry entry2 = saveAndGet(source2, ExportImportAuditEntry.OPERATION_EXPORT, target2);
Thread.sleep(1000); pauseForIndexCreation();
ExportImportAuditEntry actualEntry = retrieveEntry(entry); ExportImportAuditEntry actualEntry = retrieveEntry(entry);
ExportImportAuditEntry actualEntry2 = retrieveEntry(entry2); ExportImportAuditEntry actualEntry2 = retrieveEntry(entry2);
...@@ -80,7 +81,7 @@ public class ExportImportAuditServiceTest { ...@@ -80,7 +81,7 @@ public class ExportImportAuditServiceTest {
assertEquals(actualEntry.getOperation(), entry.getOperation()); assertEquals(actualEntry.getOperation(), entry.getOperation());
} }
@Test(enabled = false) @Test
public void numberOfSavedEntries_Retrieved() throws AtlasBaseException, InterruptedException { public void numberOfSavedEntries_Retrieved() throws AtlasBaseException, InterruptedException {
final String source1 = "cluster1"; final String source1 = "cluster1";
final String target1 = "cly"; final String target1 = "cly";
...@@ -90,19 +91,20 @@ public class ExportImportAuditServiceTest { ...@@ -90,19 +91,20 @@ public class ExportImportAuditServiceTest {
saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1); saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1);
} }
Thread.sleep(5000); pauseForIndexCreation();
AtlasSearchResult results = auditService.get(source1, ExportImportAuditEntry.OPERATION_EXPORT, "", "", "", "", 10, 0); List<ExportImportAuditEntry> results = auditService.get("",
assertEquals(results.getEntities().size(), MAX_ENTRIES); ExportImportAuditEntry.OPERATION_EXPORT,
"", "", "", 10, 0);
assertTrue(results.size() > 0);
} }
private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException {
private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException, InterruptedException { List<ExportImportAuditEntry> result = auditService.get(entry.getUserName(), entry.getOperation(),
Thread.sleep(5000); entry.getSourceClusterName(),
AtlasSearchResult result = auditService.get(entry.getUserName(), entry.getOperation(), entry.getSourceClusterName(), Long.toString(entry.getStartTime()), "", 10, 0);
entry.getTargetClusterName(), Long.toString(entry.getStartTime()), "", 10, 0);
assertNotNull(result); assertNotNull(result);
assertEquals(result.getEntities().size(), 1); assertEquals(result.size(), 1);
entry.setGuid(result.getEntities().get(0).getGuid()); entry.setGuid(result.get(0).getGuid());
return auditService.get(entry); return auditService.get(entry);
} }
......
...@@ -22,7 +22,7 @@ import org.apache.atlas.ApplicationProperties; ...@@ -22,7 +22,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
...@@ -33,6 +33,7 @@ import org.testng.SkipException; ...@@ -33,6 +33,7 @@ import org.testng.SkipException;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
...@@ -76,21 +77,28 @@ public class ExportImportTestBase { ...@@ -76,21 +77,28 @@ public class ExportImportTestBase {
} }
} }
protected void assertAuditEntry(ExportImportAuditService auditService) { protected void assertAuditEntry(ExportImportAuditService auditService) throws InterruptedException {
AtlasSearchResult result = null; pauseForIndexCreation();
List<ExportImportAuditEntry> result = null;
try { try {
Thread.sleep(5000); result = auditService.get("", "", "", "", "", 10, 0);
result = auditService.get("", "", "", "", "", "", 10, 0);
} catch (Exception e) { } catch (Exception e) {
throw new SkipException("auditService.get: failed!"); throw new SkipException("audit entries not retrieved.");
} }
assertNotNull(result); assertNotNull(result);
assertNotNull(result.getEntities()); assertTrue(result.size() > 0);
assertTrue(result.getEntities().size() > 0);
} }
private String getCurrentCluster() throws AtlasException { private String getCurrentCluster() throws AtlasException {
return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, "default"); return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, "default");
} }
protected void pauseForIndexCreation() {
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
throw new SkipException("pause interrupted.");
}
}
} }
...@@ -60,9 +60,6 @@ public class ExportIncrementalTest extends ExportImportTestBase { ...@@ -60,9 +60,6 @@ public class ExportIncrementalTest extends ExportImportTestBase {
ExportService exportService; ExportService exportService;
@Inject @Inject
ClusterService clusterService;
@Inject
private AtlasEntityStoreV2 entityStore; private AtlasEntityStoreV2 entityStore;
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
......
...@@ -108,7 +108,7 @@ public class ImportServiceTest extends ExportImportTestBase { ...@@ -108,7 +108,7 @@ public class ImportServiceTest extends ExportImportTestBase {
} }
@AfterTest @AfterTest
public void postTest() { public void postTest() throws InterruptedException {
assertAuditEntry(auditService); assertAuditEntry(auditService);
} }
......
...@@ -24,15 +24,19 @@ import org.apache.atlas.RequestContext; ...@@ -24,15 +24,19 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2; import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier; import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils; import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.SkipException; import org.testng.SkipException;
...@@ -46,15 +50,12 @@ import java.io.IOException; ...@@ -46,15 +50,12 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO; import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class) @Guice(modules = TestModules.TestOnlyModule.class)
public class ReplicationEntityAttributeTest extends ExportImportTestBase { public class ReplicationEntityAttributeTest extends ExportImportTestBase {
...@@ -92,23 +93,13 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { ...@@ -92,23 +93,13 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
@BeforeClass @BeforeClass
public void setup() throws IOException, AtlasBaseException { public void setup() throws IOException, AtlasBaseException {
loadBaseModel(typeDefStore, typeRegistry); basicSetup(typeDefStore, typeRegistry);
loadHiveModel(typeDefStore, typeRegistry);
createEntities();
}
private void createEntities() {
entityStore = new AtlasEntityStoreV2(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper); entityStore = new AtlasEntityStoreV2(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper);
createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns"});
createAtlasEntity(entityStore, loadEntity(ENTITIES_SUB_DIR,"db")); AtlasType refType = typeRegistry.getType("Referenceable");
createAtlasEntity(entityStore, loadEntity(ENTITIES_SUB_DIR, "table-columns")); AtlasEntityDef entityDef = (AtlasEntityDef) typeDefStore.getByName(refType.getTypeName());
assertNotNull(entityDef);
try {
AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
assertEquals(entities.getEntities().size(), 2);
} catch (AtlasBaseException e) {
throw new SkipException(String.format("getByIds: could not load '%s' & '%s'.", DB_GUID, TABLE_GUID));
}
} }
@BeforeMethod @BeforeMethod
...@@ -128,20 +119,21 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { ...@@ -128,20 +119,21 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
assertNotNull(zipSource.getCreationOrder()); assertNotNull(zipSource.getCreationOrder());
assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount); assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
assertClusterInfo(REPLICATED_TO_CLUSTER_NAME); assertCluster(REPLICATED_TO_CLUSTER_NAME, null);
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO_CLUSTER); assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO_CLUSTER);
} }
@Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute", enabled = false) @Test(dependsOnMethods = "exportWithReplicationToOption_AddsClusterObjectIdToReplicatedFromAttribute", enabled = false)
public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException { public void importWithReplicationFromOption_AddsClusterObjectIdToReplicatedFromAttribute() throws AtlasBaseException, IOException {
AtlasImportRequest request = getImportRequestWithReplicationOption(); AtlasImportRequest request = getImportRequestWithReplicationOption();
runImportWithParameters(importService, request, zipSource); AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
assertClusterInfo(REPLICATED_FROM_CLUSTER_NAME); assertCluster(REPLICATED_FROM_CLUSTER_NAME, importResult);
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER); assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER);
} }
private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException { private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException {
pauseForIndexCreation();
AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID)); AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
for (AtlasEntity e : entities.getEntities()) { for (AtlasEntity e : entities.getEntities()) {
Object ex = e.getAttribute(attrNameReplication); Object ex = e.getAttribute(attrNameReplication);
...@@ -152,11 +144,25 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { ...@@ -152,11 +144,25 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
} }
} }
private void assertClusterInfo(String name) { private void assertCluster(String name, AtlasImportResult importResult) throws AtlasBaseException {
AtlasCluster actual = clusterService.get(new AtlasCluster(name, name)); AtlasCluster actual = clusterService.get(new AtlasCluster(name, name));
assertNotNull(actual); assertNotNull(actual);
assertEquals(actual.getName(), name); assertEquals(actual.getName(), name);
if(importResult != null) {
assertClusterAdditionalInfo(actual, importResult);
}
}
private void assertClusterAdditionalInfo(AtlasCluster cluster, AtlasImportResult importResult) throws AtlasBaseException {
AtlasExportRequest request = importResult.getExportResult().getRequest();
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(request.getItemsToExport().get(0).getTypeName());
AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getByUniqueAttributes(type, request.getItemsToExport().get(0).getUniqueAttributes());
long actualLastModifiedTimestamp = (long) cluster.getAdditionalInfoRepl(entity.getEntity().getGuid());
assertTrue(cluster.getAdditionalInfo().size() > 0);
assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getLastModifiedTimestamp());
} }
private AtlasExportRequest getUpdateMetaInfoUpdateRequest() { private AtlasExportRequest getUpdateMetaInfoUpdateRequest() {
......
...@@ -4,5 +4,8 @@ ...@@ -4,5 +4,8 @@
"cardinality": "SET", "cardinality": "SET",
"isIndexable": false, "isIndexable": false,
"isOptional": true, "isOptional": true,
"isUnique": false "isUnique": false,
"options": {
"isSoftReference": "true"
}
} }
...@@ -28,7 +28,6 @@ import org.apache.atlas.authorize.AtlasPrivilege; ...@@ -28,7 +28,6 @@ import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchContext; import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster;
import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasExportResult;
...@@ -450,7 +449,6 @@ public class AdminResource { ...@@ -450,7 +449,6 @@ public class AdminResource {
/** /**
* Fetch details of a cluster. * Fetch details of a cluster.
* @param clusterName name of target cluster with which it is paired * @param clusterName name of target cluster with which it is paired
* @param entityQualifiedName qualified name of top level entity
* @return AtlasCluster * @return AtlasCluster
* @throws AtlasBaseException * @throws AtlasBaseException
*/ */
...@@ -458,8 +456,7 @@ public class AdminResource { ...@@ -458,8 +456,7 @@ public class AdminResource {
@Path("/cluster/{clusterName}") @Path("/cluster/{clusterName}")
@Consumes(Servlets.JSON_MEDIA_TYPE) @Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasCluster getCluster(@PathParam("clusterName") String clusterName, public AtlasCluster getCluster(@PathParam("clusterName") String clusterName) throws AtlasBaseException {
@QueryParam("entity") String entityQualifiedName) throws AtlasBaseException {
AtlasPerfTracer perf = null; AtlasPerfTracer perf = null;
try { try {
...@@ -478,22 +475,21 @@ public class AdminResource { ...@@ -478,22 +475,21 @@ public class AdminResource {
@Path("/expimp/audit") @Path("/expimp/audit")
@Consumes(Servlets.JSON_MEDIA_TYPE) @Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE)
public AtlasSearchResult getExportImportAudit(@QueryParam("sourceClusterName") String sourceCluster, public List<ExportImportAuditEntry> getExportImportAudit(@QueryParam("clusterName") String cluster,
@QueryParam("targetCluster") String targetCluster, @QueryParam("userName") String userName,
@QueryParam("userName") String userName, @QueryParam("operation") String operation,
@QueryParam("operation") String operation, @QueryParam("startTime") String startTime,
@QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime,
@QueryParam("endTime") String endTime, @QueryParam("limit") int limit,
@QueryParam("limit") int limit, @QueryParam("offset") int offset) throws AtlasBaseException {
@QueryParam("offset") int offset) throws AtlasBaseException {
AtlasPerfTracer perf = null; AtlasPerfTracer perf = null;
try { try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "getExportImportAudit(" + sourceCluster + ")"); perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "getExportImportAudit(" + cluster + ")");
} }
return exportImportAuditService.get(userName, operation, sourceCluster, targetCluster, startTime, endTime, limit, offset); return exportImportAuditService.get(userName, operation, cluster, startTime, endTime, limit, offset);
} finally { } finally {
AtlasPerfTracer.log(perf); AtlasPerfTracer.log(perf);
} }
......
...@@ -21,13 +21,15 @@ package org.apache.atlas.web.resources; ...@@ -21,13 +21,15 @@ package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.clusterinfo.AtlasCluster; import org.apache.atlas.model.impexp.AtlasCluster;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.repository.impexp.ZipSource; import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.utils.TestResourceFileUtils; import org.apache.atlas.utils.TestResourceFileUtils;
import org.apache.atlas.web.integration.BaseResourceIT; import org.apache.atlas.web.integration.BaseResourceIT;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
...@@ -43,6 +45,13 @@ import static org.testng.Assert.assertTrue; ...@@ -43,6 +45,13 @@ import static org.testng.Assert.assertTrue;
public class AdminExportImportTestIT extends BaseResourceIT { public class AdminExportImportTestIT extends BaseResourceIT {
private final String FILE_TO_IMPORT = "stocks-base.zip"; private final String FILE_TO_IMPORT = "stocks-base.zip";
private final String EXPORT_REQUEST_FILE = "export-incremental";
private final String SOURCE_CLUSTER_NAME = "cl1";
static final String IMPORT_TRANSFORM_CLEAR_ATTRS =
"{ \"Asset\": { \"*\":[ \"clearAttrValue:replicatedToCluster,replicatedFromCluster\" ] } }";
static final String IMPORT_TRANSFORM_SET_DELETED =
"{ \"Asset\": { \"*\":[ \"setDeleted\" ] } }";
@Test @Test
public void isActive() throws AtlasServiceException { public void isActive() throws AtlasServiceException {
...@@ -50,25 +59,34 @@ public class AdminExportImportTestIT extends BaseResourceIT { ...@@ -50,25 +59,34 @@ public class AdminExportImportTestIT extends BaseResourceIT {
} }
@Test(dependsOnMethods = "isActive") @Test(dependsOnMethods = "isActive")
public void importData() throws AtlasServiceException, IOException { public void importData() throws AtlasServiceException {
performImport(FILE_TO_IMPORT); performImport(FILE_TO_IMPORT);
assertReplicationData("cl1");
} }
@Test(dependsOnMethods = "importData") @Test(dependsOnMethods = "importData")
public void exportData() throws AtlasServiceException, IOException, AtlasBaseException { public void exportData() throws AtlasServiceException, IOException, AtlasBaseException {
final int EXPECTED_CREATION_ORDER_SIZE = 13; final int EXPECTED_CREATION_ORDER_SIZE = 10;
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", "export-incremental", AtlasExportRequest.class); AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", EXPORT_REQUEST_FILE, AtlasExportRequest.class);
byte[] exportedBytes = atlasClientV2.exportData(request); byte[] exportedBytes = atlasClientV2.exportData(request);
assertNotNull(exportedBytes); assertNotNull(exportedBytes);
ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes)); ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes));
assertNotNull(zs.getExportResult()); assertNotNull(zs.getExportResult());
assertEquals(zs.getCreationOrder().size(), EXPECTED_CREATION_ORDER_SIZE); assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE);
} }
private void performImport(String fileToImport) throws AtlasServiceException { private void performImport(String fileToImport) throws AtlasServiceException {
AtlasImportRequest request = new AtlasImportRequest(); AtlasImportRequest request = new AtlasImportRequest();
request.getOptions().put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, SOURCE_CLUSTER_NAME);
request.getOptions().put(AtlasImportRequest.TRANSFORMS_KEY, IMPORT_TRANSFORM_CLEAR_ATTRS);
performImport(fileToImport, request);
}
private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
byte[] fileBytes = new byte[0]; byte[] fileBytes = new byte[0];
try { try {
fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport))); fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport)));
...@@ -82,4 +100,23 @@ public class AdminExportImportTestIT extends BaseResourceIT { ...@@ -82,4 +100,23 @@ public class AdminExportImportTestIT extends BaseResourceIT {
assertNotNull(result.getMetrics()); assertNotNull(result.getMetrics());
assertEquals(result.getProcessedEntities().size(), 37); assertEquals(result.getProcessedEntities().size(), 37);
} }
private void assertReplicationData(String clusterName) throws AtlasServiceException {
AtlasCluster cluster = atlasClientV2.getCluster(clusterName);
assertNotNull(cluster);
assertNotNull(cluster.getAdditionalInfo());
assertTrue(cluster.getAdditionalInfo().size() > 0);
}
@AfterClass
protected void teardown() {
AtlasImportRequest request = new AtlasImportRequest();
request.getOptions().put(AtlasImportRequest.TRANSFORMS_KEY, IMPORT_TRANSFORM_SET_DELETED);
try {
performImport(FILE_TO_IMPORT, request);
} catch (AtlasServiceException e) {
throw new SkipException("performTeardown: failed! Subsequent tests results may be affected.", e);
}
}
} }
...@@ -2,10 +2,10 @@ ...@@ -2,10 +2,10 @@
"itemsToExport": [ "itemsToExport": [
{ {
"typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks@cl1" } "typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks@cl1" }
} }
], ],
"options": { "options": {
"fetchType": "full" "fetchType": "incremental",
"replicatedTo": "cl2"
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment