Commit 7b14cfac by Ashutosh Mestry

ATLAS-2589: HA typestore and graph indexer fix.

parent c76afbe2
......@@ -87,12 +87,13 @@ public final class AtlasGraphSONReader {
switch (fieldName) {
case GraphSONTokensTP2.MODE:
parser.nextToken();
final String mode = parser.getText();
if (!mode.equals("EXTENDED")) {
throw new IllegalStateException("The legacy GraphSON must be generated with GraphSONMode.EXTENDED");
}
counter.getAndIncrement();
break;
case GraphSONTokensTP2.VERTICES:
......@@ -136,10 +137,15 @@ public final class AtlasGraphSONReader {
}
private void processElement(JsonParser parser, ParseElement parseElement, long startIndex) throws InterruptedException {
LOG.info("processElement: {}: Starting... : counter at: {}", parseElement.getMessage(), counter.get());
try {
readerStatusManager.update(graph, counter.get(), true);
parseElement.setContext(graphSONUtility);
WorkItemManager wim = JsonNodeProcessManager.create(graph, bulkLoadGraph, parseElement, numWorkers, batchSize, startIndex);
WorkItemManager wim = JsonNodeProcessManager.create(graph, bulkLoadGraph, parseElement,
numWorkers, batchSize, shouldSkip(startIndex, counter.get()));
parser.nextToken();
......@@ -165,12 +171,12 @@ public final class AtlasGraphSONReader {
} finally {
LOG.info("processElement: {}: Done! : [{}]", parseElement.getMessage(), counter);
readerStatusManager.update(bulkLoadGraph, counter.get());
readerStatusManager.update(bulkLoadGraph, counter.get(), true);
}
}
private void postProcess(long startIndex) {
LOG.info("postProcess: Starting...");
LOG.info("postProcess: Starting... : counter at: {}", counter.get());
try {
PostProcessManager.WorkItemsManager wim = PostProcessManager.create(bulkLoadGraph, graphSONUtility,
......@@ -186,9 +192,9 @@ public final class AtlasGraphSONReader {
Vertex v = (Vertex) query.next();
updateStatusConditionally(bulkLoadGraph, counter.get());
wim.produce(v.id());
updateStatusConditionally(bulkLoadGraph, counter.get());
}
wim.shutdown();
......@@ -197,7 +203,7 @@ public final class AtlasGraphSONReader {
} finally {
LOG.info("postProcess: Done! : [{}]", counter.get());
readerStatusManager.update(bulkLoadGraph, counter.get());
readerStatusManager.update(bulkLoadGraph, counter.get(), true);
}
}
......@@ -210,14 +216,14 @@ public final class AtlasGraphSONReader {
return;
}
readerStatusManager.update(graph, counter);
readerStatusManager.update(graph, counter, false);
LOG.error("Thread interrupted: {}", counter);
throw new InterruptedException();
}
private void updateStatusConditionally(Graph graph, long counter) {
if(counter % batchSize == 0) {
readerStatusManager.update(graph, counter);
readerStatusManager.update(graph, counter, false);
}
}
......
......@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.NoSuchElementException;
import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
......@@ -103,10 +104,16 @@ public class JsonNodeParsers {
Element getByOriginalId(Graph gr, Object id) {
try {
return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, id).next();
} catch (Exception ex) {
LOG.error("fetchEdge: fetchFromDB failed: {}", id);
return null;
} catch (NoSuchElementException ex) {
if(LOG.isDebugEnabled()) {
LOG.debug("getByOriginalId: {}: failed: {}", getMessage(), id, ex);
}
}
catch (Exception ex) {
LOG.error("getByOriginalId: {}: failed: {}", getMessage(), id, ex);
}
return null;
}
@Override
......@@ -148,10 +155,15 @@ public class JsonNodeParsers {
Element getByOriginalId(Graph gr, Object id) {
try {
return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, id).next();
} catch (NoSuchElementException ex) {
if(LOG.isDebugEnabled()) {
LOG.debug("getByOriginalId: {}: failed: {}", getMessage(), id, ex);
}
} catch (Exception ex) {
LOG.error("getByOriginalId failed: {}", id);
return null;
LOG.error("getByOriginalId: {}: failed: {}", getMessage(), id, ex);
}
return null;
}
@Override
......
......@@ -187,23 +187,21 @@ public class JsonNodeProcessManager {
private final Graph bulkLoadGraph;
private final ParseElement parseElement;
private final int batchSize;
private final long startIndex;
private final boolean isResuming;
public ConsumerBuilder(Graph graph, Graph bulkLoadGraph, ParseElement parseElement, int batchSize, long startIndex) {
public ConsumerBuilder(Graph graph, Graph bulkLoadGraph, ParseElement parseElement, int batchSize, boolean isResuming) {
this.graph = graph;
this.bulkLoadGraph = bulkLoadGraph;
this.batchSize = batchSize;
this.parseElement = parseElement;
this.startIndex = startIndex;
this.isResuming = isResuming;
}
@Override
public Consumer build(BlockingQueue<JsonNode> queue) {
if(startIndex == 0) {
return new Consumer(queue, graph, bulkLoadGraph, parseElement, batchSize);
}
return new ResumingConsumer(queue, graph, bulkLoadGraph, parseElement, batchSize);
return (isResuming)
? new ResumingConsumer(queue, graph, bulkLoadGraph, parseElement, batchSize)
: new Consumer(queue, graph, bulkLoadGraph, parseElement, batchSize);
}
}
......@@ -214,8 +212,8 @@ public class JsonNodeProcessManager {
}
public static WorkItemManager create(Graph rGraph, Graph bGraph,
ParseElement parseElement, int numWorkers, int batchSize, long startIndex) {
ConsumerBuilder cb = new ConsumerBuilder(rGraph, bGraph, parseElement, batchSize, startIndex);
ParseElement parseElement, int numWorkers, int batchSize, boolean isResuming) {
ConsumerBuilder cb = new ConsumerBuilder(rGraph, bGraph, parseElement, batchSize, isResuming);
return new WorkItemManager(cb, batchSize, numWorkers);
}
......
......@@ -35,13 +35,14 @@ public class ReaderStatusManager {
private static final String MIGRATION_STATUS_TYPE_NAME = "__MigrationStatus";
private static final String CURRENT_INDEX_PROPERTY = "currentIndex";
private static final String CURRENT_COUNTER_PROPERTY = "currentCounter";
private static final String OPERATION_STATUS_PROPERTY = "operationStatus";
private static final String START_TIME_PROPERTY = "startTime";
private static final String END_TIME_PROPERTY = "endTime";
private static final String TOTAL_COUNT_PROPERTY = "totalCount";
public static final String STATUS_NOT_STARTED = "NOT STARTED";
public static final String STATUS_IN_PROGRESS = "IN PROGRESS";
public static final String STATUS_NOT_STARTED = "NOT_STARTED";
public static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
public static final String STATUS_SUCCESS = "SUCCESS";
public static final String STATUS_FAILED = "FAILED";
......@@ -72,8 +73,12 @@ public class ReaderStatusManager {
update(bGraph, counter, status);
}
public void update(Graph graph, Long counter) {
migrationStatus.property(CURRENT_INDEX_PROPERTY, counter);
public void update(Graph graph, Long counter, boolean stageEnd) {
migrationStatus.property(CURRENT_COUNTER_PROPERTY, counter);
if(stageEnd) {
migrationStatus.property(CURRENT_INDEX_PROPERTY, counter);
}
if(graph.features().graph().supportsTransactions()) {
graph.tx().commit();
......@@ -82,7 +87,7 @@ public class ReaderStatusManager {
public void update(Graph graph, Long counter, String status) {
migrationStatus.property(OPERATION_STATUS_PROPERTY, status);
update(graph, counter);
update(graph, counter, true);
}
public void clear() {
......@@ -107,6 +112,7 @@ public class ReaderStatusManager {
long longValue = 0L;
v.property(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME);
v.property(CURRENT_COUNTER_PROPERTY, longValue);
v.property(CURRENT_INDEX_PROPERTY, longValue);
v.property(TOTAL_COUNT_PROPERTY, longValue);
v.property(OPERATION_STATUS_PROPERTY, STATUS_NOT_STARTED);
......@@ -122,35 +128,25 @@ public class ReaderStatusManager {
LOG.info("migrationStatus vertex created! v[{}]", migrationStatusId);
}
public static MigrationStatus updateFromVertex(Graph graph, MigrationStatus ms) {
Vertex vertex = fetchUsingTypeName(graph.traversal());
if(ms == null) {
ms = new MigrationStatus();
}
ms.setStartTime((Date) vertex.property(START_TIME_PROPERTY).value());
ms.setEndTime((Date) vertex.property(END_TIME_PROPERTY).value());
ms.setCurrentIndex((Long) vertex.property(CURRENT_INDEX_PROPERTY).value());
ms.setOperationStatus((String) vertex.property(OPERATION_STATUS_PROPERTY).value());
ms.setTotalCount((Long) vertex.property(TOTAL_COUNT_PROPERTY).value());
return ms;
}
public static MigrationStatus get(Graph graph) {
MigrationStatus ms = new MigrationStatus();
try {
Vertex v = fetchUsingTypeName(graph.traversal());
ms.setStartTime((Date) v.property(START_TIME_PROPERTY).value());
ms.setEndTime((Date) v.property(END_TIME_PROPERTY).value());
ms.setCurrentIndex((long) v.property(CURRENT_INDEX_PROPERTY).value());
ms.setOperationStatus((String) v.property(OPERATION_STATUS_PROPERTY).value());
ms.setTotalCount((long) v.property(TOTAL_COUNT_PROPERTY).value());
setValues(ms, fetchUsingTypeName(graph.traversal()));
} catch (Exception ex) {
LOG.error("get: failed!", ex);
if(LOG.isDebugEnabled()) {
LOG.error("get: failed!", ex);
}
}
return ms;
}
private static void setValues(MigrationStatus ms, Vertex vertex) {
ms.setStartTime((Date) vertex.property(START_TIME_PROPERTY).value());
ms.setEndTime((Date) vertex.property(END_TIME_PROPERTY).value());
ms.setCurrentIndex((Long) vertex.property(CURRENT_INDEX_PROPERTY).value());
ms.setCurrentCounter((Long) vertex.property(CURRENT_COUNTER_PROPERTY).value());
ms.setOperationStatus((String) vertex.property(OPERATION_STATUS_PROPERTY).value());
ms.setTotalCount((Long) vertex.property(TOTAL_COUNT_PROPERTY).value());
}
}
......@@ -34,10 +34,10 @@ public class ReaderStatusManagerTest {
assertNotNull(tg.traversal().V(sm.migrationStatusId).next());
MigrationStatus ms = ReaderStatusManager.updateFromVertex(tg, null);
MigrationStatus ms = ReaderStatusManager.get(tg);
assertEquals(ms.getCurrentIndex(), 0L);
assertEquals(ms.getTotalCount(), 0L);
assertEquals(ms.getOperationStatus(), "NOT STARTED");
assertEquals(ms.getOperationStatus(), ReaderStatusManager.STATUS_NOT_STARTED);
assertNotNull(ms.getStartTime());
assertNotNull(ms.getEndTime());
}
......@@ -45,15 +45,15 @@ public class ReaderStatusManagerTest {
@Test
public void verifyUpdates() {
long expectedTotalCount = 1001L;
String expectedOperationStatus = "SUCCESS";
String expectedOperationStatus = ReaderStatusManager.STATUS_SUCCESS;
TinkerGraph tg = TinkerGraph.open();
ReaderStatusManager sm = new ReaderStatusManager(tg, tg);
sm.update(tg, 1000L, "IN PROGRESS");
sm.update(tg, 1000L, ReaderStatusManager.STATUS_IN_PROGRESS);
sm.end(tg, expectedTotalCount, expectedOperationStatus);
MigrationStatus ms = ReaderStatusManager.updateFromVertex(tg, null);
MigrationStatus ms = ReaderStatusManager.get(tg);
assertEquals(ms.getCurrentIndex(), expectedTotalCount);
assertEquals(ms.getTotalCount(), expectedTotalCount);
assertEquals(ms.getOperationStatus(), expectedOperationStatus);
......
......@@ -38,6 +38,7 @@ public class MigrationStatus implements Serializable {
private Date startTime;
private Date endTime;
private long currentIndex;
private long currentCounter;
private long totalCount;
public void setOperationStatus(String operationStatus) {
......@@ -80,11 +81,18 @@ public class MigrationStatus implements Serializable {
return this.totalCount;
}
public StringBuilder toString(StringBuilder sb) {
public void setCurrentCounter(long value) {
this.currentCounter = value;
}
public Long getCurrentCounter() { return this.currentCounter; }
public StringBuilder toString(StringBuilder sb) {
sb.append(", operationStatus=").append(operationStatus);
sb.append(", startTime=").append(startTime);
sb.append(", endTime=").append(endTime);
sb.append(", currentIndex=").append(currentIndex);
sb.append(", currentCounter=").append(currentCounter);
sb.append(", totalCount=").append(totalCount);
return sb;
......
......@@ -32,6 +32,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.io.FileUtils;
import org.apache.solr.common.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -100,6 +101,10 @@ public class DataMigrationService implements Service {
public void performImport() throws AtlasBaseException {
try {
if(!performAccessChecks(importDirectory)) {
return;
}
performInit();
FileInputStream fs = new FileInputStream(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_DATA_NAME));
......@@ -111,10 +116,28 @@ public class DataMigrationService implements Service {
}
}
private boolean performAccessChecks(String path) {
boolean ret = false;
if(StringUtils.isEmpty(path)) {
ret = false;
} else {
File f = new File(path);
ret = f.exists() && f.isDirectory() && f.canRead();
}
if (ret) {
LOG.info("will migrate data in directory {}", importDirectory);
} else {
LOG.error("cannot read migration data in directory {}", importDirectory);
}
return ret;
}
private void performInit() throws AtlasBaseException, AtlasException {
storeInitializer.init();
processIncomingTypesDef(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_TYPESDEF_NAME));
indexer.instanceIsActive();
storeInitializer.instanceIsActive();
processIncomingTypesDef(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_TYPESDEF_NAME));
}
@VisibleForTesting
......
......@@ -64,8 +64,6 @@ import java.util.Map;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
/**
* Class that handles initial loading of models and patches into typedef store
......@@ -90,9 +88,8 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
@PostConstruct
public void init() throws AtlasBaseException {
LOG.info("==> AtlasTypeDefStoreInitializer.init()");
boolean isMigrationEnabled = !StringUtils.isEmpty(conf.getString(ATLAS_MIGRATION_MODE_FILENAME));
if (!HAConfiguration.isHAEnabled(conf) || isMigrationEnabled) {
if (!HAConfiguration.isHAEnabled(conf)) {
atlasTypeDefStore.init();
loadBootstrapTypeDefs();
......
......@@ -43,7 +43,7 @@ public class HiveParititionTest extends MigrationBaseAsserts {
@Test
public void fileImporterTest() throws IOException, AtlasBaseException {
final int EXPECTED_TOTAL_COUNT = 140;
final int EXPECTED_TOTAL_COUNT = 141;
final int EXPECTED_DB_COUNT = 1;
final int EXPECTED_TABLE_COUNT = 2;
final int EXPECTED_COLUMN_COUNT = 7;
......
......@@ -37,7 +37,7 @@ public class HiveStocksTest extends MigrationBaseAsserts {
@Test
public void migrateStocks() throws AtlasBaseException, IOException {
final int EXPECTED_TOTAL_COUNT = 187;
final int EXPECTED_TOTAL_COUNT = 188;
final int EXPECTED_DB_COUNT = 1;
final int EXPECTED_TABLE_COUNT = 1;
final int EXPECTED_COLUMN_COUNT = 7;
......
......@@ -127,7 +127,7 @@ public class MigrationProgressServiceTest {
}
ReaderStatusManager rsm = new ReaderStatusManager(tg, tg);
rsm.update(tg, currentIndex);
rsm.update(tg, currentIndex, false);
rsm.end(tg, totalIndex, status);
return tg;
}
......
......@@ -44,11 +44,13 @@ public class PathTest extends MigrationBaseAsserts {
@Test
public void migrationImport() throws IOException, AtlasBaseException {
final int EXPECTED_TOTAL_COUNT = 89;
runFileImporter("path_db");
AtlasVertex v = assertHdfsPathVertices(1);
assertVertexProperties(v);
assertMigrationStatus(88);
assertMigrationStatus(EXPECTED_TOTAL_COUNT);
}
private void assertVertexProperties(AtlasVertex v) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment