Commit 921f487b by Ashutosh Mestry Committed by nixonrodrigues

ATLAS-3362: Updated logic for storing repl key for table-level replication.

parent b8af8c90
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
......@@ -28,9 +29,13 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,8 +43,8 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Component
public class AuditsWriter {
......@@ -47,6 +52,8 @@ public class AuditsWriter {
private static final String CLUSTER_NAME_DEFAULT = "default";
private static final String DC_SERVER_NAME_SEPARATOR = "$";
private AtlasTypeRegistry typeRegistry;
private AtlasEntityStore entityStore;
private AtlasServerService atlasServerService;
private ExportImportAuditService auditService;
......@@ -54,7 +61,9 @@ public class AuditsWriter {
private ImportAudits auditForImport = new ImportAudits();
@Inject
public AuditsWriter(AtlasServerService atlasServerService, ExportImportAuditService auditService) {
public AuditsWriter(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasServerService atlasServerService, ExportImportAuditService auditService) {
this.typeRegistry = typeRegistry;
this.entityStore = entityStore;
this.atlasServerService = atlasServerService;
this.auditService = auditService;
}
......@@ -80,7 +89,9 @@ public class AuditsWriter {
return;
}
AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp);
String candidateGuid = exportedGuids.get(0);
String replGuidKey = ReplKeyGuidFinder.get(typeRegistry, entityStore, candidateGuid);
AtlasServer server = saveServer(serverName, serverFullName, replGuidKey, lastModifiedTimestamp);
atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated);
}
......@@ -126,6 +137,50 @@ public class AuditsWriter {
atlasServerService.getCreateAtlasServer(getCurrentClusterName(), getCurrentClusterName());
}
static class ReplKeyGuidFinder {
private static final String ENTITY_TYPE_HIVE_DB = "hive_db";
private static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
private static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
public static String get(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String candidateGuid) {
String guid = null;
try {
guid = getParentEntityGuid(typeRegistry, entityStore, candidateGuid);
} catch (AtlasBaseException e) {
LOG.error("Error fetching parent guid for child entity: {}", candidateGuid);
}
if (StringUtils.isEmpty(guid)) {
guid = candidateGuid;
}
return guid;
}
private static String getParentEntityGuid(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String defaultGuid) throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo extInfo = entityStore.getById(defaultGuid);
if (extInfo == null || extInfo.getEntity() == null) {
return null;
}
String typeName = extInfo.getEntity().getTypeName();
if (!typeName.equals(ENTITY_TYPE_HIVE_TABLE) && !typeName.equals(ENTITY_TYPE_HIVE_COLUMN)) {
return null;
}
String hiveDBQualifiedName = extractHiveDBQualifiedName((String) extInfo.getEntity().getAttribute(EntityGraphRetriever.QUALIFIED_NAME));
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(ENTITY_TYPE_HIVE_DB);
return entityStore.getGuidByUniqueAttributes(entityType, Collections.singletonMap(EntityGraphRetriever.QUALIFIED_NAME, hiveDBQualifiedName));
}
@VisibleForTesting
static String extractHiveDBQualifiedName(String qualifiedName) {
return String.format("%s@%s",
StringUtils.substringBefore(qualifiedName, "."),
StringUtils.substringAfter(qualifiedName, "@"));
}
}
private class ExportAudits {
private AtlasExportRequest request;
private String targetServerName;
......@@ -159,13 +214,11 @@ public class AuditsWriter {
private AtlasImportRequest request;
private boolean replicationOptionState;
private String sourceServerName;
private String optionKeyReplicatedFrom;
private String sourceServerFullName;
public void add(String userName, AtlasImportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
request = result.getRequest();
replicationOptionState = request.isReplicationOptionSet();
......
......@@ -33,6 +33,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
......@@ -146,6 +147,14 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM);
}
@Test
public void replKeyGuidFinder() {
String expectedDBQualifiedName = "largedb@cl1";
assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName("largedb.testtable_0.col101@cl1"), expectedDBQualifiedName);
assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName("largedb.testtable_0@cl1"), expectedDBQualifiedName);
}
private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException {
pauseForIndexCreation();
AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment