Commit 15967a93 by Ashutosh Mestry

ATLAS-2555: Migration-Import: Support for BigInteger, BigDecimal. Unit tests.

parent 7515915f
AtlasGraph.java

@@ -28,6 +28,7 @@ import javax.script.ScriptException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.type.AtlasType;

 /**
@@ -320,4 +321,6 @@ public interface AtlasGraph<V, E> {
     boolean isMultiProperty(String name);

     void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException;
+
+    MigrationStatus getMigrationStatus();
 }
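
For orientation, a minimal sketch (not part of this commit) of how a caller might consume the new interface method; it assumes the repository module's AtlasGraphProvider as the way to obtain the graph handle, and the log line is illustrative:

    AtlasGraph graph = AtlasGraphProvider.getGraphInstance();
    MigrationStatus status = graph.getMigrationStatus();   // added by this commit
    LOG.info("migration {}: processed {} of {} elements",
            status.getOperationStatus(), status.getCurrentIndex(), status.getTotalCount());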

AtlasJanusGraph.java

@@ -25,6 +25,7 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
@@ -33,6 +34,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.graphdb.GremlinVersion;
+import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
 import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
 import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
 import org.apache.atlas.type.AtlasType;
@@ -459,4 +461,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
     public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
         AtlasJanusGraphDatabase.loadLegacyGraphSON(relationshipCache, fs);
     }
+
+    @Override
+    public MigrationStatus getMigrationStatus() {
+        return AtlasJanusGraphDatabase.getMigrationStatus();
+    }
 }

AtlasJanusGraphDatabase.java

@@ -21,9 +21,11 @@ package org.apache.atlas.repository.graphdb.janus;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.GraphDatabase;
 import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader;
+import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
 import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
 import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
 import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
@@ -259,4 +261,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
             LOG.info("Done! loadLegacyGraphSON.");
         }
     }
+
+    public static MigrationStatus getMigrationStatus() {
+        return ReaderStatusManager.get(getGraphInstance());
+    }
 }

AtlasGraphSONReader.java

@@ -103,6 +103,16 @@ public final class AtlasGraphSONReader {
                     processElement(parser, new JsonNodeParsers.ParseEdge(), startIndex);
                     break;

+                case GraphSONTokensTP2.VERTEX_COUNT:
+                    parser.nextToken();
+                    LOG.info("Vertex count: {}", parser.getLongValue());
+                    break;
+
+                case GraphSONTokensTP2.EDGE_COUNT:
+                    parser.nextToken();
+                    LOG.info("Edge count: {}", parser.getLongValue());
+                    break;
+
                 default:
                     throw new IllegalStateException(String.format("Unexpected token in GraphSON - %s", fieldName));
             }
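
For reference, the two new tokens appear as top-level fields next to vertices/edges in the legacy TP2 GraphSON stream; a trimmed, illustrative input (field order and the mode value are examples, not taken from this commit):

    {
      "mode": "EXTENDED",
      "vertexCount": 2,
      "vertices": [ ... ],
      "edgeCount": 1,
      "edges": [ ... ]
    }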

GraphSONTokensTP2.java

@@ -19,27 +19,32 @@
 package org.apache.atlas.repository.graphdb.janus.migration;

 public final class GraphSONTokensTP2 {
     public static final String _ID = "_id";
     public static final String _LABEL = "_label";
     public static final String _TYPE = "_type";
     public static final String _OUT_V = "_outV";
     public static final String _IN_V = "_inV";
     public static final String VALUE = "value";
     public static final String TYPE = "type";
     public static final String TYPE_LIST = "list";
     public static final String TYPE_STRING = "string";
     public static final String TYPE_DOUBLE = "double";
     public static final String TYPE_INTEGER = "integer";
     public static final String TYPE_FLOAT = "float";
     public static final String TYPE_MAP = "map";
     public static final String TYPE_BOOLEAN = "boolean";
     public static final String TYPE_LONG = "long";
     public static final String TYPE_SHORT = "short";
     public static final String TYPE_BYTE = "byte";
+    public static final String TYPE_BIG_DECIMAL = "bigdecimal";
+    public static final String TYPE_BIG_INTEGER = "biginteger";
+    public static final String TYPE_DATE = "date";
     public static final String TYPE_UNKNOWN = "unknown";
     public static final String VERTICES = "vertices";
     public static final String EDGES = "edges";
     public static final String MODE = "mode";
+    public static final String VERTEX_COUNT = "vertexCount";
+    public static final String EDGE_COUNT = "edgeCount";

     private GraphSONTokensTP2() {
     }
......
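
The three new type tokens reuse the same {type, value} encoding as the existing primitives (compare the bundled col-legacy.json further down); an illustrative property fragment with hypothetical attribute names:

    "metric.precise": { "type": "bigdecimal", "value": 1234.56 },
    "metric.huge":    { "type": "biginteger", "value": 98336 },
    "table.created":  { "type": "date",       "value": 1522693834000 }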

GraphSONUtility.java

@@ -20,6 +20,7 @@ package org.apache.atlas.repository.graphdb.janus.migration;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.type.AtlasBuiltInTypes;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -43,6 +44,8 @@ class GraphSONUtility {
     private static final String EMPTY_STRING = "";

     private final RelationshipTypeCache relationshipTypeCache;
+    private static AtlasBuiltInTypes.AtlasBigIntegerType bigIntegerType = new AtlasBuiltInTypes.AtlasBigIntegerType();
+    private static AtlasBuiltInTypes.AtlasBigDecimalType bigDecimalType = new AtlasBuiltInTypes.AtlasBigDecimalType();

     public GraphSONUtility(final RelationshipTypeCache relationshipTypeCache) {
         this.relationshipTypeCache = relationshipTypeCache;
@@ -187,7 +190,9 @@ class GraphSONUtility {
         if (StringUtils.isNotEmpty(typeName)) {
             props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
         } else {
-            LOG.debug("Could not find relationship type for: {}", edgeLabel);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Could not find relationship type for: {}", edgeLabel);
+            }
         }
     }
@@ -223,6 +228,7 @@ class GraphSONUtility {
         }
     }

+    @VisibleForTesting
     static Map<String, Object> readProperties(final JsonNode node) {
         final Map<String, Object> map = new HashMap<>();
         final Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
@@ -267,7 +273,13 @@ class GraphSONUtility {
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_INTEGER)) {
             propertyValue = node.get(GraphSONTokensTP2.VALUE).intValue();
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_LONG)) {
-            propertyValue = node.get(GraphSONTokensTP2.VALUE).longValue();
+            propertyValue = node.get(GraphSONTokensTP2.VALUE).asLong();
+        } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_BIG_DECIMAL)) {
+            propertyValue = bigDecimalType.getNormalizedValue(node.get(GraphSONTokensTP2.VALUE));
+        } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_BIG_INTEGER)) {
+            propertyValue = bigIntegerType.getNormalizedValue(node.get(GraphSONTokensTP2.VALUE));
+        } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_DATE)) {
+            propertyValue = new Date(node.get(GraphSONTokensTP2.VALUE).asLong());
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_STRING)) {
             propertyValue = node.get(GraphSONTokensTP2.VALUE).textValue();
         } else if (node.get(GraphSONTokensTP2.TYPE).textValue().equals(GraphSONTokensTP2.TYPE_LIST)) {
@@ -308,6 +320,10 @@ class GraphSONUtility {
             theValue = node.longValue();
         } else if (node.isTextual()) {
             theValue = node.textValue();
+        } else if (node.isBigDecimal()) {
+            theValue = node.decimalValue();
+        } else if (node.isBigInteger()) {
+            theValue = node.bigIntegerValue();
         } else if (node.isArray()) {
             // this is an array so just send it back so that it can be
             // reprocessed to its primitive components
......
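
A minimal sketch of the new branches in action, assuming same-package access to the now @VisibleForTesting readProperties and the shaded-Jackson mapper the tests below use; the payload and attribute names are hypothetical:

    ObjectMapper mapper = GraphSONMapper.build().embedTypes(false).create().createMapper();
    JsonNode node = mapper.readTree(
            "{ \"metric.precise\": { \"type\": \"bigdecimal\", \"value\": 1234.56 }," +
            "  \"metric.huge\":    { \"type\": \"biginteger\", \"value\": 98336 }," +
            "  \"table.created\":  { \"type\": \"date\", \"value\": 1522693834000 } }");

    Map<String, Object> props = GraphSONUtility.readProperties(node);
    // expected: a BigDecimal, a BigInteger and a java.util.Date, produced by the
    // AtlasBuiltInTypes normalizers and the epoch-millis conversion added above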

JsonNodeParsers.java

@@ -73,42 +73,10 @@ public class JsonNodeParsers {
             return el;
         }

-        static Object getTypedValueFromJsonNode(final JsonNode node) {
-            Object theValue = null;
-
-            if (node != null && !node.isNull()) {
-                if (node.isBoolean()) {
-                    theValue = node.booleanValue();
-                } else if (node.isDouble()) {
-                    theValue = node.doubleValue();
-                } else if (node.isFloatingPointNumber()) {
-                    theValue = node.floatValue();
-                } else if (node.isInt()) {
-                    theValue = node.intValue();
-                } else if (node.isLong()) {
-                    theValue = node.longValue();
-                } else if (node.isTextual()) {
-                    theValue = node.textValue();
-                } else if (node.isArray()) {
-                    // this is an array so just send it back so that it can be
-                    // reprocessed to its primitive components
-                    theValue = node;
-                } else if (node.isObject()) {
-                    // this is an object so just send it back so that it can be
-                    // reprocessed to its primitive components
-                    theValue = node;
-                } else {
-                    theValue = node.textValue();
-                }
-            }
-
-            return theValue;
-        }
     }

     static class ParseEdge extends ParseElement {
         private static final String MESSAGE_EDGE = "edge";
-        private static final String TYPE_NAME_NODE_NAME = Constants.VERTEX_TYPE_PROPERTY_KEY;

         @Override
@@ -118,7 +86,7 @@ public class JsonNodeParsers {
         @Override
         Object getId(JsonNode node) {
-            return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
+            return utility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
         }

         @Override
@@ -163,7 +131,7 @@ public class JsonNodeParsers {
         @Override
         Object getId(JsonNode node) {
-            return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
+            return utility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
         }

         @Override

JsonNodeProcessManager.java

@@ -108,7 +108,7 @@ public class JsonNodeProcessManager {
             display("commit-size: {}: Done!", size);
         }

-        private void updateSchema(Map<String, Object> schema, org.apache.tinkerpop.shaded.jackson.databind.JsonNode node) {
+        private void updateSchema(Map<String, Object> schema, JsonNode node) {
             synchronized (graph) {
                 String typeName = parseElement.getType(node);
@@ -142,7 +142,7 @@ public class JsonNodeProcessManager {
             try {
                 Thread.sleep(WAIT_DURATION_AFTER_COMMIT_EXCEPTION);

-                for (org.apache.tinkerpop.shaded.jackson.databind.JsonNode n : nodes) {
+                for (JsonNode n : nodes) {
                     parseElement.parse(bulkLoadGraph, cache, n);
                 }

                 commitBulk();

MappedElementCache.java

@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.graphdb.janus.migration;

+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.utils.LruCache;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -33,8 +34,11 @@ import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
 public class MappedElementCache {
     private static final Logger LOG = LoggerFactory.getLogger(MappedElementCache.class);

-    private final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
-    private final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000);
+    @VisibleForTesting
+    final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
+
+    @VisibleForTesting
+    final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000);

     public Vertex getMappedVertex(Graph gr, Object key) {
         try {
@@ -83,7 +87,8 @@ public class MappedElementCache {
         }
     }

-    private Vertex fetchVertex(Graph gr, Object key) {
+    @VisibleForTesting
+    Vertex fetchVertex(Graph gr, Object key) {
         try {
             return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next();
         } catch (Exception ex) {
@@ -92,7 +97,8 @@ public class MappedElementCache {
         }
     }

-    private Edge fetchEdge(Graph gr, String key) {
+    @VisibleForTesting
+    Edge fetchEdge(Graph gr, String key) {
         try {
             return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next();
         } catch (Exception ex) {
@@ -101,16 +107,8 @@ public class MappedElementCache {
         }
     }

-    public void clearVertexCache() {
+    public void clearAll() {
         lruVertexCache.clear();
-    }
-
-    public void clearEdgeCache() {
         lruEdgeCache.clear();
     }
-
-    public void clearAll() {
-        clearVertexCache();
-        clearEdgeCache();
-    }
 }

PostProcessManager.java

@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;

 public class PostProcessManager {
-    private static class Consumer extends WorkItemConsumer<Object> {
+    static class Consumer extends WorkItemConsumer<Object> {
         private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);

         private final Graph bulkLoadGraph;

ReaderStatusManager.java

@@ -18,6 +18,8 @@
 package org.apache.atlas.repository.graphdb.janus.migration;

+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.Constants;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -43,7 +45,8 @@ public class ReaderStatusManager {
     public static final String STATUS_SUCCESS = "SUCCESS";
     public static final String STATUS_FAILED = "FAILED";

-    private Object migrationStatusId = null;
+    @VisibleForTesting
+    Object migrationStatusId = null;
     private Vertex migrationStatus = null;

     public ReaderStatusManager(Graph graph, Graph bulkLoadGraph) {
@@ -71,7 +74,10 @@ public class ReaderStatusManager {
     public void update(Graph graph, Long counter) {
         migrationStatus.property(CURRENT_INDEX_PROPERTY, counter);
-        graph.tx().commit();
+
+        if(graph.features().graph().supportsTransactions()) {
+            graph.tx().commit();
+        }
     }

     public void update(Graph graph, Long counter, String status) {
@@ -91,7 +97,7 @@ public class ReaderStatusManager {
         return g.V(migrationStatusId).next();
     }

-    private Vertex fetchUsingTypeName(GraphTraversalSource g) {
+    private static Vertex fetchUsingTypeName(GraphTraversalSource g) {
         GraphTraversal src = g.V().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME);

         return src.hasNext() ? (Vertex) src.next() : null;
     }
@@ -109,8 +115,42 @@ public class ReaderStatusManager {
         migrationStatusId = v.id();

-        rGraph.tx().commit();
+        if(rGraph.features().graph().supportsTransactions()) {
+            rGraph.tx().commit();
+        }

         LOG.info("migrationStatus vertex created! v[{}]", migrationStatusId);
     }
+
+    public static MigrationStatus updateFromVertex(Graph graph, MigrationStatus ms) {
+        Vertex vertex = fetchUsingTypeName(graph.traversal());
+
+        if(ms == null) {
+            ms = new MigrationStatus();
+        }
+
+        ms.setStartTime((Date) vertex.property(START_TIME_PROPERTY).value());
+        ms.setEndTime((Date) vertex.property(END_TIME_PROPERTY).value());
+        ms.setCurrentIndex((Long) vertex.property(CURRENT_INDEX_PROPERTY).value());
+        ms.setOperationStatus((String) vertex.property(OPERATION_STATUS_PROPERTY).value());
+        ms.setTotalCount((Long) vertex.property(TOTAL_COUNT_PROPERTY).value());
+
+        return ms;
+    }
+
+    public static MigrationStatus get(Graph graph) {
+        MigrationStatus ms = new MigrationStatus();
+
+        try {
+            Vertex v = fetchUsingTypeName(graph.traversal());
+
+            ms.setStartTime((Date) v.property(START_TIME_PROPERTY).value());
+            ms.setEndTime((Date) v.property(END_TIME_PROPERTY).value());
+            ms.setCurrentIndex((long) v.property(CURRENT_INDEX_PROPERTY).value());
+            ms.setOperationStatus((String) v.property(OPERATION_STATUS_PROPERTY).value());
+            ms.setTotalCount((long) v.property(TOTAL_COUNT_PROPERTY).value());
+        } catch (Exception ex) {
+            LOG.error("get: failed!", ex);
+        }

+        return ms;
+    }
 }
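
As a usage sketch, the status vertex can be exercised entirely in-memory, much as ReaderStatusManagerTest does below; counter and status values are arbitrary:

    TinkerGraph tg = TinkerGraph.open();                 // no transaction support, so commits are skipped
    ReaderStatusManager sm = new ReaderStatusManager(tg, tg);

    sm.update(tg, 1000L, "IN PROGRESS");                 // persists currentIndex and status on the vertex
    MigrationStatus ms = ReaderStatusManager.get(tg);    // ms.getCurrentIndex() == 1000L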

RelationshipTypeCache.java

@@ -18,14 +18,9 @@
 package org.apache.atlas.repository.graphdb.janus.migration;

-import org.apache.commons.lang.StringUtils;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;

 public class RelationshipTypeCache {

WorkItemConsumer.java

@@ -86,7 +86,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
     protected abstract void processItem(T item);

-    private void updateCommitTime(long commitTime) {
+    protected void updateCommitTime(long commitTime) {
         if (this.maxCommitTimeSeconds < commitTime) {
             this.maxCommitTimeSeconds = commitTime;
         }

WorkItemManager.java

@@ -55,7 +55,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     public void shutdown() throws InterruptedException {
         int avgCommitTimeSeconds = getAvgCommitTimeSeconds() * 2;

-        LOG.info("WorkItemManager: Shutdown started. Will wait for: {} seconds...", avgCommitTimeSeconds);
+        LOG.info("WorkItemManager: Shutdown started. Will wait for: {} minutes...", avgCommitTimeSeconds);

         service.shutdown();
         service.awaitTermination(avgCommitTimeSeconds, TimeUnit.MINUTES);

BaseUtils.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.commons.io.FileUtils;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.testng.ITestContext;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import static org.testng.AssertJUnit.assertTrue;
public class BaseUtils {
private static final String resourcesDirRelativePath = "/src/test/resources/";
private String resourceDir;
protected final RelationshipTypeCache emptyRelationshipCache = new RelationshipTypeCache(new HashMap<>());
protected GraphSONUtility graphSONUtility;
protected Object[][] getJsonNodeFromFile(String s) throws IOException {
File f = new File(getFilePath(s));
return new Object[][]{{getEntityNode(FileUtils.readFileToString(f))}};
}
protected String getFilePath(String fileName) {
return Paths.get(resourceDir, fileName).toString();
}
@BeforeClass
public void setup() {
resourceDir = System.getProperty("user.dir") + resourcesDirRelativePath;
graphSONUtility = new GraphSONUtility(emptyRelationshipCache);
}
protected Object getId(JsonNode node) {
GraphSONUtility gu = graphSONUtility;
return gu.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
}
private JsonNode getEntityNode(String json) throws IOException {
GraphSONMapper.Builder builder = GraphSONMapper.build();
final ObjectMapper mapper = builder.embedTypes(false).create().createMapper();
return mapper.readTree(json);
}
protected void addVertex(TinkerGraph tg, JsonNode node) {
GraphSONUtility utility = new GraphSONUtility(emptyRelationshipCache);
utility.vertexFromJson(tg, node);
}
protected void addEdge(TinkerGraph tg, MappedElementCache cache) throws IOException {
GraphSONUtility gu = graphSONUtility;
gu.vertexFromJson(tg, (JsonNode) (getDBV(null)[0][0]));
gu.vertexFromJson(tg, (JsonNode) (getTableV(null))[0][0]);
gu.edgeFromJson(tg, cache, (JsonNode) getEdge(null)[0][0]);
}
protected Vertex fetchTableVertex(TinkerGraph tg) {
GraphTraversal query = tg.traversal().V().has("__typeName", "hive_table");
assertTrue(query.hasNext());
return (Vertex) query.next();
}
@DataProvider(name = "col1")
public Object[][] getCol1(ITestContext context) throws IOException {
return getJsonNodeFromFile("col-legacy.json");
}
@DataProvider(name = "dbType")
public Object[][] getDbType(ITestContext context) throws IOException {
return getJsonNodeFromFile("db-type-legacy.json");
}
@DataProvider(name = "edge")
public Object[][] getEdge(ITestContext context) throws IOException {
return getJsonNodeFromFile("edge-legacy.json");
}
@DataProvider(name = "dbV")
public Object[][] getDBV(ITestContext context) throws IOException {
return getJsonNodeFromFile("db-v-65544.json");
}
@DataProvider(name = "tableV")
public Object[][] getTableV(ITestContext context) throws IOException {
return getJsonNodeFromFile("table-v-147504.json");
}
}

GraphSONUtilityPostProcessTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
public class GraphSONUtilityPostProcessTest extends BaseUtils {
final String HIVE_COLUMNS_PROPERTY = "hive_table.columns";
final String edgeId1 = "816u-35tc-ao0l-47so";
final String edgeId2 = "82rq-35tc-ao0l-2glc";
final String edgeId1x = "816u-35tc-ao0l-xxxx";
final String edgeId2x = "82rq-35tc-ao0l-xxxx";
private TinkerGraph tg;
private MappedElementCache cache = new MappedElementCache();
private Vertex tableV;
@Test
public void noRefNoUpdate() throws IOException {
tg = TinkerGraph.open();
graphSONUtility = new GraphSONUtility(emptyRelationshipCache);
addEdge(tg, cache);
tableV = fetchTableVertex(tg);
assertNotNull(tableV);
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
}
@Test(dependsOnMethods = "noRefNoUpdate")
public void refFoundVertexUpdated() throws IOException {
cache.lruEdgeCache.put(edgeId1, edgeId1x);
cache.lruEdgeCache.put(edgeId2, edgeId2x);
graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1x, edgeId2x, tableV);
}
@Test(dependsOnMethods = "refFoundVertexUpdated")
public void updateUsingPostProcessConsumer() throws IOException {
MappedElementCache cache = new MappedElementCache();
BlockingQueue<Object> bc = new BlockingArrayQueue<>();
PostProcessManager.Consumer consumer = new PostProcessManager.Consumer(bc, tg, graphSONUtility,
new String[] {HIVE_COLUMNS_PROPERTY}, cache, 5);
cache.lruEdgeCache.put(edgeId1x, edgeId1);
cache.lruEdgeCache.put(edgeId2x, edgeId2);
consumer.processItem(tableV.id());
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
}
private void assertListProperty(String HIVE_COLUMNS_PROPERTY, String edgeId1, String edgeId2, Vertex tableV) {
assertTrue(tableV.property(HIVE_COLUMNS_PROPERTY).isPresent());
List list = (List) tableV.property(HIVE_COLUMNS_PROPERTY).value();
assertEquals(list.size(), 2);
assertEquals(list.get(0), edgeId1);
assertEquals(list.get(1), edgeId2);
}
}

GraphSONUtilityTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.Assert;
import org.testng.ITestContext;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
import static org.testng.Assert.*;
public class GraphSONUtilityTest extends BaseUtils {
@Test(dataProvider = "col1")
public void idFetch(JsonNode node) {
Object o = GraphSONUtility.getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
assertNotNull(o);
assertEquals((int) o, 98336);
}
@Test(dataProvider = "col1")
public void verifyReadProperties(JsonNode node) {
Map<String, Object> props = GraphSONUtility.readProperties(node);
assertEquals(props.get("__superTypeNames").getClass(), ArrayList.class);
assertEquals(props.get("Asset.name").getClass(), String.class);
assertEquals(props.get("hive_column.position").getClass(), Integer.class);
assertEquals(props.get("__timestamp").getClass(), Long.class);
assertNotNull(props);
}
@Test(dataProvider = "col1")
public void dataNodeReadAndVertexAddedToGraph(JsonNode entityNode) throws IOException {
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
Map<String, Object> map = gu.vertexFromJson(tg, entityNode);
assertNull(map);
assertEquals((long) tg.traversal().V().count().next(), 1L);
Vertex v = tg.vertices().next();
assertTrue(v.property(VERTEX_ID_IN_IMPORT_KEY).isPresent());
}
@Test(dataProvider = "dbType")
public void typeNodeReadAndVertexNotAddedToGraph(JsonNode entityNode) throws IOException {
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
gu.vertexFromJson(tg, entityNode);
Assert.assertEquals((long) tg.traversal().V().count().next(), 0L);
}
@Test
public void edgeReadAndAddedToGraph() throws IOException {
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
Map<String, Object> m = null;
m = gu.vertexFromJson(tg, (JsonNode) (getDBV(null)[0][0]));
assertNull(m);
m = gu.vertexFromJson(tg, (JsonNode) (getTableV(null))[0][0]);
assertNull(m);
m = gu.edgeFromJson(tg, new MappedElementCache(), (JsonNode) getEdge(null)[0][0]);
assertNull(m);
Assert.assertEquals((long) tg.traversal().V().count().next(), 2L);
Assert.assertEquals((long) tg.traversal().E().count().next(), 1L);
Edge e = tg.edges().next();
assertTrue(e.property(EDGE_ID_IN_IMPORT_KEY).isPresent());
}
}

JsonNodeParsersTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
public class JsonNodeParsersTest extends BaseUtils {
@Test(dataProvider = "col1")
public void parseVertex(JsonNode nd) {
final int COL1_ORIGINAL_ID = 98336;
Object nodeId = getId(nd);
TinkerGraph tg = TinkerGraph.open();
JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseVertex();
pe.setContext(graphSONUtility);
pe.parse(tg, new MappedElementCache(), nd);
Vertex v = tg.vertices().next();
Vertex vUsingPe = (Vertex) pe.get(tg, nodeId);
Vertex vUsingOriginalId = (Vertex) pe.getByOriginalId(tg, COL1_ORIGINAL_ID);
Vertex vUsingOriginalId2 = (Vertex) pe.getByOriginalId(tg, nd);
updateParseElement(tg, pe, vUsingPe);
assertNotNull(v);
assertNotNull(vUsingPe);
assertNotNull(vUsingOriginalId);
assertEquals(v.id(), vUsingPe.id());
assertEquals(nodeId, pe.getId(nd));
assertFalse(pe.isTypeNode(nd));
assertEquals(pe.getType(nd), "\"hive_column\"");
assertEquals(vUsingOriginalId.id(), v.id());
assertEquals(vUsingOriginalId2.id(), v.id());
assertProperties(vUsingPe);
}
@Test(dataProvider = "edge")
public void parseEdge(JsonNode nd) throws IOException {
final String EDGE_ORIGINAL_ID = "8k5i-35tc-acyd-1eko";
Object nodeId = getId(nd);
TinkerGraph tg = TinkerGraph.open();
MappedElementCache cache = new MappedElementCache();
JsonNodeParsers.ParseElement peVertex = new JsonNodeParsers.ParseVertex();
peVertex.setContext(graphSONUtility);
peVertex.parse(tg, cache, (JsonNode) (getDBV(null)[0][0]));
peVertex.parse(tg, cache, (JsonNode) (getTableV(null)[0][0]));
JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseEdge();
pe.setContext(graphSONUtility);
pe.parse(tg, cache, (JsonNode) getEdge(null)[0][0]);
updateParseElement(tg, pe, nodeId);
Edge e = tg.edges().next();
Edge eUsingPe = (Edge) pe.get(tg, nodeId);
Edge eUsingOriginalId = (Edge) pe.getByOriginalId(tg, EDGE_ORIGINAL_ID);
Edge eUsingOriginalId2 = (Edge) pe.getByOriginalId(tg, nd);
assertNotNull(e);
assertNotNull(eUsingPe);
assertNotNull(eUsingOriginalId);
assertEquals(e.id(), eUsingPe.id());
assertEquals(nodeId, pe.getId(nd));
assertFalse(pe.isTypeNode(nd));
assertEquals(eUsingOriginalId.id(), e.id());
assertEquals(eUsingOriginalId2.id(), e.id());
assertProperties(e);
}
private void updateParseElement(TinkerGraph tg, JsonNodeParsers.ParseElement pe, Object nodeId) {
Map<String, Object> props = new HashMap<>();
props.put("k1", "v1");
props.put("k2", "v2");
pe.update(tg, nodeId, props);
}
private void assertProperties(Element v) {
assertNotNull(v);
assertTrue(v.property("k1").isPresent());
assertTrue(v.property("k2").isPresent());
assertEquals(v.property("k1").value(), "v1");
}
}

MappedElementCacheTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.ITestContext;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.testng.Assert.*;
public class MappedElementCacheTest extends BaseUtils {
@Test(dataProvider = "col1")
public void vertexFetch(JsonNode node) {
MappedElementCache cache = new MappedElementCache();
TinkerGraph tg = TinkerGraph.open();
addVertex(tg, node);
Vertex vx = cache.getMappedVertex(tg, 98336);
assertNotNull(vx);
assertEquals(cache.lruVertexCache.size(), 1);
assertEquals(cache.lruEdgeCache.size(), 0);
}
@Test
public void edgeFetch() throws IOException {
MappedElementCache cache = new MappedElementCache();
TinkerGraph tg = TinkerGraph.open();
addEdge(tg, cache);
assertEquals(cache.lruVertexCache.size(), 2);
assertEquals(cache.lruEdgeCache.size(), 0);
}
@Test
public void nonExistentVertexReturnsNull() {
TinkerGraph tg = TinkerGraph.open();
MappedElementCache cache = new MappedElementCache();
assertNull(cache.fetchVertex(tg, 1111));
assertNull(cache.fetchEdge(tg, "abcd"));
}
@DataProvider(name = "col1")
public Object[][] getCol1(ITestContext context) throws IOException {
return getJsonNodeFromFile("col-legacy.json");
}
}

ReaderStatusManagerTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class ReaderStatusManagerTest {
@Test
public void createsNewStatusNode() {
TinkerGraph tg = TinkerGraph.open();
ReaderStatusManager sm = new ReaderStatusManager(tg, tg);
assertEquals(sm.getStartIndex(), 0L);
assertNotNull(tg.traversal().V(sm.migrationStatusId).next());
MigrationStatus ms = ReaderStatusManager.updateFromVertex(tg, null);
assertEquals(ms.getCurrentIndex(), 0L);
assertEquals(ms.getTotalCount(), 0L);
assertEquals(ms.getOperationStatus(), "NOT STARTED");
assertNotNull(ms.getStartTime());
assertNotNull(ms.getEndTime());
}
@Test
public void verifyUpdates() {
long expectedTotalCount = 1001L;
String expectedOperationStatus = "SUCCESS";
TinkerGraph tg = TinkerGraph.open();
ReaderStatusManager sm = new ReaderStatusManager(tg, tg);
sm.update(tg, 1000L, "IN PROGRESS");
sm.end(tg, expectedTotalCount, expectedOperationStatus);
MigrationStatus ms = ReaderStatusManager.updateFromVertex(tg, null);
assertEquals(ms.getCurrentIndex(), expectedTotalCount);
assertEquals(ms.getTotalCount(), expectedTotalCount);
assertEquals(ms.getOperationStatus(), expectedOperationStatus);
}
}

WorkItemConsumerTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class WorkItemConsumerTest {
private class IntegerConsumerSpy extends WorkItemConsumer<Integer> {
boolean commitDirtyCalled = false;
private boolean updateCommitTimeCalled;
public IntegerConsumerSpy(BlockingQueue<Integer> queue) {
super(queue);
}
@Override
protected void doCommit() {
}
@Override
protected void processItem(Integer item) {
}
@Override
protected void commitDirty() {
commitDirtyCalled = true;
super.commitDirty();
}
@Override
protected void updateCommitTime(long commitTime) {
updateCommitTimeCalled = true;
}
public boolean isCommitDirtyCalled() {
return commitDirtyCalled;
}
public boolean isUpdateCommitTimeCalled() {
return updateCommitTimeCalled;
}
}
@Test
public void runOnEmptyQueueCallsCommitDirtyButNotUpdateCommitTime() {
BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5);
IntegerConsumerSpy ic = new IntegerConsumerSpy(bc);
ic.run();
assertTrue(bc.isEmpty());
assertTrue(ic.isCommitDirtyCalled());
assertFalse(ic.isUpdateCommitTimeCalled());
}
@Test
public void runOnQueueRemovesItemFromQueueCallsCommitDirty() {
BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5);
bc.add(1);
IntegerConsumerSpy ic = new IntegerConsumerSpy(bc);
ic.run();
assertTrue(bc.isEmpty());
assertTrue(ic.isCommitDirtyCalled());
assertTrue(ic.isUpdateCommitTimeCalled());
}
}

WorkItemManagerTest.java (new test file)

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.SkipException;
import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.testng.Assert.assertEquals;
public class WorkItemManagerTest {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemManagerTest.class);
private class IntegerConsumer extends WorkItemConsumer<Integer> {
private final ConcurrentLinkedQueue<Integer> target;
public IntegerConsumer(BlockingQueue<Integer> queue, ConcurrentLinkedQueue<Integer> target) {
super(queue);
this.target = target;
}
@Override
protected void doCommit() {
try {
Thread.sleep(20 * RandomUtils.nextInt(10, 15));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void processItem(Integer item) {
LOG.info("adding: {}: size: {}", item, target.size());
target.add(item);
}
}
private class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
ConcurrentLinkedQueue<Integer> integers = new ConcurrentLinkedQueue<>();
@Override
public IntegerConsumer build(BlockingQueue<Integer> queue) {
return new IntegerConsumer(queue, integers);
}
}
@Test
public void oneWorkerSequences() {
IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
int numberOfItems = 10;
try {
WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManager(cb, 1);
for (int i = 0; i < numberOfItems; i++) {
wi.produce(i);
}
wi.shutdown();
} catch (InterruptedException e) {
throw new SkipException("Test skipped!");
}
assertEquals(cb.integers.size(), numberOfItems);
Integer[] ints = cb.integers.toArray(new Integer[]{});
for (int i = 0; i < numberOfItems; i++) {
assertEquals(ints[i].intValue(), i);
}
}
@Test
public void multipleWorkersUnpredictableSequence() {
IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
int numberOfItems = 100;
try {
WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManager(cb, 5);
for (int i = 0; i < numberOfItems; i++) {
wi.produce(i);
}
wi.shutdown();
} catch (InterruptedException e) {
throw new SkipException("Test skipped!");
}
assertEquals(cb.integers.size(), numberOfItems);
}
private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManager(IntegerConsumerBuilder cb, int numWorkers) {
return new WorkItemManager<>(cb, 5, numWorkers);
}
}
(Diff for one large test resource file omitted: too large to display.)

col-legacy.json (new test resource)

{
"Asset.name": {
"type": "string",
"value": "col4"
},
"hive_column.type": {
"type": "string",
"value": "string"
},
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"entityText": {
"type": "string",
"value": "hive_column owner anonymous qualifiedName stocks.test_table.col4@cl1 name col4 position 0 type string table "
},
"Referenceable.qualifiedName": {
"type": "string",
"value": "stocks.test_table.col4@cl1"
},
"__guid": {
"type": "string",
"value": "0693682a-30ae-4fec-a533-179e572792ce"
},
"__version": {
"type": "integer",
"value": 0
},
"__superTypeNames": {
"type": "list",
"value": [{
"type": "string",
"value": "Asset"
}, {
"type": "string",
"value": "DataSet"
}, {
"type": "string",
"value": "Referenceable"
}]
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__typeName": {
"type": "string",
"value": "hive_column"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693838471
},
"Asset.owner": {
"type": "string",
"value": "anonymous"
},
"hive_column.position": {
"type": "integer",
"value": 0
},
"__timestamp": {
"type": "long",
"value": 1522693826849
},
"_id": 98336,
"_type": "vertex"
}

db-type-legacy.json (new test resource)

{
"__type.name": {
"type": "string",
"value": "hive_db"
},
"__type.hive_db.parameters": {
"type": "string",
"value": "{\"multiplicity\":\"{\\\"lower\\\":0,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":false,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"map<string,string>\",\"name\":\"parameters\",\"isUnique\":false}"
},
"__modifiedBy": {
"type": "string",
"value": "root"
},
"__type.hive_db": {
"type": "list",
"value": [{
"type": "string",
"value": "clusterName"
}, {
"type": "string",
"value": "location"
}, {
"type": "string",
"value": "parameters"
}, {
"type": "string",
"value": "ownerType"
}]
},
"__type.options": {
"type": "string",
"value": "null"
},
"__guid": {
"type": "string",
"value": "b2685ea8-16c5-4d54-88f2-41b1d66bd1fb"
},
"__version": {
"type": "long",
"value": 1
},
"__type.hive_db.clusterName": {
"type": "string",
"value": "{\"multiplicity\":\"{\\\"lower\\\":1,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":true,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"string\",\"name\":\"clusterName\",\"isUnique\":false}"
},
"__type.category": {
"type": "string",
"value": "CLASS"
},
"__type.version": {
"type": "string",
"value": "1.0"
},
"__type.hive_db.location": {
"type": "string",
"value": "{\"multiplicity\":\"{\\\"lower\\\":0,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":false,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"string\",\"name\":\"location\",\"isUnique\":false}"
},
"__createdBy": {
"type": "string",
"value": "root"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693758158
},
"__type": {
"type": "string",
"value": "typeSystem"
},
"__type.description": {
"type": "string",
"value": "hive_db"
},
"__timestamp": {
"type": "long",
"value": 1522693758158
},
"__type.hive_db.ownerType": {
"type": "string",
"value": "{\"multiplicity\":\"{\\\"lower\\\":0,\\\"upper\\\":1,\\\"isUnique\\\":false}\",\"isIndexable\":false,\"isComposite\":false,\"reverseAttributeName\":null,\"dataType\":\"hive_principal_type\",\"name\":\"ownerType\",\"isUnique\":false}"
},
"_id": 16392,
"_type": "vertex"
}

db-v-65544.json (new test resource)

{
"hive_db.parameters": {
"type": "list",
"value": []
},
"Asset.name": {
"type": "string",
"value": "stocks"
},
"hive_db.ownerType": {
"type": "string",
"value": "USER"
},
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"entityText": {
"type": "string",
"value": "hive_db owner anonymous ownerType USER qualifiedName stocks@cl1 clusterName cl1 name stocks location hdfs://localhost.localdomain:8020/apps/hive/warehouse/stocks.db "
},
"Referenceable.qualifiedName": {
"type": "string",
"value": "stocks@cl1"
},
"__guid": {
"type": "string",
"value": "229b7fd4-e46e-4338-9e44-18ce630eb5bf"
},
"__version": {
"type": "integer",
"value": 0
},
"__superTypeNames": {
"type": "list",
"value": [{
"type": "string",
"value": "Asset"
}, {
"type": "string",
"value": "Referenceable"
}]
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__typeName": {
"type": "string",
"value": "hive_db"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693838471
},
"hive_db.clusterName": {
"type": "string",
"value": "cl1"
},
"Asset.owner": {
"type": "string",
"value": "anonymous"
},
"hive_db.location": {
"type": "string",
"value": "hdfs://localhost.localdomain:8020/apps/hive/warehouse/stocks.db"
},
"__timestamp": {
"type": "long",
"value": 1522693806944
},
"_id": 65544,
"_type": "vertex"
}
\ No newline at end of file

edge-legacy.json (new test resource)

{
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693835017
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": "8k5i-35tc-acyd-1eko",
"_type": "edge",
"_outV": 147504,
"_inV": 65544,
"_label": "__hive_table.db"
}

table-v-147504.json (new test resource)

{
"hive_table.createTime": {
"type": "long",
"value": 1522693834000
},
"hive_table.tableType": {
"type": "string",
"value": "VIRTUAL_VIEW"
},
"Asset.name": {
"type": "string",
"value": "test_table_view"
},
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"entityText": {
"type": "string",
"value": "hive_table owner anonymous temporary false lastAccessTime Mon Apr 02 11:30:34 PDT 2018 qualifiedName stocks.test_table_view@cl1 columns viewExpandedText select `test_table`.`col1`, `test_table`.`col2`, `test_table`.`col4` from `stocks`.`test_table` sd tableType VIRTUAL_VIEW createTime Mon Apr 02 11:30:34 PDT 2018 name test_table_view partitionKeys parameters transient_lastDdlTime 1522693834 db retention 0 viewOriginalText select col1, col2, col4 from test_table "
},
"Referenceable.qualifiedName": {
"type": "string",
"value": "stocks.test_table_view@cl1"
},
"hive_table.parameters.transient_lastDdlTime": {
"type": "string",
"value": "1522693834"
},
"hive_table.parameters": {
"type": "list",
"value": [{
"type": "string",
"value": "transient_lastDdlTime"
}]
},
"hive_table.retention": {
"type": "integer",
"value": 0
},
"hive_table.partitionKeys": {
"type": "list",
"value": [{
"type": "string",
"value": "8dty-35tc-amfp-23xs"
}]
},
"__guid": {
"type": "string",
"value": "111091f1-2661-4946-b09b-64e28f10c109"
},
"hive_table.temporary": {
"type": "boolean",
"value": false
},
"__version": {
"type": "integer",
"value": 0
},
"__superTypeNames": {
"type": "list",
"value": [{
"type": "string",
"value": "Asset"
}, {
"type": "string",
"value": "DataSet"
}, {
"type": "string",
"value": "Referenceable"
}]
},
"hive_table.viewExpandedText": {
"type": "string",
"value": "select `test_table`.`col1`, `test_table`.`col2`, `test_table`.`col4` from `stocks`.`test_table`"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__typeName": {
"type": "string",
"value": "hive_table"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693838471
},
"Asset.owner": {
"type": "string",
"value": "anonymous"
},
"hive_table.lastAccessTime": {
"type": "long",
"value": 1522693834000
},
"hive_table.viewOriginalText": {
"type": "string",
"value": "select col1, col2, col4 from test_table"
},
"hive_table.columns": {
"type": "list",
"value": [{
"type": "string",
"value": "816u-35tc-ao0l-47so"
}, {
"type": "string",
"value": "82rq-35tc-ao0l-2glc"
}]
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": 147504,
"_type": "vertex"
}
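Each record in the legacy GraphSON resources above is a standalone JSON document: entity properties are wrapped as {"type": ..., "value": ...} pairs (lists nest the same wrapper), while graph-level fields (_id, _type, and _outV/_inV/_label on edges) are plain scalars. A minimal Jackson sketch of reading one such record; the class name and printed fields are illustrative, not part of this commit:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LegacyGraphSONPeek {
    public static void main(String[] args) throws Exception {
        String record = "{\"__guid\": {\"type\": \"string\", \"value\": \"229b7fd4-e46e-4338-9e44-18ce630eb5bf\"}, "
                      + "\"_id\": 65544, \"_type\": \"vertex\"}";

        JsonNode node = new ObjectMapper().readTree(record);

        // Graph-level fields are plain scalars.
        System.out.println(node.get("_type").asText() + " id=" + node.get("_id").asLong());

        // Entity properties carry an explicit type tag beside the value.
        JsonNode guid = node.get("__guid");
        System.out.println(guid.get("type").asText() + " -> " + guid.get("value").asText());
    }
}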
...@@ -35,6 +35,7 @@ import com.tinkerpop.pipes.util.structures.Row;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.groovy.GroovyExpression;
+import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
...@@ -419,6 +420,11 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> {
     public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
     }

+    @Override
+    public MigrationStatus getMigrationStatus() {
+        return new MigrationStatus();
+    }
+
     public void addMultiProperties(Set<String> names) {
         multiProperties.addAll(names);
     }
...
...@@ -18,97 +18,58 @@
 package org.apache.atlas.repository.impexp;

-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.annotation.AtlasService;
 import org.apache.atlas.model.impexp.MigrationStatus;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.inject.Inject;
 import javax.inject.Singleton;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;

 @AtlasService
 @Singleton
 public class MigrationProgressService {
     private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class);
+    public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs";

-    private static final String MIGRATION_STATUS_TYPE_NAME = "__MigrationStatus";
-    private static final String CURRENT_INDEX_PROPERTY = "currentIndex";
-    private static final String OPERATION_STATUS_PROPERTY = "operationStatus";
-    private static final String START_TIME_PROPERTY = "startTime";
-    private static final String END_TIME_PROPERTY = "endTime";
-    private static final String TOTAL_COUNT_PROPERTY = "totalCount";
-    private static final String MIGRATION_STATUS_KEY = "1";
+    @VisibleForTesting
+    static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs

+    private final long cacheValidity;
     private final AtlasGraph graph;
-    private final MigrationStatus defaultStatus = new MigrationStatus();
-    private LoadingCache<String, MigrationStatus> cache;
+    private MigrationStatus cachedStatus;
+    private long cacheExpirationTime = 0;

     @Inject
-    public MigrationProgressService(AtlasGraph graph) {
+    public MigrationProgressService(Configuration configuration, AtlasGraph graph) {
         this.graph = graph;
+        this.cacheValidity = (configuration != null) ?
+                configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) :
+                DEFAULT_CACHE_TTL_IN_SECS;
     }

     public MigrationStatus getStatus() {
-        try {
-            if (cache == null) {
-                initCache();
-                cache.get(MIGRATION_STATUS_KEY);
-            }
-            if(cache.size() > 0) {
-                return cache.get(MIGRATION_STATUS_KEY);
-            }
-            return defaultStatus;
-        } catch (ExecutionException e) {
-            return defaultStatus;
-        }
+        return fetchStatus();
     }

-    private void initCache() {
-        this.cache = CacheBuilder.newBuilder().refreshAfterWrite(30, TimeUnit.SECONDS).
-                build(new CacheLoader<String, MigrationStatus>() {
-                    @Override
-                    public MigrationStatus load(String key) {
-                        try {
-                            return from(fetchStatusVertex());
-                        } catch (Exception e) {
-                            LOG.error("Error retrieving status.", e);
-                            return defaultStatus;
-                        }
-                    }
-
-                    private MigrationStatus from(AtlasVertex vertex) {
-                        if (vertex == null) {
-                            return null;
-                        }
-
-                        MigrationStatus ms = new MigrationStatus();
-
-                        ms.setStartTime(GraphHelper.getSingleValuedProperty(vertex, START_TIME_PROPERTY, Date.class));
-                        ms.setEndTime(GraphHelper.getSingleValuedProperty(vertex, END_TIME_PROPERTY, Date.class));
-                        ms.setCurrentIndex(GraphHelper.getSingleValuedProperty(vertex, CURRENT_INDEX_PROPERTY, Long.class));
-                        ms.setOperationStatus(GraphHelper.getSingleValuedProperty(vertex, OPERATION_STATUS_PROPERTY, String.class));
-                        ms.setTotalCount(GraphHelper.getSingleValuedProperty(vertex, TOTAL_COUNT_PROPERTY, Long.class));
-
-                        return ms;
-                    }
-
-                    private AtlasVertex fetchStatusVertex() {
-                        Iterator<AtlasVertex> itr = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME).vertices().iterator();
-                        return itr.hasNext() ? itr.next() : null;
-                    }
-                });
+    private MigrationStatus fetchStatus() {
+        long currentTime = System.currentTimeMillis();
+
+        if(resetCache(currentTime)) {
+            cachedStatus = graph.getMigrationStatus();
+        }
+
+        return cachedStatus;
+    }
+
+    private boolean resetCache(long currentTime) {
+        boolean ret = cachedStatus == null || currentTime > cacheExpirationTime;
+
+        if(ret) {
+            cacheExpirationTime = currentTime + cacheValidity;
+        }
+
+        return ret;
     }
 }
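The rewrite above replaces the Guava LoadingCache with a hand-rolled TTL check: getStatus() re-reads graph.getMigrationStatus() only when the cached copy is absent or older than cacheValidity, so the refresh cost is paid by the first caller after expiry and no background thread is involved. A standalone sketch of the same pattern, assuming nothing beyond the JDK (TtlCache and its members are illustrative names, not Atlas code):

import java.util.function.Supplier;

public class TtlCache<T> {
    private final long        ttlMillis;
    private final Supplier<T> loader;

    private T    cached;
    private long expiresAt;

    public TtlCache(long ttlMillis, Supplier<T> loader) {
        this.ttlMillis = ttlMillis;
        this.loader    = loader;
    }

    public synchronized T get() {
        long now = System.currentTimeMillis();

        // Same condition as resetCache() above: refresh on first use or after expiry.
        if (cached == null || now > expiresAt) {
            expiresAt = now + ttlMillis;
            cached    = loader.get();
        }

        return cached;
    }
}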
...@@ -120,10 +120,12 @@ public class DataMigrationService implements Service {
     @VisibleForTesting
     void processIncomingTypesDef(File typesDefFile) throws AtlasBaseException {
         try {
+            AtlasImportResult result = new AtlasImportResult();
             String jsonStr = FileUtils.readFileToString(typesDefFile);
             AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);
             ImportTypeDefProcessor processor = new ImportTypeDefProcessor(typeDefStore, typeRegistry);
-            processor.processTypes(typesDef, new AtlasImportResult());
+            processor.processTypes(typesDef, result);
+
+            LOG.info(" types migrated: {}", result.getMetrics());
         } catch (IOException e) {
             LOG.error("processIncomingTypesDef: Could not process file: {}! Imported data may not be usable.", typesDefFile.getName());
         }
...
...@@ -43,11 +43,16 @@ public class HiveParititionTest extends MigrationBaseAsserts {
     @Test
     public void fileImporterTest() throws IOException, AtlasBaseException {
+        final int EXPECTED_TOTAL_COUNT  = 140;
+        final int EXPECTED_DB_COUNT     = 1;
+        final int EXPECTED_TABLE_COUNT  = 2;
+        final int EXPECTED_COLUMN_COUNT = 7;
+
         runFileImporter("parts_db");

         assertPartitionKeyProperty(getVertex("hive_table", "t1"), 1);
         assertPartitionKeyProperty(getVertex("hive_table", "tv1"), 1);
-        assertHiveVertices(1, 2, 7);
+        assertHiveVertices(EXPECTED_DB_COUNT, EXPECTED_TABLE_COUNT, EXPECTED_COLUMN_COUNT);

         assertTypeCountNameGuid("hive_db", 1, "parts_db", "ae30d78b-51b4-42ab-9436-8d60c8f68b95");
         assertTypeCountNameGuid("hive_process", 1, "", "");
...@@ -55,7 +60,7 @@ public class HiveParititionTest extends MigrationBaseAsserts {
         assertEdges("hive_table", "t1", AtlasEdgeDirection.OUT, 1, 1, "hive_db_tables");
         assertEdges("hive_table", "tv1", AtlasEdgeDirection.OUT, 1, 1, "hive_db_tables");

-        assertMigrationStatus(136);
+        assertMigrationStatus(EXPECTED_TOTAL_COUNT);
     }

     private void assertPartitionKeyProperty(AtlasVertex vertex, int expectedCount) {
...
...@@ -37,9 +37,14 @@ public class HiveStocksTest extends MigrationBaseAsserts {
     @Test
     public void migrateStocks() throws AtlasBaseException, IOException {
+        final int EXPECTED_TOTAL_COUNT  = 187;
+        final int EXPECTED_DB_COUNT     = 1;
+        final int EXPECTED_TABLE_COUNT  = 1;
+        final int EXPECTED_COLUMN_COUNT = 7;
+
         runFileImporter("stocks_db");

-        assertHiveVertices(1, 1, 7);
+        assertHiveVertices(EXPECTED_DB_COUNT, EXPECTED_TABLE_COUNT, EXPECTED_COLUMN_COUNT);

         assertTypeCountNameGuid("hive_db", 1, "stocks", "4e13b36b-9c54-4616-9001-1058221165d0");
         assertTypeCountNameGuid("hive_table", 1, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774");
         assertTypeAttribute("hive_table", 7, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774", "hive_table.columns");
...@@ -58,6 +63,6 @@ public class HiveStocksTest extends MigrationBaseAsserts {
         assertEdges(getVertex("hive_table", "stocks_daily").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, 1, "hive_db_tables");
         assertEdges(getVertex("hive_column", "high").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, 1, "hive_table_columns");

-        assertMigrationStatus(187);
+        assertMigrationStatus(EXPECTED_TOTAL_COUNT);
     }
 }
...@@ -40,7 +40,7 @@ import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.Assert.assertTrue;

 public class MigrationBaseAsserts {
     protected static final String ASSERT_NAME_PROPERTY = "Asset.name";
...@@ -76,6 +76,7 @@ public class MigrationBaseAsserts {
     private void loadTypesFromJson() throws IOException, AtlasBaseException {
         loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+        loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry);
         loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
     }
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
import org.apache.atlas.repository.impexp.MigrationProgressService;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.testng.annotations.Test;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
public class MigrationProgressServiceTest {
private final long currentIndex = 100L;
private final long totalIndex = 1000L;
private final long increment = 1001L;
private final String statusSuccess = ReaderStatusManager.STATUS_SUCCESS;
private static class AtlasTinkerGraph {
public static AtlasGraph create(TinkerGraph tg) {
AtlasGraph g = mock(AtlasGraph.class);
when(g.getMigrationStatus()).thenAnswer(invocation -> ReaderStatusManager.get(tg));
return g;
}
public static AtlasGraph create() {
return create(TinkerGraph.open());
}
}
@Test
public void absentStatusNodeReturnsDefaultStatus() {
MigrationProgressService mps = getMigrationStatusForTest(null, null);
MigrationStatus ms = mps.getStatus();
assertNotNull(ms);
assertTrue(StringUtils.isEmpty(ms.getOperationStatus()));
assertEquals(ms.getCurrentIndex(), 0);
assertEquals(ms.getTotalCount(), 0);
}
@Test
public void existingStatusNodeReturnsStatus() {
TinkerGraph tg = createUpdateStatusNode(null, currentIndex, totalIndex, statusSuccess);
MigrationProgressService mps = getMigrationStatusForTest(null, tg);
MigrationStatus ms = mps.getStatus();
assertMigrationStatus(totalIndex, statusSuccess, ms);
}
@Test
public void cachedStatusReturnedIfQueriedBeforeCacheExpiration() {
TinkerGraph tg = createUpdateStatusNode(null, currentIndex, totalIndex, statusSuccess);
MigrationProgressService mps = getMigrationStatusForTest(null, tg);
MigrationStatus ms = mps.getStatus();
createUpdateStatusNode(tg, currentIndex + increment, totalIndex + increment, ReaderStatusManager.STATUS_FAILED);
MigrationStatus ms2 = mps.getStatus();
assertEquals(ms.hashCode(), ms2.hashCode());
assertMigrationStatus(totalIndex, statusSuccess, ms);
}
private MigrationProgressService getMigrationStatusForTest(Configuration cfg, TinkerGraph tg) {
return new MigrationProgressService(cfg, AtlasTinkerGraph.create(tg));
}
@Test
public void cacheUpdatedIfQueriedAfterCacheExpiration() throws InterruptedException {
final String statusFailed = ReaderStatusManager.STATUS_FAILED;
TinkerGraph tg = createUpdateStatusNode(null, currentIndex, totalIndex, statusSuccess);
long cacheTtl = 100L;
MigrationProgressService mps = getMigrationStatusForTest(getStubConfiguration(cacheTtl), tg);
MigrationStatus ms = mps.getStatus();
assertMigrationStatus(totalIndex, statusSuccess, ms);
createUpdateStatusNode(tg, currentIndex + increment, totalIndex + increment, statusFailed);
Thread.sleep(2 * cacheTtl);
MigrationStatus ms2 = mps.getStatus();
assertNotEquals(ms.hashCode(), ms2.hashCode());
assertMigrationStatus(totalIndex + increment, statusFailed, ms2);
}
private Configuration getStubConfiguration(long ttl) {
Configuration cfg = mock(Configuration.class);
when(cfg.getLong(anyString(), anyLong())).thenReturn(ttl);
return cfg;
}
private TinkerGraph createUpdateStatusNode(TinkerGraph tg, long currentIndex, long totalIndex, String status) {
if(tg == null) {
tg = TinkerGraph.open();
}
ReaderStatusManager rsm = new ReaderStatusManager(tg, tg);
rsm.update(tg, currentIndex);
rsm.end(tg, totalIndex, status);
return tg;
}
private void assertMigrationStatus(long totalIndex, String status, MigrationStatus ms) {
assertNotNull(ms);
assertEquals(ms.getOperationStatus(), status);
assertEquals(ms.getCurrentIndex(), totalIndex);
assertEquals(ms.getTotalCount(), totalIndex);
}
}
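Two details of the tests above are easy to miss: the TTL is applied in milliseconds (resetCache() adds cacheValidity to System.currentTimeMillis(), and the expiry test sleeps 2 * cacheTtl), and the stubbed Commons Configuration answers any getLong(key, default) lookup with the test's TTL. The stubbing pattern shown in isolation (class name and printed value are illustrative):

import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.commons.configuration.Configuration;

public class ConfigurationStubSketch {
    public static void main(String[] args) {
        Configuration cfg = mock(Configuration.class);

        // Any (key, default) pair resolves to 100ms, so expiry paths run quickly.
        when(cfg.getLong(anyString(), anyLong())).thenReturn(100L);

        System.out.println(cfg.getLong("atlas.migration.query.cache.ttlInSecs", 30000L)); // prints 100
    }
}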
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasBuiltInTypes;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Iterator;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class PathTest extends MigrationBaseAsserts {
@Inject
public PathTest(AtlasGraph graph) {
super(graph);
}
@Test
public void migrationImport() throws IOException, AtlasBaseException {
runFileImporter("path_db");
AtlasVertex v = assertHdfsPathVertices(1);
assertVertexProperties(v);
assertMigrationStatus(88);
}
private void assertVertexProperties(AtlasVertex v) {
final String HASH_CODE_PROPERTY = "hdfs_path.hashCode";
final String RETENTION_PROPERTY = "hdfs_path.retention";
AtlasBuiltInTypes.AtlasBigIntegerType bitRef = new AtlasBuiltInTypes.AtlasBigIntegerType();
AtlasBuiltInTypes.AtlasBigDecimalType bdtRef = new AtlasBuiltInTypes.AtlasBigDecimalType();
BigInteger bitExpected = bitRef.getNormalizedValue(612361213421234L);
BigDecimal bdtExpected = bdtRef.getNormalizedValue(125353);
BigInteger bit = GraphHelper.getSingleValuedProperty(v, HASH_CODE_PROPERTY, BigInteger.class);
BigDecimal bdt = GraphHelper.getSingleValuedProperty(v, RETENTION_PROPERTY, BigDecimal.class);
assertEquals(bit, bitExpected);
assertEquals(bdt.compareTo(bdtExpected), 0);
}
protected AtlasVertex assertHdfsPathVertices(int expectedCount) {
int i = 0;
AtlasVertex vertex = null;
Iterator<AtlasVertex> results = getVertices("hdfs_path", null);
for (Iterator<AtlasVertex> it = results; it.hasNext(); i++) {
vertex = it.next();
assertNotNull(vertex);
}
assertEquals(i, expectedCount);
return vertex;
}
}
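PathTest exercises the BigInteger/BigDecimal support this commit adds: property values read back from the migrated graph are compared against what Atlas's built-in types produce. The normalization calls in isolation, using the same values as the test (the class name is illustrative):

import java.math.BigDecimal;
import java.math.BigInteger;

import org.apache.atlas.type.AtlasBuiltInTypes;

public class NormalizeSketch {
    public static void main(String[] args) {
        AtlasBuiltInTypes.AtlasBigIntegerType bigIntType = new AtlasBuiltInTypes.AtlasBigIntegerType();
        AtlasBuiltInTypes.AtlasBigDecimalType bigDecType = new AtlasBuiltInTypes.AtlasBigDecimalType();

        BigInteger bi = bigIntType.getNormalizedValue(612361213421234L); // long -> BigInteger
        BigDecimal bd = bigDecType.getNormalizedValue(125353);           // int  -> BigDecimal

        System.out.println(bi); // 612361213421234
        System.out.println(bd); // 125353
    }
}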
...@@ -29,6 +29,7 @@ import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasRelationshipType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.jcodings.util.Hash;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
...