Commit 8639ada6 by Ashutosh Mestry

ATLAS-2886: Support for fully qualified server name

parent 31c3bea1
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [
{
"name": "TaxonomyTerm",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "atlas.taxonomy",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
],
"entityDefs": [
{
"name": "Referenceable",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "qualifiedName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
}
]
},
{
"name": "__internal",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "Asset",
"superTypes": [],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "description",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "owner",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "DataSet",
"superTypes": [
"Referenceable",
"Asset"
],
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "Infrastructure",
"superTypes": [
"Referenceable",
"Asset"
],
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "Process",
"superTypes": [
"Referenceable",
"Asset"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "inputs",
"typeName": "array<DataSet>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "outputs",
"typeName": "array<DataSet>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "AtlasServer",
"typeVersion": "1.0",
"superTypes": [
],
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "displayName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "fullName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "urls",
"typeName": "array<string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "additionalInfo",
"typeName": "map<string,string>",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "__AtlasUserProfile",
"superTypes": [
"__internal"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "fullName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "savedSearches",
"typeName": "array<__AtlasUserSavedSearch>",
"cardinality": "LIST",
"isIndexable": false,
"isOptional": true,
"isUnique": false,
"constraints": [
{
"type": "ownedRef"
}
]
}
]
},
{
"name": "__AtlasUserSavedSearch",
"superTypes": [
"__internal"
],
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "name",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "ownerName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "uniqueName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": true
},
{
"name": "searchType",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "searchParameters",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "searchParameters",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
},
{
"name": "__ExportImportAuditEntry",
"typeVersion": "1.0",
"superTypes": [
"__internal"
],
"attributeDefs": [
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
},
{
"name": "operation",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "sourceServerName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "targetServerName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "operationParams",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "operationStartTime",
"typeName": "long",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "operationEndTime",
"typeName": "long",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": true,
"isUnique": false
},
{
"name": "resultSummary",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": true,
"isUnique": false
}
]
}
]
}
......@@ -17,7 +17,6 @@
*/
package org.apache.atlas.entitytransform;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -226,7 +225,6 @@ public abstract class Condition {
}
}
@VisibleForTesting
void addObjectId(AtlasObjectId objId) {
this.objectIds.add(objId);
}
......
......@@ -44,6 +44,7 @@ import java.util.Map;
public class AuditsWriter {
private static final Logger LOG = LoggerFactory.getLogger(AuditsWriter.class);
private static final String CLUSTER_NAME_DEFAULT = "default";
private static final String DC_SERVER_NAME_SEPARATOR = "$";
private AtlasServerService atlasServerService;
private ExportImportAuditService auditService;
......@@ -74,7 +75,7 @@ public class AuditsWriter {
}
private void updateReplicationAttribute(boolean isReplicationSet,
String serverName,
String serverName, String serverFullName,
List<String> exportedGuids,
String attrNameReplicated,
long lastModifiedTimestamp) throws AtlasBaseException {
......@@ -82,7 +83,7 @@ public class AuditsWriter {
return;
}
AtlasServer server = saveServer(serverName, exportedGuids.get(0), lastModifiedTimestamp);
AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp);
atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated);
}
......@@ -92,15 +93,16 @@ public class AuditsWriter {
: StringUtils.EMPTY;
}
private AtlasServer saveServer(String name) throws AtlasBaseException {
return atlasServerService.save(new AtlasServer(name, name));
private AtlasServer saveServer(String name, String serverFullName) {
AtlasServer cluster = new AtlasServer(name, serverFullName);
return atlasServerService.save(cluster);
}
private AtlasServer saveServer(String name,
private AtlasServer saveServer(String name, String serverFullName,
String entityGuid,
long lastModifiedTimestamp) throws AtlasBaseException {
long lastModifiedTimestamp) {
AtlasServer server = new AtlasServer(name, name);
AtlasServer server = new AtlasServer(name, serverFullName);
server.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp);
if (LOG.isDebugEnabled()) {
......@@ -120,11 +122,20 @@ public class AuditsWriter {
return StringUtils.EMPTY;
}
static String getServerNameFromFullName(String fullName) {
if (StringUtils.isEmpty(fullName) || !fullName.contains(DC_SERVER_NAME_SEPARATOR)) {
return fullName;
}
return StringUtils.split(fullName, "$")[1];
}
private class ExportAudits {
private AtlasExportRequest request;
private String targetServerName;
private String optionKeyReplicatedTo;
private boolean replicationOptionState;
private String targetServerFullName;
public void add(String userName, AtlasExportResult result,
long startTime, long endTime,
......@@ -143,16 +154,17 @@ public class AuditsWriter {
return;
}
updateReplicationAttribute(replicationOptionState, targetServerName,
updateReplicationAttribute(replicationOptionState, targetServerName, targetServerFullName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker());
}
private void saveServers() throws AtlasBaseException {
saveServer(getCurrentClusterName());
saveServer(getCurrentClusterName(), getCurrentClusterName());
targetServerName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
targetServerFullName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
targetServerName = getServerNameFromFullName(targetServerFullName);
if(StringUtils.isNotEmpty(targetServerName)) {
saveServer(targetServerName);
saveServer(targetServerName, targetServerFullName);
}
}
}
......@@ -162,6 +174,7 @@ public class AuditsWriter {
private boolean replicationOptionState;
private String sourceServerName;
private String optionKeyReplicatedFrom;
private String sourceServerFullName;
public void add(String userName, AtlasImportResult result,
long startTime, long endTime,
......@@ -181,16 +194,17 @@ public class AuditsWriter {
return;
}
updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids,
updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
}
private void saveServers() throws AtlasBaseException {
saveServer(getCurrentClusterName());
saveServer(getCurrentClusterName(), getCurrentClusterName());
sourceServerName = getClusterNameFromOptionsState();
sourceServerFullName = getClusterNameFromOptionsState();
sourceServerName = getServerNameFromFullName(sourceServerFullName);
if(StringUtils.isNotEmpty(sourceServerName)) {
saveServer(sourceServerName);
saveServer(sourceServerName, sourceServerFullName);
}
}
......
......@@ -61,7 +61,7 @@ public class ExportImportAuditServiceTest extends ExportImportTestBase {
}
@Test
public void saveLogEntry() throws AtlasBaseException, InterruptedException {
public void saveLogEntry() throws AtlasBaseException {
final String source1 = "clx";
final String target1 = "cly";
ExportImportAuditEntry entry = saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1);
......
......@@ -23,7 +23,6 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
......@@ -64,7 +63,6 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
verifyCreatedEntities(entityStore, entityGuids, 2);
gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
}
......
......@@ -116,7 +116,10 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
assertNotNull(zipSource.getCreationOrder());
assertEquals(zipSource.getCreationOrder().size(), expectedEntityCount);
assertCluster(REPLICATED_TO_CLUSTER_NAME, null);
assertCluster(
AuditsWriter.getServerNameFromFullName(REPLICATED_TO_CLUSTER_NAME),
REPLICATED_TO_CLUSTER_NAME, null);
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_TO);
}
......@@ -125,7 +128,9 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
AtlasImportRequest request = getImportRequestWithReplicationOption();
AtlasImportResult importResult = runImportWithParameters(importService, request, zipSource);
assertCluster(REPLICATED_FROM_CLUSTER_NAME, importResult);
assertCluster(
AuditsWriter.getServerNameFromFullName(REPLICATED_FROM_CLUSTER_NAME),
REPLICATED_FROM_CLUSTER_NAME, importResult);
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM);
}
......@@ -141,11 +146,12 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
}
}
private void assertCluster(String name, AtlasImportResult importResult) throws AtlasBaseException {
AtlasServer actual = atlasServerService.get(new AtlasServer(name, name));
private void assertCluster(String name, String fullName, AtlasImportResult importResult) throws AtlasBaseException {
AtlasServer actual = atlasServerService.get(new AtlasServer(name, fullName));
assertNotNull(actual);
assertEquals(actual.getName(), name);
assertEquals(actual.getFullName(), fullName);
if(importResult != null) {
assertClusterAdditionalInfo(actual, importResult);
......
......@@ -6,6 +6,6 @@
],
"options": {
"fetchType": "full",
"replicatedTo": "clTarget"
"replicatedTo": "dc2$clTarget"
}
}
{
"options": {
"replicatedFrom": "clSource"
"replicatedFrom": "dc1$clSource"
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment