Commit 74bf4cda by Nikhil Bonte Committed by Ashutosh Mestry

ATLAS-3346: Support to re-activated deleted entity during import.

parent a6552f21
......@@ -959,9 +959,9 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
context.addEntityToDelete(vertex);
} else if (currStatus == Status.DELETED && newStatus == Status.ACTIVE) {
LOG.warn("attempt to activate deleted entity (guid={}). Ignored", guid);
entity.setStatus(currStatus);
LOG.warn("Import is attempting to activate deleted entity (guid={}).", guid);
entityGraphMapper.importActivateEntity(vertex, entity);
context.addCreated(guid, entity, entityType, vertex);
}
}
......
......@@ -42,6 +42,7 @@ import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.FullTextMapperV2;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
......@@ -2099,4 +2100,55 @@ public class EntityGraphMapper {
private static String getSoftRefFormattedString(String typeName, String resolvedGuid) {
return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid);
}
public void importActivateEntity(AtlasVertex vertex, AtlasEntity entity) {
AtlasGraphUtilsV2.setEncodedProperty(vertex, STATE_PROPERTY_KEY, ACTIVE);
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
Set<String> relatedEntitiesGuids = getRelatedEntitiesGuids(entity);
activateEntityRelationships(vertex, relatedEntitiesGuids);
}
}
private void activateEntityRelationships(AtlasVertex vertex, Set<String> relatedEntitiesGuids) {
Iterator<AtlasEdge> edgeIterator = vertex.getEdges(AtlasEdgeDirection.BOTH).iterator();
while (edgeIterator.hasNext()) {
AtlasEdge edge = edgeIterator.next();
if (AtlasGraphUtilsV2.getState(edge) != DELETED) {
continue;
}
final String relatedEntityGuid;
if (Objects.equals(edge.getInVertex().getId(), vertex.getId())) {
relatedEntityGuid = AtlasGraphUtilsV2.getIdFromVertex(edge.getOutVertex());
} else {
relatedEntityGuid = AtlasGraphUtilsV2.getIdFromVertex(edge.getInVertex());
}
if (StringUtils.isEmpty(relatedEntityGuid) || !relatedEntitiesGuids.contains(relatedEntityGuid)) {
continue;
}
edge.setProperty(STATE_PROPERTY_KEY, AtlasRelationship.Status.ACTIVE);
}
}
private Set<String> getRelatedEntitiesGuids(AtlasEntity entity) {
Set<String> relGuidsSet = new HashSet<>();
for (Object o : entity.getRelationshipAttributes().values()) {
if (o instanceof AtlasObjectId) {
relGuidsSet.add(((AtlasObjectId) o).getGuid());
} else if (o instanceof List) {
for (Object id : (List) o) {
if (id instanceof AtlasObjectId) {
relGuidsSet.add(((AtlasObjectId) id).getGuid());
}
}
}
}
return relGuidsSet;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.inject.Inject;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import static org.apache.atlas.AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
import static org.apache.atlas.type.AtlasTypeUtil.toAtlasRelatedObjectId;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportReactivateTableTest extends ExportImportTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ImportReactivateTableTest.class);
private static final String ENTITY_TYPE_COL = "hive_column";
private static final String ENTITY_GUID_TABLE_WITH_REL_ATTRS = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
private static final String ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS = "027a987e-867a-4c98-ac1e-c5ded41130d3";
private static final String REPL_FROM = "cl1";
private static final String REPL_TRANSFORMER = "[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
"\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}}," +
"{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}}," +
"{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
"{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
"\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}," +
"{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
"\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]";
@Inject
private ImportService importService;
@Inject
private AtlasTypeRegistry typeRegistry;
@Inject
private AtlasEntityStore entityStore;
@Inject
private AtlasTypeDefStore typeDefStore;
@BeforeTest
public void setup() throws IOException, AtlasBaseException {
RequestContext.clear();
RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
basicSetup(typeDefStore, typeRegistry);
}
@AfterClass
public void clear() throws Exception {
AtlasGraphProvider.cleanup();
if (useLocalSolr()) {
LocalSolrRunner.stop();
}
}
private void importSeedData() throws AtlasBaseException, IOException {
loadFsModel(typeDefStore, typeRegistry);
loadHiveModel(typeDefStore, typeRegistry);
AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
atlasImportRequest.setOption("replicatedFrom", REPL_FROM);
atlasImportRequest.setOption("transformers", REPL_TRANSFORMER);
runImportWithParameters(importService, atlasImportRequest, getDataWithoutRelationshipAttrs());
runImportWithParameters(importService, atlasImportRequest, getDataWithRelationshipAttrs());
}
public static InputStream getDataWithRelationshipAttrs() {
return getInputStreamFrom("repl_exp_1.zip");
}
public static InputStream getDataWithoutRelationshipAttrs() {
return getInputStreamFrom("stocks.zip");
}
@Test
public void testWithRelationshipAttr() throws AtlasBaseException, IOException {
testReactivation(ENTITY_GUID_TABLE_WITH_REL_ATTRS, 2);
}
@Test
public void testWithoutRelationshipAttr() throws AtlasBaseException, IOException {
testReactivation(ENTITY_GUID_TABLE_WITHOUT_REL_ATTRS, 7);
}
public void testReactivation(String tableEntityGuid, int columnCount) throws AtlasBaseException, IOException {
importSeedData();
AtlasEntity.AtlasEntityWithExtInfo entity = entityStore.getById(tableEntityGuid);
EntityMutationResponse response = createColumn(entity.getEntity());
String columnGuid = response.getCreatedEntities().get(0).getGuid();
assertNotNull(columnGuid);
columnCount++;
entityStore.deleteById(tableEntityGuid);
entity = entityStore.getById(tableEntityGuid);
assertEquals(entity.getEntity().getStatus(), AtlasEntity.Status.DELETED);
importSeedData();
AtlasEntity atlasEntity = entityStore.getById(tableEntityGuid).getEntity();
assertEquals(atlasEntity.getStatus(), AtlasEntity.Status.ACTIVE);
List<AtlasRelatedObjectId> columns = (List<AtlasRelatedObjectId>) atlasEntity.getRelationshipAttribute("columns");
assertEquals(columns.size(), columnCount);
int activeColumnCount = 0;
int deletedColumnCount = 0;
for (AtlasRelatedObjectId column : columns) {
if (column.getGuid().equals(columnGuid)){
assertEquals(column.getEntityStatus(), AtlasEntity.Status.DELETED);
assertEquals(column.getRelationshipStatus(), AtlasRelationship.Status.DELETED);
deletedColumnCount++;
}else{
assertEquals(column.getEntityStatus(), AtlasEntity.Status.ACTIVE);
assertEquals(column.getRelationshipStatus(), AtlasRelationship.Status.ACTIVE);
activeColumnCount++;
}
}
assertEquals(activeColumnCount, --columnCount);
assertEquals(deletedColumnCount, 1);
}
private EntityMutationResponse createColumn(AtlasEntity tblEntity) throws AtlasBaseException {
AtlasEntity ret = new AtlasEntity(ENTITY_TYPE_COL);
String name = "new_column";
ret.setAttribute("name", name);
ret.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, name + REPL_FROM);
ret.setAttribute("type", "int");
ret.setAttribute("comment", name);
ret.setRelationshipAttribute("table", toAtlasRelatedObjectId(tblEntity));
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(ret), false);
return response;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment