Commit 0c308015 by Ashutosh Mestry

ATLAS-2818: Entity tagging after import.

parent ca33b1bf
......@@ -28,18 +28,15 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -101,6 +98,11 @@ public class AuditsWriter {
/**
 * Persists (creates or updates) a cluster entry for the given cluster name and
 * records the replication marker (entity guid + last-modified timestamp) in its
 * additional info before saving it via the cluster service.
 *
 * @param clusterName           name of the cluster being recorded
 * @param entityGuid            guid of the entity driving the replication marker
 * @param lastModifiedTimestamp timestamp stored alongside the guid
 * @return the saved {@code AtlasCluster} as returned by {@code clusterService.save}
 * @throws AtlasBaseException if the save fails
 */
private AtlasCluster saveCluster(String clusterName, String entityGuid, long lastModifiedTimestamp) throws AtlasBaseException {
// clusterName is used for both constructor arguments — presumably name and qualifiedName; TODO confirm
AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp);
// guard avoids building the log message when debug is off
if (LOG.isDebugEnabled()) {
LOG.debug("saveCluster: {}", cluster);
}
return clusterService.save(cluster);
}
......@@ -116,68 +118,77 @@ public class AuditsWriter {
private class ExportAudits {
private AtlasExportRequest request;
private AtlasCluster cluster;
private String targetClusterName;
private String optionKeyReplicatedTo;
private boolean replicationOptionState;
public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException {
public void add(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
request = result.getRequest();
replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo);
targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
cluster = saveCluster(getCurrentClusterName());
saveClusters();
auditService.add(userName, getCurrentClusterName(), targetClusterName,
ExportImportAuditEntry.OPERATION_EXPORT,
AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
updateReplicationAttributeForExport(request, entitityGuids);
}
AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty());
private void updateReplicationAttributeForExport(AtlasExportRequest request, List<String> entityGuids) throws AtlasBaseException {
if(!replicationOptionState) {
if (result.getOperationStatus() == AtlasExportResult.OperationStatus.FAIL) {
return;
}
updateReplicationAttribute(replicationOptionState, targetClusterName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 0L);
entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, result.getLastModifiedTimestamp());
}
private void saveClusters() throws AtlasBaseException {
saveCluster(getCurrentClusterName());
targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
if(StringUtils.isNotEmpty(targetClusterName)) {
saveCluster(targetClusterName);
}
}
}
private class ImportAudits {
private AtlasImportRequest request;
private boolean replicationOptionState;
private AtlasCluster cluster;
private String sourceClusterName;
private AtlasCluster sourceCluster;
private String optionKeyReplicatedFrom;
private AtlasImportResult result;
public void add(String userName, AtlasImportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException {
this.result = result;
request = result.getRequest();
public void add(String userName, AtlasImportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
request = result.getRequest();
replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom);
String sourceCluster = getClusterNameFromOptionsState();
cluster = saveCluster(sourceCluster);
saveClusters();
auditService.add(userName,
sourceCluster, getCurrentClusterName(),
ExportImportAuditEntry.OPERATION_IMPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
sourceClusterName, getCurrentClusterName(),
ExportImportAuditEntry.OPERATION_IMPORT,
AtlasType.toJson(result), startTime, endTime, !entityGuids.isEmpty());
updateReplicationAttributeForImport(entitityGuids);
}
if(result.getOperationStatus() == AtlasImportResult.OperationStatus.FAIL) {
return;
}
private void updateReplicationAttributeForImport(List<String> entityGuids) throws AtlasBaseException {
if(!replicationOptionState) return;
updateReplicationAttribute(replicationOptionState, this.sourceClusterName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER, result.getExportResult().getLastModifiedTimestamp());
}
String targetClusterName = cluster.getName();
private void saveClusters() throws AtlasBaseException {
saveCluster(getCurrentClusterName());
updateReplicationAttribute(replicationOptionState, targetClusterName,
entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER,
result.getExportResult().getLastModifiedTimestamp());
sourceClusterName = getClusterNameFromOptionsState();
if(StringUtils.isNotEmpty(sourceClusterName)) {
this.sourceCluster = saveCluster(sourceClusterName);
}
}
private String getClusterNameFromOptionsState() {
......
......@@ -48,17 +48,21 @@ public class ImportService {
private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry;
private final BulkImporter bulkImporter;
private AuditsWriter auditsWriter;
private final AuditsWriter auditsWriter;
private final ImportTransformsShaper importTransformsShaper;
private long startTimestamp;
private long endTimestamp;
@Inject
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter, AuditsWriter auditsWriter) {
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter,
AuditsWriter auditsWriter,
ImportTransformsShaper importTransformsShaper) {
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.bulkImporter = bulkImporter;
this.auditsWriter = auditsWriter;
this.importTransformsShaper = importTransformsShaper;
}
public AtlasImportResult run(ZipSource source, String userName,
......@@ -76,7 +80,7 @@ public class ImportService {
AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis());
try {
LOG.info("==> import(user={}, from={})", userName, requestingIP);
LOG.info("==> import(user={}, from={}, request={})", userName, requestingIP, request);
String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
......@@ -85,8 +89,6 @@ public class ImportService {
processTypes(source.getTypesDef(), result);
setStartPosition(request, source);
processEntities(userName, source, result);
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
......@@ -110,12 +112,13 @@ public class ImportService {
return;
}
updateTransformsWithSubTypes(importTransform);
source.setImportTransform(importTransform);
importTransformsShaper.shape(importTransform);
source.setImportTransform(importTransform);
if(LOG.isDebugEnabled()) {
debugLog(" => transforms: {}", AtlasType.toJson(importTransform));
}
}
private void debugLog(String s, Object... params) {
......@@ -124,19 +127,6 @@ public class ImportService {
LOG.debug(s, params);
}
/**
 * Expands the configured transforms so that a transform declared on an entity
 * type is also applied to all of that type's sub-types.
 *
 * @param importTransforms transforms parsed from the import request
 * @throws AtlasBaseException declared for callers; no throwing call is visible in this body
 */
private void updateTransformsWithSubTypes(ImportTransforms importTransforms) throws AtlasBaseException {
// snapshot the type names into an array first — addParentTransformsToSubTypes
// presumably mutates the underlying type set while we iterate; TODO confirm
String[] transformTypes = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]);
for (int i = 0; i < transformTypes.length; i++) {
String typeName = transformTypes[i];
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
// unknown type names are silently skipped
if (entityType == null) {
continue;
}
importTransforms.addParentTransformsToSubTypes(typeName, entityType.getAllSubTypes());
}
}
private void setStartPosition(AtlasImportRequest request, ZipSource source) throws AtlasBaseException {
if (request.getStartGuid() != null) {
source.setPositionUsingEntityGuid(request.getStartGuid());
......@@ -201,6 +191,8 @@ public class ImportService {
endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
result.setExportResult(importSource.getExportResult());
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
}
......
......@@ -35,6 +35,7 @@ public abstract class ImportTransformer {
private static final String TRANSFORMER_NAME_LOWERCASE = "lowercase";
private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase";
private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = "removeClassification";
private static final String TRANSFORMER_NAME_ADD_CLASSIFICATION = "addClassification";
private static final String TRANSFORMER_NAME_REPLACE = "replace";
private static final String TRANSFORMER_SET_DELETED = "setDeleted";
......@@ -69,6 +70,9 @@ public abstract class ImportTransformer {
ret = new ClearAttributes(name);
} else if (key.equals(TRANSFORMER_SET_DELETED)) {
ret = new SetDeleted();
} else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) {
String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length);
ret = new AddClassification(name);
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
}
......@@ -146,6 +150,46 @@ public abstract class ImportTransformer {
}
}
/**
 * Transformer that attaches a classification (tag) to every {@code AtlasEntity}
 * it is applied to. Adding is idempotent: an entity already carrying the
 * classification is returned unchanged.
 */
static class AddClassification extends ImportTransformer {
    private final String classificationName;

    public AddClassification(String name) {
        // BUG FIX: was TRANSFORMER_NAME_REMOVE_CLASSIFICATION (copy-paste error);
        // this transformer must register under the "addClassification" key.
        super(TRANSFORMER_NAME_ADD_CLASSIFICATION);
        this.classificationName = name;
    }

    /**
     * Adds {@code classificationName} to the entity unless it is already present.
     *
     * @param o candidate object; non-entity values pass through untouched
     * @return the (possibly updated) input object
     */
    @Override
    public Object apply(Object o) {
        if (!(o instanceof AtlasEntity)) {
            return o;
        }

        AtlasEntity entity = (AtlasEntity) o;
        if (entity.getClassifications() == null) {
            entity.setClassifications(new ArrayList<AtlasClassification>());
        }

        // idempotence: skip if the tag is already attached
        for (AtlasClassification c : entity.getClassifications()) {
            if (c.getTypeName().equals(classificationName)) {
                return entity;
            }
        }

        entity.getClassifications().add(new AtlasClassification(classificationName));
        return entity;
    }

    @Override
    public String toString() {
        return String.format("%s=%s", "AddClassification", classificationName);
    }

    public String getClassificationName() {
        return classificationName;
    }
}
static class RemoveClassification extends ImportTransformer {
private final String classificationToBeRemoved;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Component
public class ImportTransformsShaper {
    private static final Logger LOG = LoggerFactory.getLogger(ImportTransformsShaper.class);

    private final AtlasTypeRegistry typeRegistry;
    private final AtlasTypeDefStore typeDefStore;

    @Inject
    public ImportTransformsShaper(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore) {
        this.typeRegistry = typeRegistry;
        this.typeDefStore = typeDefStore;
    }

    /**
     * Prepares the parsed transforms for use: first makes sure every
     * classification referenced by an addClassification transform exists in the
     * type system (creating it if needed), then widens each type's transforms to
     * cover its sub-types.
     *
     * @param importTransform transforms parsed from the import request
     * @throws AtlasBaseException if classification creation fails
     */
    public void shape(ImportTransforms importTransform) throws AtlasBaseException {
        getCreateClassifications(importTransform);
        updateTransformsWithSubTypes(importTransform);
    }

    /** Walks all configured transformers and creates any missing classification types. */
    private void getCreateClassifications(ImportTransforms importTransform) throws AtlasBaseException {
        // transforms are keyed type -> attribute -> transformer list
        for (Map<String, List<ImportTransformer>> byAttribute : importTransform.getTransforms().values()) {
            for (List<ImportTransformer> transformerList : byAttribute.values()) {
                for (ImportTransformer transformer : transformerList) {
                    if (transformer instanceof ImportTransformer.AddClassification) {
                        ImportTransformer.AddClassification addClassification = (ImportTransformer.AddClassification) transformer;

                        getCreateTag(addClassification.getClassificationName());
                    }
                }
            }
        }
    }

    /** Applies each type's transforms to all of that type's sub-types. */
    private void updateTransformsWithSubTypes(ImportTransforms importTransforms) {
        // snapshot the names into an array first — addParentTransformsToSubTypes
        // presumably mutates the underlying type set while we walk it; TODO confirm
        String[] snapshot = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]);

        for (String typeName : snapshot) {
            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);

            if (entityType != null) {
                importTransforms.addParentTransformsToSubTypes(typeName, entityType.getAllSubTypes());
            }
        }
    }

    /**
     * Returns the classification name, creating a bare classification def of that
     * name in the type store if it does not already exist.
     */
    private String getCreateTag(String classificationName) throws AtlasBaseException {
        if (typeRegistry.getClassificationDefByName(classificationName) != null) {
            return classificationName; // already defined: nothing to do
        }

        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setClassificationDefs(Collections.singletonList(new AtlasClassificationDef(classificationName)));
        typeDefStore.createTypesDef(typesDef);

        LOG.info("created classification: {}", classificationName);
        return classificationName;
    }
}
......@@ -132,7 +132,7 @@ public class BulkImporterImpl implements BulkImporter {
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
@VisibleForTesting
......@@ -140,12 +140,13 @@ public class BulkImporterImpl implements BulkImporter {
final double tolerance = 0.000001;
final int MAX_PERCENT = 100;
float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < streamSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
if (updateLog) {
log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo);
}
return updatedPercent;
......
......@@ -355,7 +355,7 @@ public class ImportServiceTest extends ExportImportTestBase {
@Test
public void importServiceProcessesIOException() {
ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null);
ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null);
AtlasImportRequest req = mock(AtlasImportRequest.class);
Answer<Map> answer = invocationOnMock -> {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportTransformsShaperTest extends ExportImportTestBase {
    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private ImportService importService;

    @Inject
    private AtlasEntityStore entityStore;

    private final String TAG_NAME = "REPLICATED";

    @BeforeClass
    public void setup() throws IOException, AtlasBaseException {
        basicSetup(typeDefStore, typeRegistry);
        loadFsModel(typeDefStore, typeRegistry);
    }

    /**
     * Imports stocks.zip with an addClassification transform and verifies that
     * the REPLICATED classification type is created and applied to the imported
     * entities.
     */
    @Test
    public void newTagIsCreatedAndEntitiesAreTagged() throws AtlasBaseException, IOException {
        AtlasImportResult result = ZipFileResourceTestUtils.runImportWithParameters(importService,
                getImportRequest(),
                ZipFileResourceTestUtils.getZipSourceFrom("stocks.zip"));

        AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(TAG_NAME);
        assertNotNull(classification);
        assertEntities(result.getProcessedEntities(), TAG_NAME);
    }

    /** Asserts the tag on every processed entity (via its referred entities). */
    private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException {
        for (String guid : entityGuids) {
            AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid);
            assertNotNull(entityWithExtInfo);
            assertTag(entityWithExtInfo, tagName);
        }
    }

    // NOTE(review): only the referred entities are checked here; the root entity
    // of entityWithExtInfo is never asserted — confirm that is intentional.
    private void assertTag(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String tagName) {
        if (entityWithExtInfo.getReferredEntities() == null || entityWithExtInfo.getReferredEntities().size() == 0) {
            return;
        }

        for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) {
            assertTag(entity, tagName);
        }
    }

    private void assertTag(AtlasEntity entity, String tagName) {
        assertTrue(entity.getClassifications().size() > 0,
                String.format("%s not tagged", entity.getTypeName()));
        assertEquals(entity.getClassifications().get(0).getTypeName(), tagName);
    }

    /**
     * Builds an import request whose transforms add the REPLICATED classification
     * to every Referenceable. (Renamed from getImporRequest — typo fix; the method
     * is private and both call sites are in this class.)
     */
    private AtlasImportRequest getImportRequest() {
        AtlasImportRequest request = new AtlasImportRequest();
        request.getOptions().put("transforms", "{ \"Referenceable\": { \"*\":[ \"addClassification:REPLICATED\" ] } }");
        return request;
    }
}
......@@ -46,6 +46,8 @@ public class ImportTransformsTest {
private final String jsonSingleClearAttrValue = "{ \"hive_table\": { \"*\":[ \"clearAttrValue:replicatedToCluster\", \"clearAttrValue:replicatedFromCluster\" ] } }";
private final String jsonMultipleClearAttrValue = "{ \"hive_table\": { \"*\":[ \"clearAttrValue:replicatedToCluster,replicatedFromCluster\" ] } }";
private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ \"setDeleted\" ] } }";
private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED\" ] } }";
private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED_2\" ] } }";
private ImportTransforms transform;
private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo";
......@@ -185,6 +187,36 @@ public class ImportTransformsTest {
assertNotNull(t);
}
// Verifies that an addClassification transform attaches exactly one new tag to
// an entity, then chains the idempotence and multiple-tag scenarios on the same
// entity instance (they depend on the state built up here).
// NOTE(review): method name carries a typo ("Entitiy"); field name
// jsonAddClasification is declared outside this method and also misspelled.
@Test
public void addClassification_AddsClassificationToEntitiy() throws AtlasBaseException {
AtlasEntity entity = getHiveTableAtlasEntity();
// baseline: classifications list may be null before the transform runs
int existingClassificationsCount = entity.getClassifications() != null ? entity.getClassifications().size() : 0;
ImportTransforms t = ImportTransforms.fromJson(jsonAddClasification);
assertTrue(t.getTransforms().size() > 0);
t.apply(entity);
assertNotNull(t);
// exactly one classification added
assertEquals(entity.getClassifications().size(), existingClassificationsCount + 1);
addClassification_ExistingClassificationsAreHandled(entity);
addClassification_MultipleClassificationsAreAdded(entity);
}
// Re-applying the same addClassification transform must be a no-op:
// the classification count stays unchanged (idempotence check).
private void addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws AtlasBaseException {
int existingClassificationsCount = entity.getClassifications() != null ? entity.getClassifications().size() : 0;
// precondition: the caller already tagged this entity
assertTrue(existingClassificationsCount > 0);
ImportTransforms.fromJson(jsonAddClasification).apply(entity);
assertEquals(entity.getClassifications().size(), existingClassificationsCount);
}
// A transform naming a different classification (REPLICATED_2) must add a
// second tag alongside the existing one.
private void addClassification_MultipleClassificationsAreAdded(AtlasEntity entity) throws AtlasBaseException {
int existingClassificationsCount = entity.getClassifications().size();
ImportTransforms.fromJson(jsonAddClasification2).apply(entity);
assertEquals(entity.getClassifications().size(), existingClassificationsCount + 1);
}
private String[] getExtEntityExpectedValues(AtlasEntityWithExtInfo entityWithExtInfo) {
String[] ret = new String[entityWithExtInfo.getReferredEntities().size()];
......
......@@ -35,6 +35,7 @@ import static org.testng.Assert.assertTrue;
public class AtlasEntityStoreV2BulkImportPercentTest {
private final int MAX_PERCENT = 100;
private final float MAX_PERCENT_FLOAT = 100.0F;
private List<Integer> percentHolder;
private Logger log;
......@@ -143,6 +144,15 @@ public class AtlasEntityStoreV2BulkImportPercentTest {
assertEqualsForPercentHolder(expected);
}
// When currentIndex (5) exceeds the declared stream size (4), progress must be
// clamped so the reported percentage never exceeds 100.
@Test
public void exceedingInitialStreamSize_KeepsPercentAt100() throws Exception {
runWithSize(4);
// NOTE(review): 'expected' is never used — presumably
// assertEqualsForPercentHolder(expected) was intended here (as in the
// sibling tests); confirm or drop the local.
double[] expected = fillPercentHolderWith100();
float f = BulkImporterImpl.updateImportProgress(log, 5, 4, 100, "additional info");
// returned percent stays at (or below) 100 within tolerance
assertTrue((f - MAX_PERCENT_FLOAT) <= 0.0001);
}
private void runWithSize(int streamSize) throws Exception {
float currentPercent = 0;
setupPercentHolder(streamSize);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment