From 921f487b043f801e70427e98e2d4ae9a8ff877dd Mon Sep 17 00:00:00 2001
From: Ashutosh Mestry <amestry@hortonworks.com>
Date: Fri, 9 Aug 2019 11:53:51 -0700
Subject: [PATCH] ATLAS-3362: Updated logic for storing repl key for table-level replication.

Signed-off-by: nixonrodrigues <nixon@apache.org>
---
 repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java                   | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
 repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java |  9 +++++++++
 2 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 9bf30f1..f2d36ed 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.repository.impexp;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
@@ -28,9 +29,13 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
-import org.apache.commons.collections.MapUtils;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +43,8 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
 import javax.inject.Inject;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 @Component
 public class AuditsWriter {
@@ -47,6 +52,8 @@ public class AuditsWriter {
     private static final String CLUSTER_NAME_DEFAULT = "default";
     private static final String DC_SERVER_NAME_SEPARATOR = "$";
 
+    private AtlasTypeRegistry typeRegistry;
+    private AtlasEntityStore entityStore;
     private AtlasServerService atlasServerService;
     private ExportImportAuditService auditService;
 
@@ -54,7 +61,9 @@ public class AuditsWriter {
     private ImportAudits auditForImport = new ImportAudits();
 
     @Inject
-    public AuditsWriter(AtlasServerService atlasServerService, ExportImportAuditService auditService) {
+    public AuditsWriter(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasServerService atlasServerService, ExportImportAuditService auditService) {
+        this.typeRegistry = typeRegistry;
+        this.entityStore = entityStore;
         this.atlasServerService = atlasServerService;
         this.auditService = auditService;
     }
@@ -80,7 +89,9 @@ public class AuditsWriter {
             return;
         }
 
-        AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp);
+        String candidateGuid = exportedGuids.get(0);
+        String replGuidKey = ReplKeyGuidFinder.get(typeRegistry, entityStore, candidateGuid);
+        AtlasServer server = saveServer(serverName, serverFullName, replGuidKey, lastModifiedTimestamp);
         atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated);
     }
 
@@ -126,6 +137,50 @@ public class AuditsWriter {
         atlasServerService.getCreateAtlasServer(getCurrentClusterName(), getCurrentClusterName());
     }
 
+    static class ReplKeyGuidFinder {
+        private static final String ENTITY_TYPE_HIVE_DB = "hive_db";
+        private static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
+        private static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+
+        public static String get(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String candidateGuid) {
+            String guid = null;
+            try {
+                guid = getParentEntityGuid(typeRegistry, entityStore, candidateGuid);
+            } catch (AtlasBaseException e) {
+                LOG.error("Error fetching parent guid for child entity: {}", candidateGuid);
+            }
+
+            if (StringUtils.isEmpty(guid)) {
+                guid = candidateGuid;
+            }
+
+            return guid;
+        }
+
+        private static String getParentEntityGuid(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String defaultGuid) throws AtlasBaseException {
+            AtlasEntity.AtlasEntityWithExtInfo extInfo = entityStore.getById(defaultGuid);
+            if (extInfo == null || extInfo.getEntity() == null) {
+                return null;
+            }
+
+            String typeName = extInfo.getEntity().getTypeName();
+            if (!typeName.equals(ENTITY_TYPE_HIVE_TABLE) && !typeName.equals(ENTITY_TYPE_HIVE_COLUMN)) {
+                return null;
+            }
+
+            String hiveDBQualifiedName = extractHiveDBQualifiedName((String) extInfo.getEntity().getAttribute(EntityGraphRetriever.QUALIFIED_NAME));
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(ENTITY_TYPE_HIVE_DB);
+            return entityStore.getGuidByUniqueAttributes(entityType, Collections.singletonMap(EntityGraphRetriever.QUALIFIED_NAME, hiveDBQualifiedName));
+        }
+
+        @VisibleForTesting
+        static String extractHiveDBQualifiedName(String qualifiedName) {
+            return String.format("%s@%s",
+                    StringUtils.substringBefore(qualifiedName, "."),
+                    StringUtils.substringAfter(qualifiedName, "@"));
+        }
+    }
+
     private class ExportAudits {
         private AtlasExportRequest request;
         private String targetServerName;
@@ -159,13 +214,11 @@ public class AuditsWriter {
         private AtlasImportRequest request;
         private boolean replicationOptionState;
         private String sourceServerName;
-        private String optionKeyReplicatedFrom;
         private String sourceServerFullName;
 
         public void add(String userName, AtlasImportResult result,
                         long startTime, long endTime,
                         List<String> entityGuids) throws AtlasBaseException {
-            optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
             request = result.getRequest();
             replicationOptionState = request.isReplicationOptionSet();
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 829390b..92d4fb0 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -33,6 +33,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityChangeNotifier;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasEntityType;
@@ -146,6 +147,14 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
         assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM);
     }
 
+    @Test
+    public void replKeyGuidFinder() {
+        String expectedDBQualifiedName = "largedb@cl1";
+
+        assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName("largedb.testtable_0.col101@cl1"), expectedDBQualifiedName);
+        assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName("largedb.testtable_0@cl1"), expectedDBQualifiedName);
+    }
+
     private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException {
         pauseForIndexCreation();
         AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
--
libgit2 0.27.1