Commit 765ea583 by Ashutosh Mestry

ATLAS-3320: Migration Import implementation.

parent 3d0fcedb
...@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE ...@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
} }
} }
janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance(); janusGraph = (StandardJanusGraph) graphInstance;
} }
@Override @Override
......
...@@ -44,6 +44,9 @@ public class AtlasImportRequest implements Serializable { ...@@ -44,6 +44,9 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers"; public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
public static final String OPTION_KEY_MIGRATION = "migration";
public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
public static final String OPTION_KEY_FORMAT = "format"; public static final String OPTION_KEY_FORMAT = "format";
public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect"; public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
private static final String START_POSITION_KEY = "startPosition"; private static final String START_POSITION_KEY = "startPosition";
...@@ -124,6 +127,24 @@ public class AtlasImportRequest implements Serializable { ...@@ -124,6 +127,24 @@ public class AtlasImportRequest implements Serializable {
return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY; return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
} }
/**
 * Returns the worker count requested through the {@code numWorkers} import option.
 *
 * @return the option parsed as an integer, or 1 when the option has no value
 */
@JsonIgnore
public int getOptionKeyNumWorkers() {
    final int defaultNumWorkers = 1;

    return getOptionsValue(OPTION_KEY_NUM_WORKERS, defaultNumWorkers);
}
/**
 * Returns the batch size requested through the {@code batchSize} import option.
 *
 * @return the option parsed as an integer, or 1 when the option has no value
 */
@JsonIgnore
public int getOptionKeyBatchSize() {
    final int defaultBatchSize = 1;

    return getOptionsValue(OPTION_KEY_BATCH_SIZE, defaultBatchSize);
}
/**
 * Looks up an import option and interprets its value as an integer.
 *
 * @param optionKey    key of the option to read (any option key, not only the batch size)
 * @param defaultValue value returned when the option is null, empty or whitespace-only
 * @return the parsed option value, or {@code defaultValue} when no usable value is set
 * @throws NumberFormatException if the option holds text that is not a valid integer
 */
private int getOptionsValue(String optionKey, int defaultValue) {
    String optionsValue = getOptionForKey(optionKey);

    if (StringUtils.isBlank(optionsValue)) {
        return defaultValue;
    }

    // parseInt returns a primitive directly (no box/unbox round trip); trimming tolerates
    // values such as " 4 " that come from hand-edited option strings.
    return Integer.parseInt(optionsValue.trim());
}
@JsonAnySetter @JsonAnySetter
public void setOption(String key, String value) { public void setOption(String key, String value) {
if (null == options) { if (null == options) {
......
...@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor { ...@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
return cache.get(guid); return cache.get(guid);
} }
/**
 * Empties the collection currently returned by {@code guidVertexCache.get()}.
 *
 * NOTE(review): guidVertexCache looks like a ThreadLocal-style holder (its value is fetched with
 * get()), in which case only the calling thread's cache is cleared — confirm against the field
 * declaration. This method does not null-check the value returned by get().
 */
public static void clearCache() {
guidVertexCache.get().clear();
}
boolean logException(Throwable t) { boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) { if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode(); Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
......
...@@ -53,7 +53,7 @@ import java.util.Set; ...@@ -53,7 +53,7 @@ import java.util.Set;
@Component @Component
public class FullTextMapperV2 { public class FullTextMapperV2 implements IFullTextMapper {
private static final Logger LOG = LoggerFactory.getLogger(FullTextMapperV2.class); private static final Logger LOG = LoggerFactory.getLogger(FullTextMapperV2.class);
private static final String FULL_TEXT_DELIMITER = " "; private static final String FULL_TEXT_DELIMITER = " ";
...@@ -84,6 +84,8 @@ public class FullTextMapperV2 { ...@@ -84,6 +84,8 @@ public class FullTextMapperV2 {
* @return Full text string ONLY for the added classifications * @return Full text string ONLY for the added classifications
* @throws AtlasBaseException * @throws AtlasBaseException
*/ */
@Override
public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException { public String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
String ret = null; String ret = null;
final AtlasEntityWithExtInfo entityWithExtInfo; final AtlasEntityWithExtInfo entityWithExtInfo;
...@@ -120,6 +122,7 @@ public class FullTextMapperV2 { ...@@ -120,6 +122,7 @@ public class FullTextMapperV2 {
return ret; return ret;
} }
@Override
public String getIndexTextForEntity(String guid) throws AtlasBaseException { public String getIndexTextForEntity(String guid) throws AtlasBaseException {
String ret = null; String ret = null;
final AtlasEntity entity; final AtlasEntity entity;
...@@ -150,6 +153,7 @@ public class FullTextMapperV2 { ...@@ -150,6 +153,7 @@ public class FullTextMapperV2 {
return ret; return ret;
} }
@Override
public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException { public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
String ret = null; String ret = null;
...@@ -271,10 +275,12 @@ public class FullTextMapperV2 { ...@@ -271,10 +275,12 @@ public class FullTextMapperV2 {
} }
} }
@Override
public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException { public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
return getAndCacheEntity(guid, true); return getAndCacheEntity(guid, true);
} }
@Override
public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException { public AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException {
RequestContext context = RequestContext.get(); RequestContext context = RequestContext.get();
AtlasEntity entity = context.getEntity(guid); AtlasEntity entity = context.getEntity(guid);
...@@ -294,6 +300,7 @@ public class FullTextMapperV2 { ...@@ -294,6 +300,7 @@ public class FullTextMapperV2 {
return entity; return entity;
} }
@Override
public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException { public AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
RequestContext context = RequestContext.get(); RequestContext context = RequestContext.get();
AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid); AtlasEntityWithExtInfo entityWithExtInfo = context.getEntityWithExtInfo(guid);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import java.util.List;
/**
 * Abstraction over the full-text mapping operations: producing index text for entities and
 * their classifications, and fetching entities by GUID.
 * {@code FullTextMapperV2} implements this interface.
 */
public interface IFullTextMapper {
/**
* Map newly associated/defined classifications for the entity with given GUID
* @param guid Entity guid
* @param classifications new classifications added to the entity
* @return Full text string ONLY for the added classifications
* @throws AtlasBaseException
*/
String getIndexTextForClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException;
/**
* Produce the index text for the entity with given GUID
* NOTE(review): FullTextMapperV2 initialises its result to null, so callers should presumably
* be prepared for a null return — confirm against the implementation.
* @param guid Entity guid
* @return Index text for the entity
* @throws AtlasBaseException
*/
String getIndexTextForEntity(String guid) throws AtlasBaseException;
/**
* Produce the text for the classifications of the given entity
* NOTE(review): whether a null/empty result is possible is implementation-defined — confirm.
* @param entity Entity whose classifications are mapped
* @return Classification text for the entity
* @throws AtlasBaseException
*/
String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException;
/**
* Fetch the entity with given GUID.
* In FullTextMapperV2 this is equivalent to getAndCacheEntity(guid, true).
* @param guid Entity guid
* @return The entity
* @throws AtlasBaseException
*/
AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException;
/**
* Fetch the entity with given GUID.
* FullTextMapperV2 looks the entity up in the current RequestContext first.
* NOTE(review): includeReferences presumably controls whether referenced entities are loaded
* along with the entity — confirm against the implementation.
* @param guid Entity guid
* @param includeReferences flag passed through to the implementation (see note above)
* @return The entity
* @throws AtlasBaseException
*/
AtlasEntity getAndCacheEntity(String guid, boolean includeReferences) throws AtlasBaseException;
/**
* Fetch the entity with given GUID together with its extended info.
* FullTextMapperV2 looks the value up in the current RequestContext first.
* @param guid Entity guid
* @return The entity wrapped in AtlasEntityWithExtInfo
* @throws AtlasBaseException
*/
AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException;
}
...@@ -250,8 +250,9 @@ public class ImportService { ...@@ -250,8 +250,9 @@ public class ImportService {
private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException { private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
try { try {
if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) && if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) { request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) {
LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size"));
return getZipDirectEntityImportStream(request, inputStream); return getZipDirectEntityImportStream(request, inputStream);
} }
......
...@@ -64,6 +64,10 @@ public class ZipSourceDirect implements EntityImportStream { ...@@ -64,6 +64,10 @@ public class ZipSourceDirect implements EntityImportStream {
this.zipInputStream = new ZipInputStream(inputStream); this.zipInputStream = new ZipInputStream(inputStream);
this.streamSize = streamSize; this.streamSize = streamSize;
prepareStreamForFetch(); prepareStreamForFetch();
if (this.streamSize == 1) {
LOG.info("ZipSourceDirect: Stream Size set to: {}. This will cause inaccurate percentage reporting.", this.streamSize);
}
} }
@Override @Override
...@@ -226,6 +230,10 @@ public class ZipSourceDirect implements EntityImportStream { ...@@ -226,6 +230,10 @@ public class ZipSourceDirect implements EntityImportStream {
} }
public int size() { public int size() {
if (this.streamSize == 1) {
return currentPosition;
}
return this.streamSize; return this.streamSize;
} }
......
...@@ -24,6 +24,8 @@ import org.apache.atlas.RequestContext; ...@@ -24,6 +24,8 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -32,11 +34,20 @@ import java.io.FileInputStream; ...@@ -32,11 +34,20 @@ import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Map;
import java.util.zip.ZipFile;
public class ZipFileMigrationImporter implements Runnable { public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class); private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
private static String ENV_USER_NAME = "user.name"; private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
private static final String DEFAULT_BATCH_SIZE = "100";
private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total";
private final static String ENV_USER_NAME = "user.name";
private final ImportService importService; private final ImportService importService;
private final String fileToImport; private final String fileToImport;
...@@ -52,7 +63,8 @@ public class ZipFileMigrationImporter implements Runnable { ...@@ -52,7 +63,8 @@ public class ZipFileMigrationImporter implements Runnable {
FileWatcher fileWatcher = new FileWatcher(fileToImport); FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start(); fileWatcher.start();
performImport(new FileInputStream(new File(fileToImport))); int streamSize = getStreamSizeFromComment(fileToImport);
performImport(new FileInputStream(new File(fileToImport)), streamSize);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Migration Import: IO Error!", e); LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) { } catch (AtlasBaseException e) {
...@@ -60,19 +72,46 @@ public class ZipFileMigrationImporter implements Runnable { ...@@ -60,19 +72,46 @@ public class ZipFileMigrationImporter implements Runnable {
} }
} }
private void performImport(InputStream fs) throws AtlasBaseException { private int getStreamSizeFromComment(String fileToImport) {
int ret = 1;
try {
ZipFile zipFile = new ZipFile(fileToImport);
String comment = zipFile.getComment();
ret = processZipFileStreamSizeComment(comment);
zipFile.close();
} catch (IOException e) {
LOG.error("Error opening ZIP file: {}", fileToImport, e);
}
return ret;
}
/**
 * Extracts the entity count from the JSON text stored as the ZIP file comment.
 *
 * The comment is parsed into a map and the {@code entitiesCount} entry is returned; the
 * {@code total} entry is only logged. Any shape problem (unparseable comment, missing key,
 * non-numeric value) yields the same default of 1 that an empty comment does, instead of
 * failing with a NullPointerException or ClassCastException.
 *
 * @param comment ZIP file comment, may be null or empty
 * @return the entity count from the comment, or 1 when it is not available
 */
private int processZipFileStreamSizeComment(String comment) {
    final int defaultSize = 1;

    if (StringUtils.isEmpty(comment)) {
        return defaultSize;
    }

    Map<?, ?> map = AtlasType.fromJson(comment, Map.class);
    if (map == null) {
        LOG.warn("ZipFileMigrationImporter: Zip file: Comment could not be parsed: {}", comment);
        return defaultSize;
    }

    Object entitiesCount = map.get(ZIP_FILE_COMMENT_ENTITIES_COUNT);
    Object totalCount    = map.get(ZIP_FILE_COMMENT_TOTAL_COUNT);

    LOG.info("ZipFileMigrationImporter: Zip file: Comment: streamSize: {}: total: {}", entitiesCount, totalCount);

    // JSON numbers may deserialize as Integer, Long or Double; accept any Number.
    return (entitiesCount instanceof Number) ? ((Number) entitiesCount).intValue() : defaultSize;
}
private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
try { try {
LOG.info("Migration Import: {}: Starting...", fileToImport); LOG.info("Migration Import: {}: Starting...", fileToImport);
RequestContext.get().setUser(getUserNameFromEnvironment(), null); RequestContext.get().setUser(getUserNameFromEnvironment(), null);
importService.run(fs, getImportRequest(), importService.run(fs, getImportRequest(streamSize),
getUserNameFromEnvironment(), getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(), InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress()); InetAddress.getLocalHost().getHostAddress());
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error loading zip for migration", ex); LOG.error("Migration Import: Error loading zip for migration!", ex);
throw new AtlasBaseException(ex); throw new AtlasBaseException(ex);
} finally { } finally {
LOG.info("Migration Import: {}: Done!", fileToImport); LOG.info("Migration Import: {}: Done!", fileToImport);
...@@ -83,9 +122,17 @@ public class ZipFileMigrationImporter implements Runnable { ...@@ -83,9 +122,17 @@ public class ZipFileMigrationImporter implements Runnable {
return System.getProperty(ENV_USER_NAME); return System.getProperty(ENV_USER_NAME);
} }
private AtlasImportRequest getImportRequest() throws AtlasException { private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
AtlasImportRequest request = new AtlasImportRequest(); AtlasImportRequest request = new AtlasImportRequest();
request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
request.setSizeOption(streamSize);
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
return request; return request;
} }
/**
 * Reads a string-valued application property.
 *
 * @param property     name of the property to look up
 * @param defaultValue value passed to getString as the fallback
 * @return the value returned by {@code ApplicationProperties.get().getString(property, defaultValue)}
 * @throws AtlasException declared because {@code ApplicationProperties.get()} may fail
 */
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
return ApplicationProperties.get().getString(property, defaultValue);
}
} }
...@@ -150,6 +150,14 @@ public interface AtlasEntityStore { ...@@ -150,6 +150,14 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException; EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
/** /**
* Create or update entities with parameters necessary for import process without commit. Caller will have to take care of the commit.
* @param entityStream AtlasEntityStream
* @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
* @throws AtlasBaseException
*/
EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
/**
* Update a single entity * Update a single entity
* @param objectId ID of the entity * @param objectId ID of the entity
* @param updatedEntityInfo updated entity information * @param updatedEntityInfo updated entity information
......
...@@ -66,7 +66,7 @@ import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY; ...@@ -66,7 +66,7 @@ import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
@Component @Component
public class AtlasEntityChangeNotifier { public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
private final Set<EntityChangeListener> entityChangeListeners; private final Set<EntityChangeListener> entityChangeListeners;
...@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier { ...@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier {
this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled(); this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
} }
@Override
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException { public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) { if (CollectionUtils.isEmpty(entityChangeListeners)) {
return; return;
...@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier { ...@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities(); notifyPropagatedEntities();
} }
@Override
public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException { public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) { if (CollectionUtils.isEmpty(entityChangeListeners)) {
return; return;
...@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier { ...@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException { public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) { if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid()); doFullTextMapping(entity.getGuid());
...@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier { ...@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException { public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) { if (isV2EntityNotificationEnabled) {
doFullTextMappingHelper(entities); doFullTextMappingHelper(entities);
...@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier { ...@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException { public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid()); doFullTextMapping(entity.getGuid());
...@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier { ...@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid()); doFullTextMapping(entity.getGuid());
...@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier { ...@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException { public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMappingHelper(entities); doFullTextMappingHelper(entities);
...@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier { ...@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2 notifications are enabled // listeners notified on term-entity association only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) { if (isV2EntityNotificationEnabled) {
...@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier { ...@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException { public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity disassociation only if v2 notifications are enabled // listeners notified on term-entity disassociation only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) { if (isV2EntityNotificationEnabled) {
...@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier { ...@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException { public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
doFullTextMapping(entityGuid); doFullTextMapping(entityGuid);
...@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier { ...@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier {
} }
} }
@Override
public void notifyPropagatedEntities() throws AtlasBaseException { public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContext context = RequestContext.get(); RequestContext context = RequestContext.get();
Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations(); Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
......
...@@ -90,13 +90,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -90,13 +90,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private final DeleteHandlerDelegate deleteDelegate; private final DeleteHandlerDelegate deleteDelegate;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasEntityChangeNotifier entityChangeNotifier; private final IAtlasEntityChangeNotifier entityChangeNotifier;
private final EntityGraphMapper entityGraphMapper; private final EntityGraphMapper entityGraphMapper;
private final EntityGraphRetriever entityRetriever; private final EntityGraphRetriever entityRetriever;
@Inject @Inject
public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, public AtlasEntityStoreV2(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) { IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
this.deleteDelegate = deleteDelegate; this.deleteDelegate = deleteDelegate;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityChangeNotifier = entityChangeNotifier; this.entityChangeNotifier = entityChangeNotifier;
...@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { ...@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
} }
@Override @Override
// Import variant of create-or-update that delegates straight to createOrUpdate(...).
// Unlike the neighbouring updateEntity(...), this method carries no @GraphTransaction
// annotation; per the interface contract the caller is responsible for the commit.
// NOTE(review): the meaning of the three boolean flags (false, true, true) is defined by
// createOrUpdate(...) — confirm against its signature before changing them.
public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
return createOrUpdate(entityStream, false, true, true);
}
@Override
@GraphTransaction @GraphTransaction
public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException { public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
......
...@@ -99,10 +99,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { ...@@ -99,10 +99,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private final EntityGraphRetriever entityRetriever; private final EntityGraphRetriever entityRetriever;
private final DeleteHandlerDelegate deleteDelegate; private final DeleteHandlerDelegate deleteDelegate;
private final GraphHelper graphHelper = GraphHelper.getInstance(); private final GraphHelper graphHelper = GraphHelper.getInstance();
private final AtlasEntityChangeNotifier entityChangeNotifier; private final IAtlasEntityChangeNotifier entityChangeNotifier;
@Inject @Inject
public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, AtlasEntityChangeNotifier entityChangeNotifier) { public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerDelegate deleteDelegate, IAtlasEntityChangeNotifier entityChangeNotifier) {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.entityRetriever = new EntityGraphRetriever(typeRegistry); this.entityRetriever = new EntityGraphRetriever(typeRegistry);
this.deleteDelegate = deleteDelegate; this.deleteDelegate = deleteDelegate;
......
...@@ -18,33 +18,29 @@ ...@@ -18,33 +18,29 @@
package org.apache.atlas.repository.store.graph.v2; package org.apache.atlas.repository.store.graph.v2;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.Constants;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
...@@ -55,131 +51,27 @@ public class BulkImporterImpl implements BulkImporter { ...@@ -55,131 +51,27 @@ public class BulkImporterImpl implements BulkImporter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
private final AtlasEntityStore entityStore; private final AtlasEntityStore entityStore;
private final EntityGraphRetriever entityGraphRetriever;
private AtlasTypeRegistry typeRegistry; private AtlasTypeRegistry typeRegistry;
private final int MAX_ATTEMPTS = 2;
private boolean directoryBasedImportConfigured;
@Inject @Inject
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore; this.entityStore = entityStore;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
} }
@Override @Override
public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) { ImportStrategy importStrategy = null;
LOG.debug("==> bulkImport()");
}
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<>());
Set<String> processedGuids = new HashSet<>();
float currentPercent = 0f;
List<String> residualList = new ArrayList<>();
EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
while (entityImportStreamWithResidualList.hasNext()) {
AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if (entity == null) {
continue;
}
for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
try {
AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
if (resp.getGuidAssignments() != null) {
ret.getGuidAssignments().putAll(resp.getGuidAssignments());
}
currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, if (importResult.getRequest().getOptions() != null &&
entityStream.getPosition(), importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
entityImportStreamWithResidualList.getStreamSize(), importStrategy = new MigrationImport(new AtlasGraphProvider(), this.typeRegistry);
currentPercent); } else {
importStrategy = new RegularImport(this.entityStore, this.typeRegistry);
entityStream.onImportComplete(entity.getGuid());
break;
} catch (AtlasBaseException e) {
if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw e;
}
break;
} catch (AtlasSchemaViolationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Entity: {}", entity.getGuid(), e);
}
if (attempt == 0) {
updateVertexGuid(entity);
} else {
LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
throw e;
}
} catch (Throwable e) {
AtlasBaseException abe = new AtlasBaseException(e);
if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
throw abe;
}
LOG.warn("Exception: {}", entity.getGuid(), e);
break;
} finally {
RequestContext.get().clearCache();
}
}
}
importResult.getProcessedEntities().addAll(processedGuids);
LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
return ret;
}
@GraphTransaction
public void updateVertexGuid(AtlasEntity entity) {
String entityGuid = entity.getGuid();
AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String vertexGuid = null;
try {
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
} catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
}
if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
return;
}
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
if (v == null) {
return;
} }
addHistoricalGuid(v, vertexGuid); LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid); return importStrategy.run(entityStream, importResult);
LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
}
private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
} }
@VisibleForTesting @VisibleForTesting
...@@ -193,38 +85,16 @@ public class BulkImporterImpl implements BulkImporter { ...@@ -193,38 +85,16 @@ public class BulkImporterImpl implements BulkImporter {
return json; return json;
} }
private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
return false;
}
lineageList.add(guid);
return true;
}
private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp,
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
if (!directoryBasedImportConfigured) {
updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
}
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
@VisibleForTesting @VisibleForTesting
static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) { public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
final double tolerance = 0.000001; final double tolerance = 0.000001;
final int MAX_PERCENT = 100; final int MAX_PERCENT = 100;
int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex; int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
if (maxSize <= 0) {
return currentPercent;
}
float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize); float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance; boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent); float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
...@@ -236,7 +106,7 @@ public class BulkImporterImpl implements BulkImporter { ...@@ -236,7 +106,7 @@ public class BulkImporterImpl implements BulkImporter {
return updatedPercent; return updatedPercent;
} }
private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) { if (list == null) {
return; return;
} }
...@@ -251,41 +121,37 @@ public class BulkImporterImpl implements BulkImporter { ...@@ -251,41 +121,37 @@ public class BulkImporterImpl implements BulkImporter {
} }
} }
private static class EntityImportStreamWithResidualList { public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
private final EntityImportStream stream; String entityGuid = entity.getGuid();
private final List<String> residualList; AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
private boolean navigateResidualList;
private int currentResidualListIndex;
public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) { AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
this.stream = stream; String vertexGuid = null;
this.residualList = residualList; try {
this.navigateResidualList = false; vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
this.currentResidualListIndex = 0; } catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
} }
public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() { if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
if (navigateResidualList == false) { return;
return stream.getNextEntityWithExtInfo();
} else {
stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
return stream.getNextEntityWithExtInfo();
}
} }
public boolean hasNext() { AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
if (!navigateResidualList) { if (v == null) {
boolean streamHasNext = stream.hasNext(); return;
navigateResidualList = (streamHasNext == false);
return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
} else {
return (currentResidualListIndex < residualList.size());
}
} }
public int getStreamSize() { addHistoricalGuid(v, vertexGuid);
return stream.size() + residualList.size(); AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
}
LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
}
public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
} }
} }
...@@ -39,7 +39,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali ...@@ -39,7 +39,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.FullTextMapperV2; import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
...@@ -132,15 +132,15 @@ public class EntityGraphMapper { ...@@ -132,15 +132,15 @@ public class EntityGraphMapper {
private final DeleteHandlerDelegate deleteDelegate; private final DeleteHandlerDelegate deleteDelegate;
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasRelationshipStore relationshipStore; private final AtlasRelationshipStore relationshipStore;
private final AtlasEntityChangeNotifier entityChangeNotifier; private final IAtlasEntityChangeNotifier entityChangeNotifier;
private final AtlasInstanceConverter instanceConverter; private final AtlasInstanceConverter instanceConverter;
private final EntityGraphRetriever entityRetriever; private final EntityGraphRetriever entityRetriever;
private final FullTextMapperV2 fullTextMapperV2; private final IFullTextMapper fullTextMapperV2;
@Inject @Inject
public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
AtlasRelationshipStore relationshipStore, AtlasEntityChangeNotifier entityChangeNotifier, AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier,
AtlasInstanceConverter instanceConverter, FullTextMapperV2 fullTextMapperV2) { AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) {
this.deleteDelegate = deleteDelegate; this.deleteDelegate = deleteDelegate;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.graph = atlasGraph; this.graph = atlasGraph;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.EntityNotification;
import java.util.List;
import java.util.Set;
/**
 * Set of callbacks used to announce entity, relationship, classification,
 * glossary-term and label changes.
 *
 * <p>Declared as an interface so that a component can be handed either a
 * notifier that does real work or a stand-in that does nothing.
 * Every callback may fail with {@link AtlasBaseException}.
 */
public interface IAtlasEntityChangeNotifier {

    // ---- entity and relationship mutations ----

    void onEntitiesMutated(EntityMutationResponse entityMutationResponse,
                           boolean isImport) throws AtlasBaseException;

    void notifyRelationshipMutation(AtlasRelationship relationship,
                                    EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException;

    void notifyPropagatedEntities() throws AtlasBaseException;

    // ---- classifications: single-entity callbacks ----

    void onClassificationAddedToEntity(AtlasEntity entity,
                                       List<AtlasClassification> addedClassifications) throws AtlasBaseException;

    void onClassificationUpdatedToEntity(AtlasEntity entity,
                                         List<AtlasClassification> updatedClassifications) throws AtlasBaseException;

    void onClassificationDeletedFromEntity(AtlasEntity entity,
                                           List<AtlasClassification> deletedClassifications) throws AtlasBaseException;

    // ---- classifications: multi-entity callbacks ----

    void onClassificationsAddedToEntities(List<AtlasEntity> entities,
                                          List<AtlasClassification> addedClassifications) throws AtlasBaseException;

    void onClassificationsDeletedFromEntities(List<AtlasEntity> entities,
                                              List<AtlasClassification> deletedClassifications) throws AtlasBaseException;

    // ---- glossary terms ----

    void onTermAddedToEntities(AtlasGlossaryTerm term,
                               List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;

    void onTermDeletedFromEntities(AtlasGlossaryTerm term,
                                   List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;

    // ---- labels ----

    void onLabelsUpdatedFromEntity(String entityGuid,
                                   Set<String> addedLabels,
                                   Set<String> deletedLabels) throws AtlasBaseException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import java.util.List;
import java.util.Set;
/**
 * {@link IAtlasEntityChangeNotifier} whose callbacks all have empty bodies:
 * each method accepts its arguments and returns without doing anything.
 */
public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {

    // ---- entity and relationship mutations ----

    @Override
    public void onEntitiesMutated(EntityMutationResponse entityMutationResponse,
                                  boolean isImport) throws AtlasBaseException {
        // intentionally empty
    }

    @Override
    public void notifyRelationshipMutation(AtlasRelationship relationship,
                                           EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        // intentionally empty
    }

    @Override
    public void notifyPropagatedEntities() throws AtlasBaseException {
        // intentionally empty
    }

    // ---- classifications: single-entity callbacks ----

    @Override
    public void onClassificationAddedToEntity(AtlasEntity entity,
                                              List<AtlasClassification> addedClassifications) throws AtlasBaseException {
        // intentionally empty
    }

    @Override
    public void onClassificationUpdatedToEntity(AtlasEntity entity,
                                                List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
        // intentionally empty
    }

    @Override
    public void onClassificationDeletedFromEntity(AtlasEntity entity,
                                                  List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
        // intentionally empty
    }

    // ---- classifications: multi-entity callbacks ----

    @Override
    public void onClassificationsAddedToEntities(List<AtlasEntity> entities,
                                                 List<AtlasClassification> addedClassifications) throws AtlasBaseException {
        // intentionally empty
    }

    @Override
    public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities,
                                                     List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
        // intentionally empty
    }

    // ---- glossary terms ----

    @Override
    public void onTermAddedToEntities(AtlasGlossaryTerm term,
                                      List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
        // intentionally empty
    }

    @Override
    public void onTermDeletedFromEntities(AtlasGlossaryTerm term,
                                          List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
        // intentionally empty
    }

    // ---- labels ----

    @Override
    public void onLabelsUpdatedFromEntity(String entityGuid,
                                          Set<String> addedLabels,
                                          Set<String> deletedLabels) throws AtlasBaseException {
        // intentionally empty
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graph.IFullTextMapper;
import java.util.List;
/**
 * {@link IFullTextMapper} that performs no work: every method ignores its
 * arguments and returns {@code null}.
 *
 * <p>Callers receiving this implementation must therefore be prepared for
 * {@code null} index text and {@code null} entities.
 */
public class FullTextMapperV2Nop implements IFullTextMapper {

    // ---- index-text lookups: always null ----

    @Override
    public String getIndexTextForEntity(String guid) throws AtlasBaseException {
        return null;
    }

    @Override
    public String getIndexTextForClassifications(String guid,
                                                 List<AtlasClassification> classifications) throws AtlasBaseException {
        return null;
    }

    @Override
    public String getClassificationTextForEntity(AtlasEntity entity) throws AtlasBaseException {
        return null;
    }

    // ---- entity lookups: always null ----

    @Override
    public AtlasEntity getAndCacheEntity(String guid) throws AtlasBaseException {
        return null;
    }

    @Override
    public AtlasEntity getAndCacheEntity(String guid,
                                         boolean includeReferences) throws AtlasBaseException {
        return null;
    }

    @Override
    public AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntityWithExtInfo(String guid) throws AtlasBaseException {
        return null;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
/**
 * Base type for the interchangeable ways of consuming an
 * {@link EntityImportStream}. Concrete strategies supply {@link #run}.
 */
public abstract class ImportStrategy {

    /**
     * Executes the import.
     *
     * @param entityStream stream of entities to import
     * @param importResult result object associated with this import
     * @return the mutation response produced by the strategy
     * @throws AtlasBaseException if the strategy cannot carry out the import
     */
    public abstract EntityMutationResponse run(EntityImportStream entityStream,
                                               AtlasImportResult importResult)
            throws AtlasBaseException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link ImportStrategy} that feeds the entity stream to an
 * {@link EntityCreationManager} backed by a bulk-loading graph.
 *
 * <p>The strategy builds its own {@link AtlasEntityStoreV2}, wired with a no-op
 * change notifier ({@link EntityChangeNotifierNop}) and a no-op full-text mapper
 * ({@link FullTextMapperV2Nop}), on top of the graph returned by
 * {@link AtlasGraphProvider#getBulkLoading()}.
 */
public class MigrationImport extends ImportStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);

    private final AtlasTypeRegistry    typeRegistry;
    private final AtlasGraph           atlasGraph;
    private final EntityGraphRetriever entityGraphRetriever;
    private final EntityGraphMapper    entityGraphMapper;
    private final AtlasEntityStore     entityStore;

    /**
     * Creates the strategy and the private entity store it imports through.
     *
     * @param atlasGraphProvider source of the bulk-loading graph instance
     * @param typeRegistry       type registry shared by all components built here
     */
    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
        this.typeRegistry         = typeRegistry;
        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
        this.atlasGraph           = atlasGraphProvider.getBulkLoading();

        DeleteHandlerDelegate      deleteDelegate       = new DeleteHandlerDelegate(typeRegistry);
        IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
        AtlasRelationshipStore     relationshipStore    = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, entityChangeNotifier);
        AtlasFormatConverters      formatConverters     = new AtlasFormatConverters(typeRegistry);
        AtlasInstanceConverter     instanceConverter    = new AtlasInstanceConverter(typeRegistry, formatConverters);

        this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, entityChangeNotifier, instanceConverter, new FullTextMapperV2Nop());
        this.entityStore       = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);

        LOG.info("MigrationImport: Using bulkLoading...");
    }

    /**
     * Reads the whole stream through an {@link EntityCreationManager}, drains it,
     * extracts its results and finally shuts it down.
     *
     * @param entityStream entities to import; must be non-null and non-empty
     * @param importResult import result; must be non-null and carry its request,
     *                     from which batch size and worker count are read
     * @return a newly created {@link EntityMutationResponse}; this method does not populate it
     * @throws AtlasBaseException with {@code INVALID_PARAMETERS} when the stream is
     *                            null/empty or the request is missing
     */
    @Override
    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
        if (entityStream == null || !entityStream.hasNext()) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
        }

        // A null importResult is reported the same way as a missing request instead of surfacing as an NPE.
        if (importResult == null || importResult.getRequest() == null) {
            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
        }

        int                    index           = 0;
        int                    streamSize      = entityStream.size();
        EntityMutationResponse ret             = new EntityMutationResponse();
        EntityCreationManager  creationManager = createEntityCreationManager(atlasGraph, importResult);

        try {
            LOG.info("Migration Import: Size: {}: Starting...", streamSize);

            index = creationManager.read(entityStream);
            creationManager.drain();
            creationManager.extractResults();
        } catch (Exception ex) {
            // NOTE(review): failures are only logged and the method still returns normally;
            // kept as-is because callers may rely on this best-effort behaviour - confirm.
            LOG.error("Migration Import: Error: Current position: {}", index, ex);
        } finally {
            shutdownEntityCreationManager(creationManager);
        }

        LOG.info("Migration Import: Size: {}: Done!", streamSize);
        return ret;
    }

    /** Builds the creation manager using batch size and worker count taken from the import request. */
    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) {
        int batchSize  = importResult.getRequest().getOptionKeyBatchSize();
        int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());

        EntityConsumerBuilder consumerBuilder =
                new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);

        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult);
    }

    /** Returns the requested worker count, or 1 when the requested value is not positive. */
    private static int getNumWorkers(int numWorkersFromOptions) {
        int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;

        LOG.info("Migration Import: Setting numWorkers: {}", ret);
        return ret;
    }

    /** Shuts the creation manager down; an interruption is logged and the interrupt flag restored. */
    private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
        try {
            creationManager.shutdown();
        } catch (InterruptedException e) {
            LOG.error("Migration Import: Shutdown: Interrupted!", e);

            // Catching InterruptedException clears the flag; set it again so callers up the stack can see it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
public class RegularImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
private static final int MAX_ATTEMPTS = 3;
private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
private boolean directoryBasedImportConfigured;
public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
/**
 * Imports the stream one entity at a time through the entity store.
 *
 * <p>Each entity gets up to {@code MAX_ATTEMPTS} tries. A schema violation on the
 * first try triggers {@code updateVertexGuid} followed by a retry; on any later try
 * it is rethrown. Other failures are offered to {@code updateResidualList}: when it
 * accepts them the entity is left for the residual pass of the wrapping stream,
 * otherwise the failure propagates. The request cache is cleared after every try.
 *
 * @param entityStream entities to import; must be non-null and non-empty
 * @param importResult receives the GUIDs collected during the import
 * @return response holding the GUID assignments gathered from every store call
 * @throws AtlasBaseException with {@code INVALID_PARAMETERS} for a null/empty stream,
 *                            or when a store failure is not accepted for the residual pass
 */
@Override
public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> bulkImport()");
    }

    final boolean nothingToImport = (entityStream == null) || !entityStream.hasNext();
    if (nothingToImport) {
        throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
    }

    final EntityMutationResponse response = new EntityMutationResponse();
    response.setGuidAssignments(new HashMap<>());

    final Set<String>  importedGuids = new HashSet<>();
    final List<String> deferredGuids = new ArrayList<>();
    final EntityImportStreamWithResidualList combinedStream = new EntityImportStreamWithResidualList(entityStream, deferredGuids);

    float progressPercent = 0f;

    while (combinedStream.hasNext()) {
        final AtlasEntityWithExtInfo current = combinedStream.getNextEntityWithExtInfo();

        AtlasEntity entity = null;
        if (current != null) {
            entity = current.getEntity();
        }
        if (entity == null) {
            continue;
        }

        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                AtlasEntityStreamForImport singleEntityStream = new AtlasEntityStreamForImport(current, null);
                EntityMutationResponse     stepResponse       = entityStore.createOrUpdateForImport(singleEntityStream);

                if (stepResponse.getGuidAssignments() != null) {
                    response.getGuidAssignments().putAll(stepResponse.getGuidAssignments());
                }

                progressPercent = updateImportMetrics(current, stepResponse, importResult, importedGuids,
                                                      entityStream.getPosition(),
                                                      combinedStream.getStreamSize(),
                                                      progressPercent);

                entityStream.onImportComplete(entity.getGuid());
                break;
            } catch (AtlasBaseException e) {
                boolean deferred = updateResidualList(e, deferredGuids, current.getEntity().getGuid());
                if (!deferred) {
                    throw e;
                }
                break;
            } catch (AtlasSchemaViolationException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Entity: {}", entity.getGuid(), e);
                }

                // Only the first attempt may repair the GUID and loop again.
                if (attempt != 0) {
                    LOG.error("Guid update failed: {}", current.getEntity().getGuid());
                    throw e;
                }
                updateVertexGuid(entity);
            } catch (Throwable e) {
                AtlasBaseException wrapped = new AtlasBaseException(e);
                boolean deferred = updateResidualList(wrapped, deferredGuids, current.getEntity().getGuid());
                if (!deferred) {
                    throw wrapped;
                }

                LOG.warn("Exception: {}", entity.getGuid(), e);
                break;
            } finally {
                RequestContext.get().clearCache();
            }
        }
    }

    importResult.getProcessedEntities().addAll(importedGuids);
    LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", importedGuids.size());

    return response;
}
@GraphTransaction
public void updateVertexGuid(AtlasEntity entity) {
String entityGuid = entity.getGuid();
AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
String vertexGuid = null;
try {
vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
} catch (AtlasBaseException e) {
LOG.warn("Entity: {}: Does not exist!", objectId);
return;
}
if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
return;
}
AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
if (v == null) {
return;
}
addHistoricalGuid(v, vertexGuid);
AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
}
private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
}
@VisibleForTesting
static String getJsonArray(String json, String vertexGuid) {
String quotedGuid = String.format("\"%s\"", vertexGuid);
if (StringUtils.isEmpty(json)) {
json = String.format("[%s]", quotedGuid);
} else {
json = json.replace("]", "").concat(",").concat(quotedGuid).concat("]");
}
return json;
}
private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
return false;
}
lineageList.add(guid);
return true;
}
private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
EntityMutationResponse resp,
AtlasImportResult importResult,
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
if (!directoryBasedImportConfigured) {
BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
}
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
private static class EntityImportStreamWithResidualList {
private final EntityImportStream stream;
private final List<String> residualList;
private boolean navigateResidualList;
private int currentResidualListIndex;
public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
this.stream = stream;
this.residualList = residualList;
this.navigateResidualList = false;
this.currentResidualListIndex = 0;
}
public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
if (navigateResidualList == false) {
return stream.getNextEntityWithExtInfo();
} else {
stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
return stream.getNextEntityWithExtInfo();
}
}
public boolean hasNext() {
if (!navigateResidualList) {
boolean streamHasNext = stream.hasNext();
navigateResidualList = (streamHasNext == false);
return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
} else {
return (currentResidualListIndex < residualList.size());
}
}
public int getStreamSize() {
return stream.size() + residualList.size();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Work-item consumer that writes entities through the entity store without committing each
 * one, and commits the graph once the running batch count reaches {@code batchSize}.
 *
 * <p>Entities processed since the last successful commit are kept in a buffer so they can be
 * replayed after a failed commit has been rolled back.</p>
 */
public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
    private static final int MAX_COMMIT_RETRY_COUNT = 3;

    private final int batchSize;
    private final AtomicLong counter = new AtomicLong(1);
    private final AtomicLong currentBatch = new AtomicLong(1);

    private final AtlasGraph atlasGraph;
    private final AtlasEntityStore entityStoreV2;
    private final AtlasTypeRegistry typeRegistry;
    private final EntityGraphRetriever entityGraphRetriever;

    // Entities processed since the last successful commit; replayed after a rollback.
    private final List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
    // Store responses collected since the last successful commit; dispatched after the commit.
    private final List<EntityMutationResponse> localResults = new ArrayList<>();

    /**
     * @param atlasGraph           graph that is committed or rolled back per batch
     * @param entityStore          store used to create or update entities without committing
     * @param entityGraphRetriever retriever passed on to the GUID fix-up
     * @param typeRegistry         registry passed on to the GUID fix-up
     * @param queue                queue handed to the superclass
     * @param batchSize            batch count at which a commit is attempted
     */
    public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
                          EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
                          BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue, int batchSize) {
        super(queue);

        this.atlasGraph = atlasGraph;
        this.entityStoreV2 = entityStore;
        this.entityGraphRetriever = entityGraphRetriever;
        this.typeRegistry = typeRegistry;
        this.batchSize = batchSize;
    }

    /**
     * Processes one entity (with its referred entities) and commits when the batch is full.
     * Any exception is logged and not propagated.
     */
    @Override
    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        // One for the entity itself plus one per referred entity.
        int referredCount = MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
                ? 0
                : entityWithExtInfo.getReferredEntities().size();
        int delta = referredCount + 1;

        long currentCount = counter.addAndGet(delta);
        currentBatch.addAndGet(delta);
        entityBuffer.add(entityWithExtInfo);

        try {
            processEntity(entityWithExtInfo, currentCount);
            attemptCommit();
        } catch (Exception e) {
            LOG.error("Data loss: Please re-submit!", e);
        }
    }

    /**
     * Sends a single entity to the store without committing and remembers the response.
     * An {@link AtlasBaseException} is logged and the entity's GUID is still reported as a
     * result; a schema violation triggers a GUID fix-up on the existing vertex.
     */
    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
        try {
            RequestContext.get().setImportInProgress(true);
            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);

            LOG.debug("Processing: {}", currentCount);
            EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
            localResults.add(result);
        } catch (AtlasBaseException e) {
            addResult(entityWithExtInfo.getEntity().getGuid());
            LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
        } catch (AtlasSchemaViolationException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
            }

            BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
        }
    }

    /** Commits only when the running batch count has reached {@code batchSize}. */
    private void attemptCommit() {
        if (currentBatch.get() < batchSize) {
            return;
        }

        doCommit();
    }

    /**
     * Attempts the commit up to {@code MAX_COMMIT_RETRY_COUNT} times. When every attempt
     * fails, the buffered entities and results are discarded.
     */
    @Override
    protected void doCommit() {
        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
            if (commitWithRetry(retryCount)) {
                return;
            }
        }

        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
        clear();
    }

    /** Delegates to the superclass, logs the running total and resets it to zero. */
    @Override
    protected void commitDirty() {
        super.commitDirty();

        LOG.info("Total: Commit: {}", counter.get());
        counter.set(0);
    }

    /**
     * Commits the graph and dispatches the collected results.
     *
     * @return {@code true} on success; {@code false} after a failure, in which case the graph
     *         has been rolled back and the buffered entities have been replayed
     */
    private boolean commitWithRetry(int retryCount) {
        try {
            atlasGraph.commit();

            if (LOG.isDebugEnabled()) {
                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
            }

            dispatchResults();
            return true;
        } catch (Exception ex) {
            rollbackPauseRetry(retryCount, ex);
            return false;
        }
    }

    /** Rolls the graph back, waits {@code retryCount} seconds and replays the buffered entities. */
    private void rollbackPauseRetry(int retryCount, Exception ex) {
        atlasGraph.rollback();
        clearCache();

        LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
        pause(retryCount);

        LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
        retryProcessEntity(retryCount);
    }

    /** Re-processes every buffered entity after a rollback. */
    private void retryProcessEntity(int retryCount) {
        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);

        // Responses collected before the rollback describe work that was undone; the replay
        // below collects fresh ones, so keeping the old ones would dispatch results twice.
        localResults.clear();

        for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
            processEntity(e, counter.get());
        }

        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
    }

    /** Reports the GUIDs of all created, updated and deleted entities, then resets the batch state. */
    private void dispatchResults() {
        for (EntityMutationResponse response : localResults) {
            addResultsFromResponse(response.getCreatedEntities());
            addResultsFromResponse(response.getUpdatedEntities());
            addResultsFromResponse(response.getDeletedEntities());
        }

        clear();
    }

    /** Sleeps for {@code retryCount} seconds; an interruption ends the wait early. */
    private void pause(int retryCount) {
        try {
            Thread.sleep(1000 * retryCount);
        } catch (InterruptedException e) {
            LOG.error("pause: Interrupted!", e);
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /** Reports the GUID of each header as a result; a {@code null} or empty list is ignored. */
    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
        if (CollectionUtils.isEmpty(entities)) {
            return;
        }

        for (AtlasEntityHeader eh : entities) {
            addResult(eh.getGuid());
        }
    }

    /** Drops buffered entities and results, clears the caches and resets the batch counter. */
    private void clear() {
        localResults.clear();
        entityBuffer.clear();
        clearCache();
        currentBatch.set(0);
    }

    /** Clears the transaction-interceptor cache and the request-context cache. */
    private void clearCache() {
        GraphTransactionInterceptor.clearCache();
        RequestContext.get().clearCache();
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import java.util.concurrent.BlockingQueue;
/**
 * {@link WorkItemBuilder} that creates {@link EntityConsumer} instances, each sharing the
 * collaborators and batch size supplied at construction time.
 */
public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
    private final AtlasTypeRegistry typeRegistry;
    private final EntityGraphRetriever entityGraphRetriever;
    private AtlasEntityStore entityStore;
    private AtlasGraph atlasGraph;
    private int batchSize;

    /**
     * @param atlasGraph           graph passed to every consumer
     * @param entityStore          entity store passed to every consumer
     * @param entityGraphRetriever retriever passed to every consumer
     * @param typeRegistry         type registry passed to every consumer
     * @param batchSize            batch size passed to every consumer
     */
    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
        this.batchSize = batchSize;
        this.typeRegistry = typeRegistry;
        this.entityGraphRetriever = entityGraphRetriever;
        this.entityStore = entityStore;
        this.atlasGraph = atlasGraph;
    }

    /**
     * Creates a consumer bound to the given queue.
     *
     * @param queue queue the new consumer is constructed with
     * @return a new {@link EntityConsumer}
     */
    @Override
    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
        EntityConsumer consumer = new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, batchSize);

        return consumer;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.StatusReporter;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Work-item manager that reads entities from an {@link EntityImportStream}, hands them to the
 * workers and reports progress on the {@link AtlasImportResult}.
 *
 * <p>When the type name of the incoming entity differs from the previous one, the manager
 * calls {@code drain()} before producing the new entity.</p>
 *
 * @param <AtlasEntityWithExtInfo> type parameter declared by this class; no member of this
 *                                 class refers to it (members use the qualified
 *                                 {@code AtlasEntity.AtlasEntityWithExtInfo}). Kept so that
 *                                 existing parameterized references continue to compile.
 */
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
    private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
    private static final String WORKER_PREFIX = "migration-import";

    private final StatusReporter<String, String> statusReporter;
    private final AtlasImportResult importResult;
    private String currentTypeName;
    private float currentPercent;
    private EntityImportStream entityImportStream;

    /**
     * @param builder      builder handed to the superclass
     * @param batchSize    batch size handed to the superclass
     * @param numWorkers   number of workers handed to the superclass
     * @param importResult result object whose metric counters are incremented as entities are acknowledged
     */
    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);

        this.importResult = importResult;
        this.statusReporter = new StatusReporter<>();
    }

    /**
     * Reads the stream until it returns {@code null} and produces every entity found.
     * Reading stops at the first failure while producing; the failure is logged.
     *
     * @param entityStream stream to read; also used later for its size when reporting progress
     * @return the number of entities for which producing was attempted
     */
    public int read(EntityImportStream entityStream) {
        int currentIndex = 0;
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;

        this.entityImportStream = entityStream;

        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
            AtlasEntity entity = entityWithExtInfo.getEntity();

            if (entity == null) {
                continue;
            }

            try {
                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
            } catch (Throwable e) {
                LOG.warn("Exception: {}", entity.getGuid(), e);
                break;
            }
        }

        return currentIndex;
    }

    /**
     * Registers the entity with the status reporter and queues it for the workers, draining
     * first when the entity type changes.
     */
    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        String previousTypeName = getCurrentTypeName();

        boolean typeChanged = StringUtils.isNotEmpty(typeName)
                && StringUtils.isNotEmpty(previousTypeName)
                && !StringUtils.equals(previousTypeName, typeName);

        if (typeChanged) {
            LOG.info("Waiting: '{}' to complete...", previousTypeName);
            super.drain();
            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
        }

        setCurrentTypeName(typeName);
        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
        super.checkProduce(entityWithExtInfo);
        extractResults();
    }

    /** Moves every available result to the status reporter and then logs the status. */
    public void extractResults() {
        Object result;

        while ((result = getResults().poll()) != null) {
            statusReporter.processed((String) result);
        }

        logStatus();
    }

    /**
     * Takes the next acknowledgement (formatted as {@code <typeName>:<index>} by
     * {@code produce}) and updates the metrics and the progress.
     *
     * <p>The text is split on its last colon, so a colon inside the type name does not
     * disturb the index. An acknowledgement without an index, or with a non-numeric index,
     * is ignored.</p>
     */
    private void logStatus() {
        String ack = statusReporter.ack();

        if (StringUtils.isEmpty(ack)) {
            return;
        }

        int separatorAt = ack.lastIndexOf(':');

        if (separatorAt < 0 || separatorAt == ack.length() - 1) {
            return;
        }

        String typeName = ack.substring(0, separatorAt);
        int index;

        try {
            index = Integer.parseInt(ack.substring(separatorAt + 1));
        } catch (NumberFormatException e) {
            // A malformed acknowledgement must not abort the read loop in read().
            LOG.warn("Ignoring malformed status: {}", ack, e);
            return;
        }

        importResult.incrementMeticsCounter(typeName);
        this.currentPercent = updateImportMetrics(typeName, index, this.entityImportStream.size(), getCurrentPercent());
    }

    /**
     * Formats the "last imported" message and forwards it to the import progress update.
     *
     * @return the progress percentage returned by {@code BulkImporterImpl.updateImportProgress}
     */
    private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);

        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
    }

    private String getCurrentTypeName() {
        return this.currentTypeName;
    }

    private void setCurrentTypeName(String typeName) {
        this.currentTypeName = typeName;
    }

    private float getCurrentPercent() {
        return this.currentPercent;
    }
}
...@@ -36,7 +36,9 @@ import org.apache.atlas.listener.TypeDefChangeListener; ...@@ -36,7 +36,9 @@ import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.repository.audit.EntityAuditListener; import org.apache.atlas.repository.audit.EntityAuditListener;
import org.apache.atlas.repository.audit.EntityAuditListenerV2; import org.apache.atlas.repository.audit.EntityAuditListenerV2;
import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graph.FullTextMapperV2;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator; import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator; import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator;
...@@ -61,6 +63,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2; ...@@ -61,6 +63,7 @@ import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2; import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl; import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import org.apache.atlas.runner.LocalSolrRunner; import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
...@@ -144,6 +147,8 @@ public class TestModules { ...@@ -144,6 +147,8 @@ public class TestModules {
bind(AtlasEntityStore.class).to(AtlasEntityStoreV2.class); bind(AtlasEntityStore.class).to(AtlasEntityStoreV2.class);
bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV2.class); bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV2.class);
bind(IAtlasEntityChangeNotifier.class).to(AtlasEntityChangeNotifier.class);
bind(IFullTextMapper.class).to(FullTextMapperV2.class);
// bind the DiscoveryService interface to an implementation // bind the DiscoveryService interface to an implementation
bind(AtlasDiscoveryService.class).to(EntityDiscoveryService.class).asEagerSingleton(); bind(AtlasDiscoveryService.class).to(EntityDiscoveryService.class).asEagerSingleton();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment