Commit 8903c9a6 by Ashutosh Mestry

ATLAS-2862: Incremental Export now uses request context to determine change marker.

parent 708e4865
......@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.commons.collections.MapUtils;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
......@@ -53,7 +54,7 @@ public class AtlasExportRequest implements Serializable {
public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected";
public static final String FETCH_TYPE_INCREMENTAL = "incremental";
public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime";
public static final String FETCH_TYPE_INCREMENTAL_CHANGE_MARKER = "changeMarker";
public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
public static final String MATCH_TYPE_CONTAINS = "contains";
......@@ -79,6 +80,48 @@ public class AtlasExportRequest implements Serializable {
this.options = options;
}
public String getMatchTypeOptionValue() {
String matchType = null;
if (MapUtils.isNotEmpty(getOptions())) {
if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
return matchType;
}
public String getFetchTypeOptionValue() {
if(getOptions() == null || !getOptions().containsKey(OPTION_FETCH_TYPE)) {
return FETCH_TYPE_FULL;
}
Object o = getOptions().get(OPTION_FETCH_TYPE);
if (o instanceof String) {
return (String) o;
}
return FETCH_TYPE_FULL;
}
public boolean getSkipLineageOptionValue() {
if(!getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)) {
return false;
}
Object o = getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
if(o instanceof String) {
return Boolean.parseBoolean((String) o);
}
if(o instanceof Boolean) {
return (Boolean) o;
}
return false;
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
......
......@@ -62,14 +62,15 @@ public class AtlasExportResult implements Serializable {
private AtlasExportData data;
private OperationStatus operationStatus;
private String sourceClusterName;
private long lastModifiedTimestamp;
private long changeMarker;
public AtlasExportResult() {
this(null, null, null, null, System.currentTimeMillis());
this(null, null, null, null, System.currentTimeMillis(), 0L);
}
public AtlasExportResult(AtlasExportRequest request,
String userName, String clientIpAddress, String hostName, long timeStamp) {
String userName, String clientIpAddress, String hostName, long timeStamp,
long changeMarker) {
this.request = request;
this.userName = userName;
this.clientIpAddress = clientIpAddress;
......@@ -78,6 +79,7 @@ public class AtlasExportResult implements Serializable {
this.metrics = new HashMap<>();
this.operationStatus = OperationStatus.FAIL;
this.data = new AtlasExportData();
this.changeMarker = changeMarker;
}
public AtlasExportRequest getRequest() {
......@@ -136,12 +138,12 @@ public class AtlasExportResult implements Serializable {
this.data = data;
}
public void setLastModifiedTimestamp(long lastModifiedTimestamp) {
this.lastModifiedTimestamp = lastModifiedTimestamp;
public void setChangeMarker(long changeMarker) {
this.changeMarker = changeMarker;
}
public long getLastModifiedTimestamp() {
return this.lastModifiedTimestamp;
public long getChangeMarker() {
return this.changeMarker;
}
public OperationStatus getOperationStatus() {
......@@ -173,22 +175,6 @@ public class AtlasExportResult implements Serializable {
metrics.put(key, currentValue + incrementBy);
}
public AtlasExportResult shallowCopy() {
AtlasExportResult result = new AtlasExportResult();
result.setRequest(getRequest());
result.setUserName(getUserName());
result.setClientIpAddress(getClientIpAddress());
result.setHostName(getHostName());
result.setTimeStamp(getTimeStamp());
result.setMetrics(getMetrics());
result.setOperationStatus(getOperationStatus());
result.setSourceClusterName(getSourceClusterName());
result.setLastModifiedTimestamp(getLastModifiedTimestamp());
return result;
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
......@@ -199,14 +185,13 @@ public class AtlasExportResult implements Serializable {
sb.append(", userName='").append(userName).append("'");
sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
sb.append(", hostName='").append(hostName).append("'");
sb.append(", lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'");
sb.append(", changeMarker='").append(changeMarker).append("'");
sb.append(", sourceCluster='").append(sourceClusterName).append("'");
sb.append(", timeStamp='").append(timeStamp).append("'");
sb.append(", metrics={");
AtlasBaseTypeDef.dumpObjects(metrics, sb);
sb.append("}");
sb.append(", data='").append(data).append("'");
sb.append(", operationStatus='").append(operationStatus).append("'");
sb.append("}");
......@@ -237,13 +222,11 @@ public class AtlasExportResult implements Serializable {
private static final long serialVersionUID = 1L;
private AtlasTypesDef typesDef;
private Map<String, AtlasEntity> entities;
private List<String> entityCreationOrder;
public AtlasExportData() {
typesDef = new AtlasTypesDef();
entities = new HashMap<>();
entityCreationOrder = new ArrayList<>();
}
......@@ -251,10 +234,6 @@ public class AtlasExportResult implements Serializable {
public void setTypesDef(AtlasTypesDef typesDef) { this.typesDef = typesDef; }
public Map<String, AtlasEntity> getEntities() { return entities; }
public void setEntities(Map<String, AtlasEntity> entities) { this.entities = entities; }
public List<String> getEntityCreationOrder() { return entityCreationOrder; }
public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; }
......@@ -267,7 +246,6 @@ public class AtlasExportResult implements Serializable {
sb.append("AtlasExportData {");
sb.append(", typesDef={").append(typesDef).append("}");
sb.append(", entities={");
AtlasBaseTypeDef.dumpObjects(entities, sb);
sb.append("}");
sb.append(", entityCreationOrder={");
AtlasBaseTypeDef.dumpObjects(entityCreationOrder, sb);
......
......@@ -149,7 +149,7 @@ public class AtlasImportResult {
}
public void setExportResult(AtlasExportResult exportResult) {
this.exportResultWithoutData = exportResult.shallowCopy();
this.exportResultWithoutData = exportResult;
}
public StringBuilder toString(StringBuilder sb) {
......
......@@ -144,7 +144,7 @@ public class AuditsWriter {
}
updateReplicationAttribute(replicationOptionState, targetServerName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getLastModifiedTimestamp());
entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker());
}
private void saveServers() throws AtlasBaseException {
......@@ -182,7 +182,7 @@ public class AuditsWriter {
}
updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getLastModifiedTimestamp());
Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
}
private void saveServers() throws AtlasBaseException {
......
......@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.impexp.AtlasExportRequest;
......@@ -93,7 +94,9 @@ public class ExportService {
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
String requestingIP) throws AtlasBaseException {
long startTime = System.currentTimeMillis();
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime);
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
hostName, startTime, getCurrentChangeMarker());
ExportContext context = new ExportContext(atlasGraph, result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
......@@ -117,6 +120,10 @@ public class ExportService {
return context.result;
}
private long getCurrentChangeMarker() {
return RequestContext.earliestActiveRequestTime();
}
private void updateSinkWithOperationMetrics(String userName, ExportContext context,
AtlasExportResult.OperationStatus[] statuses,
long startTime, long endTime) throws AtlasBaseException {
......@@ -125,7 +132,6 @@ public class ExportService {
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef());
context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration);
auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder());
......@@ -362,7 +368,7 @@ public class ExportService {
debugLog("<== processEntity({})", guid);
}
private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
switch (context.fetchType) {
case CONNECTED:
getEntityGuidsForConnectedFetch(entity, context, direction);
......@@ -375,7 +381,7 @@ public class ExportService {
}
}
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
if (direction == null || direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
} else {
......@@ -688,8 +694,7 @@ public class ExportService {
private final ExportFetchType fetchType;
private final String matchType;
private final boolean skipLineage;
private final long lastModifiedTimestampRequested;
private long newestLastModifiedTimestamp;
private final long changeMarker;
private int progressReportCount = 0;
......@@ -699,45 +704,16 @@ public class ExportService {
scriptEngine = atlasGraph.getGremlinScriptEngine();
bindings = new HashMap<>();
fetchType = getFetchType(result.getRequest());
matchType = getMatchType(result.getRequest());
skipLineage = getOptionSkipLineage(result.getRequest());
this.lastModifiedTimestampRequested = getLastModifiedTimestamp(fetchType, result.getRequest());
this.newestLastModifiedTimestamp = 0;
}
private ExportFetchType getFetchType(AtlasExportRequest request) {
Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null;
if (fetchOption instanceof String) {
return ExportFetchType.from((String) fetchOption);
} else if (fetchOption instanceof ExportFetchType) {
return (ExportFetchType) fetchOption;
fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
matchType = result.getRequest().getMatchTypeOptionValue();
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker = getChangeTokenFromOptions(fetchType, result.getRequest());
}
return ExportFetchType.FULL;
}
private String getMatchType(AtlasExportRequest request) {
String matchType = null;
if (MapUtils.isNotEmpty(request.getOptions())) {
if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
return matchType;
}
private boolean getOptionSkipLineage(AtlasExportRequest request) {
return request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) &&
(boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
}
private long getLastModifiedTimestamp(ExportFetchType fetchType, AtlasExportRequest request) {
if(fetchType == ExportFetchType.INCREMENTAL && request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME)) {
return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString());
private long getChangeTokenFromOptions(ExportFetchType fetchType, AtlasExportRequest request) {
if(fetchType == ExportFetchType.INCREMENTAL &&
request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) {
return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
}
return 0L;
......@@ -794,19 +770,7 @@ public class ExportService {
return true;
}
long entityModificationTimestamp = entity.getUpdateTime().getTime();
updateNewestLastModifiedTimestamp(entityModificationTimestamp);
return doesTimestampQualify(entityModificationTimestamp);
}
private void updateNewestLastModifiedTimestamp(long entityModificationTimestamp) {
if(newestLastModifiedTimestamp < entityModificationTimestamp) {
newestLastModifiedTimestamp = entityModificationTimestamp;
}
}
private boolean doesTimestampQualify(long modificationTimestamp) {
return lastModifiedTimestampRequested < modificationTimestamp;
return changeMarker <= entity.getUpdateTime().getTime();
}
public boolean getSkipLineage() {
......@@ -814,7 +778,6 @@ public class ExportService {
}
public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime());
sink.add(entityWithExtInfo);
}
}
......
......@@ -36,10 +36,11 @@ import java.util.zip.ZipOutputStream;
public class ZipSink {
private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
private static String FILE_EXTENSION_JSON = ".json";
private ZipOutputStream zipOutputStream;
final Set<String> guids = new HashSet<>();
public ZipSink(OutputStream outputStream) {
zipOutputStream = new ZipOutputStream(outputStream);
}
......@@ -92,7 +93,7 @@ public class ZipSink {
private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
try {
addToZipStream(fileName.toString() + ".json", jsonData);
addToZipStream(fileName.toString() + FILE_EXTENSION_JSON, jsonData);
} catch (IOException e) {
throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
}
......
......@@ -41,7 +41,7 @@ import javax.inject.Inject;
import java.io.IOException;
import java.util.Map;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
......@@ -98,7 +98,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
private long updateTimesampForNextIncrementalExport(ZipSource source) throws AtlasBaseException {
return source.getExportResult().getLastModifiedTimestamp();
return source.getExportResult().getChangeMarker();
}
@Test(dependsOnMethods = "atT0_ReturnsAllEntities")
......@@ -161,7 +161,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, timestamp);
request.getOptions().put(FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, timestamp);
return request;
} catch (IOException e) {
......
......@@ -72,6 +72,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
assertNotNull(classification);
assertEntities(result.getProcessedEntities(), TAG_NAME);
}
private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException {
for (String guid : entityGuids) {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid);
......
......@@ -159,7 +159,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
long actualLastModifiedTimestamp = (long) cluster.getAdditionalInfoRepl(entity.getEntity().getGuid());
assertTrue(cluster.getAdditionalInfo().size() > 0);
assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getLastModifiedTimestamp());
assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getChangeMarker());
}
private AtlasExportRequest getUpdateMetaInfoUpdateRequest() {
......
......@@ -59,7 +59,7 @@ public class ZipSinkTest {
itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default"));
request.setItemsToExport(itemsToExport);
defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100);
defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100, 0L);
return defaultExportResult;
}
......
......@@ -6,6 +6,6 @@
],
"options": {
"fetchType": "incremental",
"fromTime": 0
"changeMarker": 0
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment