Commit 47ec9f7a by Ashutosh Mestry Committed by Madhan Neethiraj

ATLAS-2637: migration-import updates for changes in collection attribute storage

parent a0269b9c
......@@ -35,6 +35,6 @@ public final class AtlasConstants {
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.mode.filename";
public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.data.filename";
public static final String ATLAS_SERVICES_ENABLED = "atlas.services.enabled";
}
......@@ -30,6 +30,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.groovy.GroovyExpression;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
/**
* Represents a graph.
......@@ -320,7 +321,7 @@ public interface AtlasGraph<V, E> {
*/
boolean isMultiProperty(String name);
void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException;
void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException;
MigrationStatus getMigrationStatus();
}
......@@ -26,10 +26,12 @@ import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase;
import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader;
import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
import org.apache.atlas.repository.graphdb.janus.migration.ElementProcessors;
import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.configuration.Configuration;
......@@ -47,7 +49,6 @@ import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Map;
/**
* Default implementation for Graph Provider that doles out JanusGraph.
......@@ -231,7 +232,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
return ret;
}
public static void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
public static void loadLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
......@@ -242,9 +243,10 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
}
AtlasGraphSONReader legacyGraphSONReader = AtlasGraphSONReader.build().
relationshipCache(relationshipCache).
relationshipCache(new ElementProcessors(typeRegistry)).
schemaDB(getGraphInstance()).
bulkLoadingDB(getBulkLoadingGraphInstance()).create();
bulkLoadingDB(getBulkLoadingGraphInstance()).
create();
legacyGraphSONReader.readGraph(fs);
} catch (Exception ex) {
......
......@@ -37,44 +37,46 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public final class AtlasGraphSONReader {
private static final Logger LOG = LoggerFactory.getLogger(AtlasGraphSONReader.class);
private static String APPLICATION_PROPERTY_MIGRATION_START_INDEX = "atlas.migration.mode.start.index";
private static String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
private static String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
private final ObjectMapper mapper;
private final RelationshipTypeCache relationshipCache;
private final ElementProcessors relationshipCache;
private final Graph graph;
private final Graph bulkLoadGraph;
private final int numWorkers;
private final int batchSize;
private final long suppliedStartIndex;
private final String[] propertiesToPostProcess;
private GraphSONUtility graphSONUtility;
private final GraphSONUtility graphSONUtility;
private ReaderStatusManager readerStatusManager;
private AtomicLong counter;
private AtlasGraphSONReader(ObjectMapper mapper, Map<String, String> relationshipLookup, Graph graph,
Graph bulkLoadGraph, String[] propertiesToPostProcess, int numWorkers, int batchSize, long suppliedStartIndex) {
private AtlasGraphSONReader(ObjectMapper mapper, ElementProcessors relationshipLookup, Graph graph,
Graph bulkLoadGraph, int numWorkers, int batchSize, long suppliedStartIndex) {
this.mapper = mapper;
this.relationshipCache = new RelationshipTypeCache(relationshipLookup);
this.relationshipCache = relationshipLookup;
this.graph = graph;
this.bulkLoadGraph = bulkLoadGraph;
this.numWorkers = numWorkers;
this.batchSize = batchSize;
this.suppliedStartIndex = suppliedStartIndex;
this.propertiesToPostProcess = propertiesToPostProcess;
this.graphSONUtility = new GraphSONUtility(relationshipCache);
}
public void readGraph(final InputStream inputStream) throws IOException {
counter = new AtomicLong(0);
graphSONUtility = new GraphSONUtility(relationshipCache);
final long startIndex = initStatusManager();
final JsonFactory factory = mapper.getFactory();
LOG.info("AtlasGraphSONReader.readGraph: numWorkers: {}: batchSize: {}: startIndex: {}", numWorkers, batchSize, startIndex);
try (JsonParser parser = factory.createParser(inputStream)) {
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected data to start with an Object");
......@@ -88,6 +90,7 @@ public final class AtlasGraphSONReader {
switch (fieldName) {
case GraphSONTokensTP2.MODE:
parser.nextToken();
final String mode = parser.getText();
if (!mode.equals("EXTENDED")) {
......@@ -180,8 +183,7 @@ public final class AtlasGraphSONReader {
LOG.info("postProcess: Starting... : counter at: {}", counter.get());
try {
PostProcessManager.WorkItemsManager wim = PostProcessManager.create(bulkLoadGraph, graphSONUtility,
propertiesToPostProcess, batchSize, numWorkers);
PostProcessManager.WorkItemsManager wim = PostProcessManager.create(bulkLoadGraph, relationshipCache.getPropertiesToPostProcess(), batchSize, numWorkers);
GraphTraversal query = bulkLoadGraph.traversal().V();
while (query.hasNext()) {
......@@ -228,30 +230,32 @@ public final class AtlasGraphSONReader {
}
}
public static Builder build() throws AtlasException {
public static Builder build() {
return new Builder();
}
public final static class Builder {
private int batchSize = 500;
private Map<String, String> relationshipCache;
private ElementProcessors relationshipCache;
private Graph graph;
private Graph bulkLoadGraph;
private int numWorkers;
private long suppliedStartIndex;
private String[] propertiesToPostProcess;
private Builder() {
}
private void setDefaults() throws AtlasException {
this.startIndex(ApplicationProperties.get().getLong("atlas.migration.mode.start.index", 0L))
.numWorkers(ApplicationProperties.get().getInt("atlas.migration.mode.workers", 4))
.batchSize(ApplicationProperties.get().getInt("atlas.migration.mode.batch.size", 3000))
.propertiesToPostProcess(getPropertiesToPostProcess("atlas.migration.mode.postprocess.properties"));
private void setDefaults() {
try {
this.startIndex(ApplicationProperties.get().getLong(APPLICATION_PROPERTY_MIGRATION_START_INDEX, 0L))
.numWorkers(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, 4))
.batchSize(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, 3000));
} catch (AtlasException ex) {
LOG.error("setDefaults: failed!", ex);
}
}
public AtlasGraphSONReader create() throws AtlasException {
public AtlasGraphSONReader create() {
setDefaults();
if(bulkLoadGraph == null) {
bulkLoadGraph = graph;
......@@ -261,10 +265,10 @@ public final class AtlasGraphSONReader {
final GraphSONMapper mapper = builder.typeInfo(TypeInfo.NO_TYPES).create();
return new AtlasGraphSONReader(mapper.createMapper(), relationshipCache, graph, bulkLoadGraph,
propertiesToPostProcess, numWorkers, batchSize, suppliedStartIndex);
numWorkers, batchSize, suppliedStartIndex);
}
public Builder relationshipCache(Map<String, String> relationshipCache) {
public Builder relationshipCache(ElementProcessors relationshipCache) {
this.relationshipCache = relationshipCache;
return this;
......@@ -305,27 +309,5 @@ public final class AtlasGraphSONReader {
return this;
}
public Builder propertiesToPostProcess(String[] list) {
this.propertiesToPostProcess = list;
return this;
}
private static String[] getPropertiesToPostProcess(String applicationPropertyKey) throws AtlasException {
final String HIVE_COLUMNS_PROPERTY = "hive_table.columns";
final String HIVE_PARTITION_KEYS_PROPERTY = "hive_table.partitionKeys";
final String PROCESS_INPUT_PROPERTY = "Process.inputs";
final String PROCESS_OUTPUT_PROPERTY = "Process.outputs";
final String USER_PROFILE_OUTPUT_PROPERTY = "__AtlasUserProfile.savedSearches";
String[] defaultProperties = new String[] { HIVE_COLUMNS_PROPERTY, HIVE_PARTITION_KEYS_PROPERTY,
PROCESS_INPUT_PROPERTY, PROCESS_OUTPUT_PROPERTY,
USER_PROFILE_OUTPUT_PROPERTY
};
String[] userDefinedList = ApplicationProperties.get().getStringArray(applicationPropertyKey);
return (userDefinedList == null || userDefinedList.length == 0) ? defaultProperties : userDefinedList;
}
}
}
......@@ -18,10 +18,9 @@
package org.apache.atlas.repository.graphdb.janus.migration;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasBuiltInTypes;
import org.apache.commons.lang.StringUtils;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigDecimalType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigIntegerType;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgeFeatures;
......@@ -34,21 +33,17 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
class GraphSONUtility {
private static final Logger LOG = LoggerFactory.getLogger(GraphSONUtility.class);
private static final String EMPTY_STRING = "";
private static final AtlasBigIntegerType bigIntegerType = new AtlasBigIntegerType();
private static final AtlasBigDecimalType bigDecimalType = new AtlasBigDecimalType();
private final RelationshipTypeCache relationshipTypeCache;
private static AtlasBuiltInTypes.AtlasBigIntegerType bigIntegerType = new AtlasBuiltInTypes.AtlasBigIntegerType();
private static AtlasBuiltInTypes.AtlasBigDecimalType bigDecimalType = new AtlasBuiltInTypes.AtlasBigDecimalType();
private final ElementProcessors elementProcessors;
public GraphSONUtility(final RelationshipTypeCache relationshipTypeCache) {
this.relationshipTypeCache = relationshipTypeCache;
public GraphSONUtility(final ElementProcessors elementProcessors) {
this.elementProcessors = elementProcessors;
}
public Map<String, Object> vertexFromJson(Graph g, final JsonNode json) {
......@@ -64,6 +59,7 @@ class GraphSONUtility {
Vertex vertex = vertexFeatures.willAllowId(vertexId) ? g.addVertex(T.id, vertexId) : g.addVertex();
props.put(Constants.VERTEX_ID_IN_IMPORT_KEY, vertexId);
elementProcessors.processCollections(Constants.ENTITY_TYPE_PROPERTY_KEY, props);
for (Map.Entry<String, Object> entry : props.entrySet()) {
try {
......@@ -107,17 +103,11 @@ class GraphSONUtility {
props.put(Constants.EDGE_ID_IN_IMPORT_KEY, edgeId.toString());
if(addRelationshipTypeForClassification(in, out, label, props)) {
label = Constants.CLASSIFICATION_LABEL;
} else {
addRelationshipTypeName(label, props);
}
label = elementProcessors.updateEdge(in, out, edgeId, label, props);
EdgeFeatures edgeFeatures = g.features().edge();
final Edge edge = edgeFeatures.willAllowId(edgeId) ? out.addEdge(label, in, T.id, edgeId) : out.addEdge(label, in);
addMandatoryRelationshipProperties(props);
for (Map.Entry<String, Object> entry : props.entrySet()) {
try {
edge.property(entry.getKey(), entry.getValue());
......@@ -153,82 +143,6 @@ class GraphSONUtility {
return cache.getMappedVertex(gr, inVId);
}
private boolean addRelationshipTypeForClassification(Vertex in, Vertex out, String label, Map<String, Object> props) {
if (in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
String inTypeName = (String) in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
if (inTypeName.equals(label)) {
if (StringUtils.isNotEmpty(inTypeName)) {
props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, inTypeName);
addEntityGuidToTrait(in, out);
return true;
} else {
LOG.info("Could not find typeName for trait: {}", label);
}
}
}
return false;
}
private void addEntityGuidToTrait(Vertex in, Vertex out) {
String entityGuid = "";
if (out.property(Constants.GUID_PROPERTY_KEY).isPresent()) {
entityGuid = (String) out.property(Constants.GUID_PROPERTY_KEY).value();
}
if(StringUtils.isNotEmpty(entityGuid)) {
in.property(CLASSIFICATION_ENTITY_GUID, entityGuid);
}
}
private void addRelationshipTypeName(String edgeLabel, Map<String, Object> props) {
String typeName = relationshipTypeCache.get(edgeLabel);
if (StringUtils.isNotEmpty(typeName)) {
props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not find relationship type for: {}", edgeLabel);
}
}
}
private void addMandatoryRelationshipProperties(Map<String, Object> props) {
props.put(Constants.RELATIONSHIP_GUID_PROPERTY_KEY, UUID.randomUUID().toString());
props.put(RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, "NONE");
props.put(CLASSIFICATION_EDGE_STATE_PROPERTY_KEY, "ACTIVE");
}
public void replaceReferencedEdgeIdForList(Graph g, MappedElementCache cache, Vertex v, String propertyName) {
try {
if (v.property(Constants.TYPENAME_PROPERTY_KEY).isPresent() || !v.property(propertyName).isPresent()) {
return;
}
List list = (List) v.property(propertyName).value();
for (int i = 0; i < list.size(); i++) {
String id = list.get(i).toString();
Object newId = cache.getMappedEdge(g, id);
if (newId == null) {
continue;
}
list.set(i, newId.toString());
}
v.property(propertyName, list);
} catch (IllegalArgumentException ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("processItem: IllegalArgumentException: v[{}] error!", v.id(), ex);
}
}
}
@VisibleForTesting
static Map<String, Object> readProperties(final JsonNode node) {
final Map<String, Object> map = new HashMap<>();
final Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
......@@ -303,7 +217,6 @@ class GraphSONUtility {
return array;
}
@VisibleForTesting
static Object getTypedValueFromJsonNode(final JsonNode node) {
Object theValue = null;
......
......@@ -101,6 +101,7 @@ public class JsonNodeProcessManager {
private void commitRegular() {
commit(graph, nodes.size());
cache.clearAll();
}
private void commit(Graph g, int size) {
......
......@@ -18,9 +18,7 @@
package org.apache.atlas.repository.graphdb.janus.migration;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.utils.LruCache;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
......@@ -28,18 +26,13 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
public class MappedElementCache {
private static final Logger LOG = LoggerFactory.getLogger(MappedElementCache.class);
@VisibleForTesting
final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
@VisibleForTesting
final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000);
public Vertex getMappedVertex(Graph gr, Object key) {
try {
Vertex ret = lruVertexCache.get(key);
......@@ -62,32 +55,6 @@ public class MappedElementCache {
}
}
public String getMappedEdge(Graph gr, String key) {
try {
String ret = lruEdgeCache.get(key);
if (ret == null) {
synchronized (lruEdgeCache) {
ret = lruEdgeCache.get(key);
if (ret == null) {
Edge e = fetchEdge(gr, key);
ret = e.id().toString();
lruEdgeCache.put(key, ret);
}
}
}
return ret;
} catch (Exception ex) {
LOG.error("getMappedEdge: {}", key, ex);
return null;
}
}
@VisibleForTesting
Vertex fetchVertex(Graph gr, Object key) {
try {
return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next();
......@@ -97,18 +64,7 @@ public class MappedElementCache {
}
}
@VisibleForTesting
Edge fetchEdge(Graph gr, String key) {
try {
return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next();
} catch (Exception ex) {
LOG.error("fetchEdge: fetchFromDB failed: {}", key);
return null;
}
}
public void clearAll() {
lruVertexCache.clear();
lruEdgeCache.clear();
}
}
......@@ -21,36 +21,42 @@ package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
import org.apache.atlas.repository.graphdb.janus.migration.postProcess.PostProcessListProperty;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
public class PostProcessManager {
static class Consumer extends WorkItemConsumer<Object> {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private final Graph bulkLoadGraph;
private final GraphSONUtility utility;
private final String[] properties;
private final MappedElementCache cache;
private final Map<String, Map<String, List<String>>> typePropertiesMap;
private final int batchSize;
private long counter;
private long batchCounter;
private final PostProcessListProperty processor;
private final String[] nonPrimitiveCategoryKeys;
public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, GraphSONUtility utility,
String[] properties, MappedElementCache cache, int batchSize) {
public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, Map<String, Map<String, List<String>>> typePropertiesMap, int batchSize) {
super(queue);
this.bulkLoadGraph = bulkLoadGraph;
this.utility = utility;
this.properties = properties;
this.cache = cache;
this.typePropertiesMap = typePropertiesMap;
this.batchSize = batchSize;
this.counter = 0;
this.batchCounter = 0;
this.processor = new PostProcessListProperty();
this.nonPrimitiveCategoryKeys = ElementProcessors.getNonPrimitiveCategoryKeys();
}
@Override
......@@ -59,22 +65,43 @@ public class PostProcessManager {
counter++;
try {
Vertex v = bulkLoadGraph.traversal().V(vertexId).next();
Vertex vertex = bulkLoadGraph.traversal().V(vertexId).next();
boolean isTypeVertex = vertex.property(TYPENAME_PROPERTY_KEY).isPresent();
VertexProperty typeNameProperty = vertex.property(ENTITY_TYPE_PROPERTY_KEY);
if (!isTypeVertex && typeNameProperty.isPresent()) {
String typeName = (String) typeNameProperty.value();
if (!typePropertiesMap.containsKey(typeName)) {
return;
}
Map<String, List<String>> collectionTypeProperties = typePropertiesMap.get(typeName);
for (String key : nonPrimitiveCategoryKeys) {
if (!collectionTypeProperties.containsKey(key)) {
continue;
}
for (String p : properties) {
utility.replaceReferencedEdgeIdForList(bulkLoadGraph, cache, v, p);
for(String propertyName : collectionTypeProperties.get(key)) {
processor.process(vertex, typeName, propertyName);
}
}
}
commitBatch();
} catch (Exception ex) {
LOG.error("processItem: v[{}] error!", vertexId, ex);
}
}
private void commitBatch() {
if (batchCounter >= batchSize) {
LOG.info("[{}]: batch: {}: commit", counter, batchCounter);
commit();
batchCounter = 0;
}
}
catch (Exception ex) {
LOG.error("processItem: v[{}] error!", vertexId, ex);
}
}
@Override
protected void doCommit() {
......@@ -84,22 +111,18 @@ public class PostProcessManager {
private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Object> {
private final Graph bulkLoadGraph;
private final GraphSONUtility utility;
private final int batchSize;
private final MappedElementCache cache;
private final String[] vertexPropertiesToPostProcess;
private final Map<String, Map<String, List<String>>> vertexPropertiesToPostProcess;
public ConsumerBuilder(Graph bulkLoadGraph, GraphSONUtility utility, String[] propertiesToPostProcess, int batchSize) {
public ConsumerBuilder(Graph bulkLoadGraph, Map<String, Map<String, List<String>>> propertiesToPostProcess, int batchSize) {
this.bulkLoadGraph = bulkLoadGraph;
this.utility = utility;
this.batchSize = batchSize;
this.cache = new MappedElementCache();
this.vertexPropertiesToPostProcess = propertiesToPostProcess;
}
@Override
public Consumer build(BlockingQueue<Object> queue) {
return new Consumer(queue, bulkLoadGraph, utility, vertexPropertiesToPostProcess, cache, batchSize);
return new Consumer(queue, bulkLoadGraph, vertexPropertiesToPostProcess, batchSize);
}
}
......@@ -109,8 +132,9 @@ public class PostProcessManager {
}
}
public static WorkItemsManager create(Graph bGraph, GraphSONUtility utility, String[] propertiesToPostProcess, int batchSize, int numWorkers) {
ConsumerBuilder cb = new ConsumerBuilder(bGraph, utility, propertiesToPostProcess, batchSize);
public static WorkItemsManager create(Graph bGraph, Map<String, Map<String, List<String>>> propertiesToPostProcess,
int batchSize, int numWorkers) {
ConsumerBuilder cb = new ConsumerBuilder(bGraph, propertiesToPostProcess, batchSize);
return new WorkItemsManager(cb, batchSize, numWorkers);
}
......
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -16,58 +16,83 @@
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class RelationshipCacheGenerator {
public static Map<String, String> get(AtlasTypeRegistry typeRegistry) {
Map<String, String> ret = new HashMap<>();
Collection<AtlasRelationshipType> relationshipTypes = typeRegistry.getAllRelationshipTypes();
public static class TypeInfo extends TypesUtil.Pair<String, PropagateTags> {
for (AtlasRelationshipType rt : relationshipTypes) {
AtlasRelationshipDef rd = rt.getRelationshipDef();
String relTypeName = rt.getTypeName();
public TypeInfo(String typeName, PropagateTags propagateTags) {
super(typeName, propagateTags);
}
public String getTypeName() {
return left;
}
public PropagateTags getPropagateTags() {
return right;
}
}
add(ret, getKey(rd.getEndDef1(), rt.getEnd1Type()), relTypeName);
add(ret, getKey(rd.getEndDef2(), rt.getEnd2Type()), relTypeName);
public static Map<String, TypeInfo> get(AtlasTypeRegistry typeRegistry) {
Map<String, TypeInfo> ret = new HashMap<>();
for (AtlasRelationshipType relType : typeRegistry.getAllRelationshipTypes()) {
AtlasRelationshipDef relDef = relType.getRelationshipDef();
String relTypeName = relType.getTypeName();
add(ret, getKey(relDef.getEndDef1()), relTypeName, relDef.getPropagateTags());
add(ret, getKey(relDef.getEndDef2()), relTypeName, getEnd2PropagateTag(relDef.getPropagateTags()));
}
return ret;
}
private static String getKey(AtlasRelationshipEndDef ed, AtlasEntityType rt) {
return getKey(ed.getIsLegacyAttribute(), rt.getTypeName(), ed.getName());
private static String getKey(AtlasRelationshipEndDef endDef) {
return getKey(endDef.getIsLegacyAttribute(), endDef.getType(), endDef.getName());
}
private static String getKey(String lhs, String rhs) {
return String.format("%s%s.%s", Constants.INTERNAL_PROPERTY_KEY_PREFIX, lhs, rhs);
}
private static String getKey(boolean isLegacy, String typeName, String name) {
if(!isLegacy) {
private static String getKey(boolean isLegacy, String entityTypeName, String relEndName) {
if (!isLegacy) {
return "";
}
return getKey(typeName, name);
return getKey(entityTypeName, relEndName);
}
private static void add(Map<String, String> map, String key, String value) {
if(StringUtils.isEmpty(key) || map.containsKey(key)) {
private static void add(Map<String, TypeInfo> map, String key, String relationTypeName, PropagateTags propagateTags) {
if (StringUtils.isEmpty(key) || map.containsKey(key)) {
return;
}
map.put(key, value);
map.put(key, new TypeInfo(relationTypeName, propagateTags));
}
private static PropagateTags getEnd2PropagateTag(PropagateTags end1PropagateTags) {
if (end1PropagateTags == PropagateTags.ONE_TO_TWO) {
return PropagateTags.TWO_TO_ONE;
} else if (end1PropagateTags == PropagateTags.TWO_TO_ONE) {
return PropagateTags.ONE_TO_TWO;
} else {
return end1PropagateTags;
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class RelationshipTypeCache {
private static final Logger LOG = LoggerFactory.getLogger(RelationshipTypeCache.class);
private final Map<String, String> relationshipLookup;
public RelationshipTypeCache(Map<String, String> lookup) {
relationshipLookup = lookup;
}
public String get(String label) {
return relationshipLookup.get(label);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.TypeCategory.*;
public class TypesWithCollectionsFinder {
private static final Logger LOG = LoggerFactory.getLogger(TypesWithCollectionsFinder.class);
static final EnumSet<TypeCategory> nonPrimitives = EnumSet.of(ENTITY, STRUCT, OBJECT_ID_TYPE);
public static Map<String, Map<String, List<String>>> getVertexPropertiesForCollectionAttributes(AtlasTypeRegistry typeRegistry) {
Map<String, Map<String, List<String>>> ret = new HashMap<>();
addVertexPropertiesForCollectionAttributes(typeRegistry.getAllEntityTypes(), ret);
addVertexPropertiesForCollectionAttributes(typeRegistry.getAllStructTypes(), ret);
displayInfo("types with properties: ", ret);
return ret;
}
private static void addVertexPropertiesForCollectionAttributes(Collection<? extends AtlasStructType> types, Map<String, Map<String, List<String>>> typeAttrMap) {
for (AtlasStructType type : types) {
Map<String, List<String>> collectionProperties = getVertexPropertiesForCollectionAttributes(type);
if(collectionProperties != null && collectionProperties.size() > 0) {
typeAttrMap.put(type.getTypeName(), collectionProperties);
}
}
}
static Map<String, List<String>> getVertexPropertiesForCollectionAttributes(AtlasStructType type) {
try {
Map<String, List<String>> collectionProperties = new HashMap<>();
for (AtlasAttribute attr : type.getAllAttributes().values()) {
addIfCollectionAttribute(attr, collectionProperties);
}
return collectionProperties;
} catch (Exception e) {
LOG.error("addVertexPropertiesForCollectionAttributes", e);
}
return null;
}
private static void addIfCollectionAttribute(AtlasAttribute attr, Map<String, List<String>> collectionProperties) {
AtlasType attrType = attr.getAttributeType();
TypeCategory attrTypeCategory = attrType.getTypeCategory();
switch (attrTypeCategory) {
case ARRAY: {
TypeCategory arrayElementType = ((AtlasArrayType) attrType).getElementType().getTypeCategory();
if (nonPrimitives.contains(arrayElementType)) {
addVertexProperty(attrTypeCategory.toString(), attr.getVertexPropertyName(), collectionProperties);
}
}
break;
case MAP: {
TypeCategory mapValueType = ((AtlasMapType) attrType).getValueType().getTypeCategory();
if (nonPrimitives.contains(mapValueType)) {
addVertexProperty(attrTypeCategory.toString(), attr.getVertexPropertyName(), collectionProperties);
} else {
addVertexProperty(attrTypeCategory.toString() + "_PRIMITIVE", attr.getVertexPropertyName(), collectionProperties);
}
}
break;
}
}
private static void addVertexProperty(String collectionType, String propertyName, Map<String, List<String>> collectionProperties) {
if(!collectionProperties.containsKey(collectionType)) {
collectionProperties.put(collectionType, new ArrayList<>());
}
collectionProperties.get(collectionType).add(propertyName);
}
static void displayInfo(String message, Map<String, Map<String, List<String>>> map) {
LOG.info(message);
for (Map.Entry<String, Map<String, List<String>>> e : map.entrySet()) {
LOG.info(" type: {} : {}", e.getKey(), e.getValue());
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration.postProcess;
import org.apache.atlas.repository.Constants;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PostProcessListProperty {
private static final Logger LOG = LoggerFactory.getLogger(PostProcessListProperty.class);
public void process(Vertex v, String typeName, String propertyName) {
try {
if (doesNotHaveProperty(v, typeName) || !hasProperty(v, propertyName)) {
return;
}
removeProperty(v, propertyName);
} catch (IllegalArgumentException ex) {
LOG.error("process: IllegalArgumentException: v[{}] error!", v.id(), ex);
}
}
protected void removeProperty(Vertex v, String propertyName) {
v.property(propertyName).remove();
}
protected boolean doesNotHaveProperty(Vertex v, String typeName) {
return v.property(Constants.TYPENAME_PROPERTY_KEY).isPresent() || !isInstanceVertexOfType(v, typeName);
}
private boolean hasProperty(Vertex v, String propertyName) {
try {
return v.property(propertyName).isPresent();
} catch(Exception ex) {
// ...
}
return false;
}
private boolean isInstanceVertexOfType(Vertex v, String typeName) {
if(v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
String s = (String) v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
return s.equals(typeName);
}
return false;
}
}
......@@ -26,14 +26,16 @@ import org.apache.tinkerpop.gremlin.structure.io.graphson.TypeInfo;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.testng.ITestContext;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.testng.AssertJUnit.assertTrue;
......@@ -41,12 +43,16 @@ public class BaseUtils {
private static final String resourcesDirRelativePath = "/src/test/resources/";
private String resourceDir;
protected final RelationshipTypeCache emptyRelationshipCache = new RelationshipTypeCache(new HashMap<>());
protected final ElementProcessors emptyRelationshipCache = new ElementProcessors(new HashMap<>(), new HashMap<>());
protected GraphSONUtility graphSONUtility;
protected Object[][] getJsonNodeFromFile(String s) throws IOException {
protected JsonNode getJsonNodeFromFile(String s) {
File f = new File(getFilePath(s));
return new Object[][]{{getEntityNode(FileUtils.readFileToString(f))}};
try {
return getEntityNode(FileUtils.readFileToString(f));
} catch (IOException e) {
throw new SkipException("getJsonNodeFromFile: " + s, e);
}
}
protected String getFilePath(String fileName) {
......@@ -76,12 +82,18 @@ public class BaseUtils {
utility.vertexFromJson(tg, node);
}
protected void addEdge(TinkerGraph tg, MappedElementCache cache) throws IOException {
protected void addEdge(TinkerGraph tg, MappedElementCache cache) {
GraphSONUtility gu = graphSONUtility;
addVertexToGraph(tg, gu, getDBV(), getTableV(), getCol1(), getCol2());
addEdgeToGraph(tg, gu, cache, getEdge(), getEdgeCol(), getEdgeCol2());
}
protected void addEdgesForMap(TinkerGraph tg, MappedElementCache cache) {
GraphSONUtility gu = graphSONUtility;
gu.vertexFromJson(tg, (JsonNode) (getDBV(null)[0][0]));
gu.vertexFromJson(tg, (JsonNode) (getTableV(null))[0][0]);
gu.edgeFromJson(tg, cache, (JsonNode) getEdge(null)[0][0]);
addVertexToGraph(tg, gu, getDBV(), getTableV(), getCol1(), getCol2());
addEdgeToGraph(tg, gu, cache, getEdgeCol3(), getEdgeCol4());
}
protected Vertex fetchTableVertex(TinkerGraph tg) {
......@@ -91,29 +103,84 @@ public class BaseUtils {
return (Vertex) query.next();
}
@DataProvider(name = "col1")
public Object[][] getCol1(ITestContext context) throws IOException {
protected Map<String, Map<String, List<String>>> getTypePropertyMap(String type, String property, String category) {
Map<String, Map<String, List<String>>> map = new HashMap<>();
map.put(type, new HashMap<>());
map.get(type).put(category, new ArrayList<>());
map.get(type).get(category).add(property);
return map;
}
protected void addVertexToGraph(TinkerGraph tg, GraphSONUtility gu, JsonNode... nodes) {
for(JsonNode n : nodes) {
gu.vertexFromJson(tg, n);
}
}
protected void addEdgeToGraph(TinkerGraph tg, GraphSONUtility gu, MappedElementCache cache, JsonNode... nodes) {
for(JsonNode n : nodes) {
gu.edgeFromJson(tg, cache, n);
}
}
public JsonNode getCol1() {
return getJsonNodeFromFile("col-legacy.json");
}
@DataProvider(name = "dbType")
public Object[][] getDbType(ITestContext context) throws IOException {
public JsonNode getCol2() {
return getJsonNodeFromFile("col-2-legacy.json");
}
public JsonNode getCol3() {
return getJsonNodeFromFile("col-3-legacy.json");
}
public JsonNode getDbType() {
return getJsonNodeFromFile("db-type-legacy.json");
}
@DataProvider(name = "edge")
public Object[][] getEdge(ITestContext context) throws IOException {
public JsonNode getEdge() {
return getJsonNodeFromFile("edge-legacy.json");
}
@DataProvider(name = "dbV")
public Object[][] getDBV(ITestContext context) throws IOException {
return getJsonNodeFromFile("db-v-65544.json");
public JsonNode getEdgeCol() {
return getJsonNodeFromFile("edge-legacy-col.json");
}
public JsonNode getEdgeCol2() {
return getJsonNodeFromFile("edge-legacy-col2.json");
}
@DataProvider(name = "tableV")
public Object[][] getTableV(ITestContext context) throws IOException {
public JsonNode getEdgeCol3() {
return getJsonNodeFromFile("edge-legacy-col3.json");
}
public JsonNode getEdgeCol4() {
return getJsonNodeFromFile("edge-legacy-col4.json");
}
public JsonNode getEdgeTag() {
return getJsonNodeFromFile("edge-legacy-tag.json");
}
public JsonNode getDBV() {
return getJsonNodeFromFile("db-v-65544.json");
}
public JsonNode getTableV() {
return getJsonNodeFromFile("table-v-147504.json");
}
public JsonNode getTagV() {
return getJsonNodeFromFile("tag-163856752.json");
}
public JsonNode getProcessV() {
return getJsonNodeFromFile("lineage-v-98312.json");
}
public JsonNode getEdgeProcess() {
return getJsonNodeFromFile("edge-legacy-process.json");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
public class GraphSONUtilityPostProcessTest extends BaseUtils {
final String HIVE_COLUMNS_PROPERTY = "hive_table.columns";
final String edgeId1 = "816u-35tc-ao0l-47so";
final String edgeId2 = "82rq-35tc-ao0l-2glc";
final String edgeId1x = "816u-35tc-ao0l-xxxx";
final String edgeId2x = "82rq-35tc-ao0l-xxxx";
private TinkerGraph tg;
private MappedElementCache cache = new MappedElementCache();
private Vertex tableV;
@Test
public void noRefNoUpdate() throws IOException {
tg = TinkerGraph.open();
graphSONUtility = new GraphSONUtility(emptyRelationshipCache);
addEdge(tg, cache);
tableV = fetchTableVertex(tg);
assertNotNull(tableV);
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
}
@Test(dependsOnMethods = "noRefNoUpdate")
public void refFoundVertexUpdated() throws IOException {
cache.lruEdgeCache.put(edgeId1, edgeId1x);
cache.lruEdgeCache.put(edgeId2, edgeId2x);
graphSONUtility.replaceReferencedEdgeIdForList(tg, cache, tableV, HIVE_COLUMNS_PROPERTY);
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1x, edgeId2x, tableV);
}
@Test(dependsOnMethods = "refFoundVertexUpdated")
public void updateUsingPostProcessConsumer() throws IOException {
MappedElementCache cache = new MappedElementCache();
BlockingQueue<Object> bc = new BlockingArrayQueue<>();
PostProcessManager.Consumer consumer = new PostProcessManager.Consumer(bc, tg, graphSONUtility,
new String[] {HIVE_COLUMNS_PROPERTY}, cache, 5);
cache.lruEdgeCache.put(edgeId1x, edgeId1);
cache.lruEdgeCache.put(edgeId2x, edgeId2);
consumer.processItem(tableV.id());
assertListProperty(HIVE_COLUMNS_PROPERTY, edgeId1, edgeId2, tableV);
}
private void assertListProperty(String HIVE_COLUMNS_PROPERTY, String edgeId1, String edgeId2, Vertex tableV) {
assertTrue(tableV.property(HIVE_COLUMNS_PROPERTY).isPresent());
List list = (List) tableV.property(HIVE_COLUMNS_PROPERTY).value();
assertEquals(list.size(), 2);
assertEquals(list.get(0), edgeId1);
assertEquals(list.get(1), edgeId2);
}
}
......@@ -36,8 +36,9 @@ import static org.testng.AssertJUnit.assertNotNull;
public class JsonNodeParsersTest extends BaseUtils {
@Test(dataProvider = "col1")
public void parseVertex(JsonNode nd) {
@Test
public void parseVertex() {
JsonNode nd = getCol1();
final int COL1_ORIGINAL_ID = 98336;
Object nodeId = getId(nd);
......@@ -68,8 +69,9 @@ public class JsonNodeParsersTest extends BaseUtils {
assertProperties(vUsingPe);
}
@Test(dataProvider = "edge")
public void parseEdge(JsonNode nd) throws IOException {
@Test
public void parseEdge() {
JsonNode nd = getEdge();
final String EDGE_ORIGINAL_ID = "8k5i-35tc-acyd-1eko";
Object nodeId = getId(nd);
......@@ -78,12 +80,12 @@ public class JsonNodeParsersTest extends BaseUtils {
JsonNodeParsers.ParseElement peVertex = new JsonNodeParsers.ParseVertex();
peVertex.setContext(graphSONUtility);
peVertex.parse(tg, cache, (JsonNode) (getDBV(null)[0][0]));
peVertex.parse(tg, cache, (JsonNode) (getTableV(null)[0][0]));
peVertex.parse(tg, cache, getDBV());
peVertex.parse(tg, cache, getTableV());
JsonNodeParsers.ParseElement pe = new JsonNodeParsers.ParseEdge();
pe.setContext(graphSONUtility);
pe.parse(tg, cache, (JsonNode) getEdge(null)[0][0]);
pe.parse(tg, cache, getEdge());
updateParseElement(tg, pe, nodeId);
......
......@@ -31,8 +31,9 @@ import static org.testng.Assert.*;
public class MappedElementCacheTest extends BaseUtils {
@Test(dataProvider = "col1")
public void vertexFetch(JsonNode node) {
@Test
public void vertexFetch() {
JsonNode node = getCol1();
MappedElementCache cache = new MappedElementCache();
TinkerGraph tg = TinkerGraph.open();
......@@ -41,32 +42,15 @@ public class MappedElementCacheTest extends BaseUtils {
Vertex vx = cache.getMappedVertex(tg, 98336);
assertNotNull(vx);
assertEquals(cache.lruVertexCache.size(), 1);
assertEquals(cache.lruEdgeCache.size(), 0);
}
@Test
public void edgeFetch() throws IOException {
public void edgeFetch() {
MappedElementCache cache = new MappedElementCache();
TinkerGraph tg = TinkerGraph.open();
addEdge(tg, cache);
assertEquals(cache.lruVertexCache.size(), 2);
assertEquals(cache.lruEdgeCache.size(), 0);
}
@Test
public void nonExistentVertexReturnsNull() {
TinkerGraph tg = TinkerGraph.open();
MappedElementCache cache = new MappedElementCache();
assertNull(cache.fetchVertex(tg, 1111));
assertNull(cache.fetchEdge(tg, "abcd"));
}
@DataProvider(name = "col1")
public Object[][] getCol1(ITestContext context) throws IOException {
return getJsonNodeFromFile("col-legacy.json");
assertEquals(cache.lruVertexCache.size(), 4);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.janus.migration.postProcess.PostProcessListProperty;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class PostProcessListPropertyTest extends BaseUtils {
final String HIVE_TABLE_TYPE = "hive_table";
final String HIVE_COLUMNS_PROPERTY = "hive_table.columns";
final String col1EdgeId = "816u-35tc-ao0l-47so";
final String col2EdgeId = "82rq-35tc-ao0l-2glc";
@Test
public void noRefNoUpdate() throws IOException {
TestSetup ts = new TestSetup();
ts.getPostProcessListProperty().process(ts.getTable(), HIVE_TABLE_TYPE, HIVE_COLUMNS_PROPERTY);
ts.assertIncomplete();
}
@Test
public void refFoundVertexUpdated() throws IOException {
TestSetup ts = new TestSetup();
assertNotNull(ts.getTable());
ts.getPostProcessListProperty().process(ts.getTable(), HIVE_TABLE_TYPE, HIVE_COLUMNS_PROPERTY);
ts.assertComplete();
}
@Test
public void updateUsingPostProcessConsumer() throws IOException {
TestSetup ts = new TestSetup();
BlockingQueue<Object> bc = new BlockingArrayQueue<>();
PostProcessManager.Consumer consumer = new PostProcessManager.Consumer(bc, ts.getGraph(),
getTypePropertyMap("hive_table", HIVE_COLUMNS_PROPERTY, "ARRAY"), 5);
Vertex tableV = fetchTableVertex(ts.getGraph());
consumer.processItem(tableV.id());
ts.assertComplete();
}
private class TestSetup {
private PostProcessListProperty postProcessListProperty;
private TinkerGraph tg;
private MappedElementCache cache;
private Vertex tableV;
public PostProcessListProperty getPostProcessListProperty() {
return postProcessListProperty;
}
public TinkerGraph getGraph() {
return tg;
}
public MappedElementCache getCache() {
return cache;
}
public Vertex getTable() {
return tableV;
}
public TestSetup() throws IOException {
postProcessListProperty = new PostProcessListProperty();
tg = TinkerGraph.open();
cache = new MappedElementCache();
addEdge(tg, cache);
tableV = fetchTableVertex(tg);
assertSetup();
}
public void assertSetup() {
assertTrue(tableV.property(HIVE_COLUMNS_PROPERTY).isPresent());
List list = (List) tableV.property(HIVE_COLUMNS_PROPERTY).value();
assertEquals(list.size(), 2);
assertEquals(list.get(0), col1EdgeId);
assertEquals(list.get(1), col2EdgeId);
}
private void assertIncomplete() {
assertPropertyRemoved(HIVE_COLUMNS_PROPERTY, tableV);
Iterator<Edge> edges = tableV.edges(Direction.OUT, getEdgeLabel(HIVE_COLUMNS_PROPERTY));
while (edges.hasNext()) {
Edge e = edges.next();
assertFalse(e.property(ATTRIBUTE_INDEX_PROPERTY_KEY).isPresent());
}
}
private void assertComplete() {
assertPropertyRemoved(HIVE_COLUMNS_PROPERTY, tableV);
}
private void assertPropertyRemoved(String property, Vertex tableV) {
assertFalse(tableV.property(property).isPresent());
}
public String getEdgeLabel(String property ) {
return Constants.INTERNAL_PROPERTY_KEY_PREFIX + property;
}
}
}
{
"Asset.name": {
"type": "string",
"value": "col2"
},
"hive_column.type": {
"type": "string",
"value": "string"
},
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"entityText": {
"type": "string",
"value": "hive_column owner anonymous qualifiedName stocks.test_table_view.col2@cl1 name col2 position 1 type string table "
},
"Referenceable.qualifiedName": {
"type": "string",
"value": "stocks.test_table_view.col2@cl1"
},
"__guid": {
"type": "string",
"value": "9cef2494-766c-4671-96a8-828dce677e7e"
},
"__version": {
"type": "integer",
"value": 0
},
"__superTypeNames": {
"type": "list",
"value": [{
"type": "string",
"value": "Asset"
}, {
"type": "string",
"value": "DataSet"
}, {
"type": "string",
"value": "Referenceable"
}]
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__typeName": {
"type": "string",
"value": "hive_column"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693838471
},
"Asset.owner": {
"type": "string",
"value": "anonymous"
},
"hive_column.position": {
"type": "integer",
"value": 1
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": 114816,
"_type": "vertex"
}
{
"Asset.name": {
"type": "string",
"value": "open"
},
"hive_column.type": {
"type": "string",
"value": "string"
},
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"entityText": {
"type": "string",
"value": "hive_column owner anonymous qualifiedName stocks.stocks_daily.open@cl1 name open position 1 type string table Tag1 "
},
"Referenceable.qualifiedName": {
"type": "string",
"value": "stocks.stocks_daily.open@cl1"
},
"__traitNames": {
"type": "list",
"value": [
{
"type": "string",
"value": "Tag1"
}
]
},
"__guid": {
"type": "string",
"value": "8231a95f-d062-4685-81aa-0b62401bc796"
},
"__version": {
"type": "integer",
"value": 0
},
"__superTypeNames": {
"type": "list",
"value": [
{
"type": "string",
"value": "Asset"
},
{
"type": "string",
"value": "DataSet"
},
{
"type": "string",
"value": "Referenceable"
}
]
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__typeName": {
"type": "string",
"value": "hive_column"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693815850
},
"Asset.owner": {
"type": "string",
"value": "anonymous"
},
"hive_column.position": {
"type": "integer",
"value": 1
},
"__timestamp": {
"type": "long",
"value": 1522693815850
},
"_id": 16752,
"_type": "vertex"
}
{
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693835017
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": "816u-35tc-ao0l-47so",
"_type": "edge",
"_outV": 147504,
"_inV": 98336,
"_label": "__hive_table.columns"
}
\ No newline at end of file
{
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693835017
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": "82rq-35tc-ao0l-2glc",
"_type": "edge",
"_outV": 147504,
"_inV": 114816,
"_label": "__hive_table.columns"
}
{
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693835017
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": "816u-35tc-ao0l-47aa",
"_type": "edge",
"_outV": 147504,
"_inV": 98336,
"_label": "__hive_table.columnsMap.col3"
}
{
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693835017
},
"__timestamp": {
"type": "long",
"value": 1522693835017
},
"_id": "82rq-35tc-ao0l-2gaa",
"_type": "edge",
"_outV": 147504,
"_inV": 114816,
"_label": "__hive_table.columnsMap.col4"
}
{
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693837285
},
"__timestamp": {
"type": "long",
"value": 1522693837285
},
"_id": "6jgh-23uw-2uqd-1elc",
"_type": "edge",
"_outV": 98312,
"_inV": 147504,
"_label": "__Process.inputs"
}
{
"__modifiedBy": {
"type": "string",
"value": "admin"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "admin"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522694252176
},
"__timestamp": {
"type": "long",
"value": 1522694252176
},
"_id": "2pk3la-cxc-m61h-2pk0og",
"_type": "edge",
"_outV": 16752,
"_inV": 163856752,
"_label": "Tag1"
}
\ No newline at end of file
{
"hive_column_lineage.depenendencyType": {
"type": "string",
"value": "SIMPLE"
},
"Asset.name": {
"type": "string",
"value": "create view test_table_view partitioned on (col4) as select col1, col2, col4 from test_table:col1"
},
"Process.inputs": {
"type": "list",
"value": [{
"type": "string",
"value": "6jgh-23uw-2uqd-1elc"
}]
},
"Process.outputs": {
"type": "list",
"value": [{
"type": "string",
"value": "6ept-23uw-2wb9-47so"
}]
},
"__modifiedBy": {
"type": "string",
"value": "anonymous"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"entityText": {
"type": "string",
"value": "hive_column_lineage outputs qualifiedName stocks.test_table_view@cl1:1522693834000:col1 inputs query name create view test_table_view partitioned on (col4) as select col1, col2, col4 from test_table:col1 depenendencyType SIMPLE "
},
"Referenceable.qualifiedName": {
"type": "string",
"value": "stocks.test_table_view@cl1:1522693834000:col1"
},
"__guid": {
"type": "string",
"value": "9336b7a4-9cc0-4ef0-8dc1-01fce2def6a5"
},
"__version": {
"type": "integer",
"value": 0
},
"__superTypeNames": {
"type": "list",
"value": [{
"type": "string",
"value": "Asset"
}, {
"type": "string",
"value": "Process"
}, {
"type": "string",
"value": "Referenceable"
}]
},
"__createdBy": {
"type": "string",
"value": "anonymous"
},
"__typeName": {
"type": "string",
"value": "hive_column_lineage"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522693837285
},
"__timestamp": {
"type": "long",
"value": 1522693837285
},
"_id": 98312,
"_type": "vertex"
}
......@@ -33,10 +33,12 @@
},
"hive_table.parameters": {
"type": "list",
"value": [{
"value": [
{
"type": "string",
"value": "transient_lastDdlTime"
}]
}
]
},
"hive_table.retention": {
"type": "integer",
......@@ -44,10 +46,12 @@
},
"hive_table.partitionKeys": {
"type": "list",
"value": [{
"value": [
{
"type": "string",
"value": "8dty-35tc-amfp-23xs"
}]
}
]
},
"__guid": {
"type": "string",
......@@ -63,16 +67,20 @@
},
"__superTypeNames": {
"type": "list",
"value": [{
"value": [
{
"type": "string",
"value": "Asset"
}, {
},
{
"type": "string",
"value": "DataSet"
}, {
},
{
"type": "string",
"value": "Referenceable"
}]
}
]
},
"hive_table.viewExpandedText": {
"type": "string",
......@@ -104,13 +112,37 @@
},
"hive_table.columns": {
"type": "list",
"value": [{
"value": [
{
"type": "string",
"value": "816u-35tc-ao0l-47so"
}, {
},
{
"type": "string",
"value": "82rq-35tc-ao0l-2glc"
}]
}
]
},
"hive_table.columnsMap.col3": {
"type": "string",
"value": "816u-35tc-ao0l-47aa"
},
"hive_table.columnsMap.col4": {
"type": "string",
"value": "82rq-35tc-ao0l-2gaa"
},
"hive_table.columnsMap": {
"type": "list",
"value": [
{
"type": "string",
"value": "col3"
},
{
"type": "string",
"value": "col4"
}
]
},
"__timestamp": {
"type": "long",
......@@ -119,3 +151,4 @@
"_id": 147504,
"_type": "vertex"
}
{
"__superTypeNames": {
"type": "list",
"value": [{
"type": "string",
"value": "[]"
}]
},
"__modifiedBy": {
"type": "string",
"value": "admin"
},
"__state": {
"type": "string",
"value": "ACTIVE"
},
"__createdBy": {
"type": "string",
"value": "admin"
},
"__typeName": {
"type": "string",
"value": "Tag1"
},
"__modificationTimestamp": {
"type": "long",
"value": 1522694252176
},
"__timestamp": {
"type": "long",
"value": 1522694252176
},
"_id": 163856752,
"_type": "vertex"
}
......@@ -21,16 +21,12 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef.AtlasClassificationDefs;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
/**
* Interface to persistence store of TypeDef
......@@ -107,6 +103,4 @@ public interface AtlasTypeDefStore {
AtlasBaseTypeDef getByName(String name) throws AtlasBaseException;
AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException;
void loadLegacyData(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException;
}
......@@ -18,7 +18,6 @@
package org.apache.atlas;
import com.sun.org.apache.xpath.internal.operations.Bool;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
......
......@@ -18,11 +18,11 @@
package org.apache.atlas.repository.migration;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.impexp.ImportTypeDefProcessor;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.store.AtlasTypeDefStore;
......@@ -56,11 +56,11 @@ public class DataMigrationService implements Service {
private final Thread thread;
@Inject
public DataMigrationService(AtlasTypeDefStore typeDefStore, Configuration configuration,
public DataMigrationService(AtlasGraph graph, AtlasTypeDefStore typeDefStore, Configuration configuration,
GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer,
AtlasTypeRegistry typeRegistry) {
this.configuration = configuration;
this.thread = new Thread(new FileImporter(typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
this.thread = new Thread(new FileImporter(graph, typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
}
@Override
......@@ -83,15 +83,17 @@ public class DataMigrationService implements Service {
}
public static class FileImporter implements Runnable {
private final AtlasGraph graph;
private final AtlasTypeDefStore typeDefStore;
private final String importDirectory;
private final GraphBackedSearchIndexer indexer;
private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeDefStoreInitializer storeInitializer;
public FileImporter(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
public FileImporter(AtlasGraph graph, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
AtlasTypeDefStoreInitializer storeInitializer,
String directoryName, GraphBackedSearchIndexer indexer) {
this.graph = graph;
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.storeInitializer = storeInitializer;
......@@ -99,7 +101,16 @@ public class DataMigrationService implements Service {
this.indexer = indexer;
}
public void performImport() throws AtlasBaseException {
@Override
public void run() {
try {
performImport();
} catch (AtlasBaseException e) {
LOG.error("Data Migration:", e);
}
}
private void performImport() throws AtlasBaseException {
try {
if(!performAccessChecks(importDirectory)) {
return;
......@@ -109,7 +120,7 @@ public class DataMigrationService implements Service {
FileInputStream fs = new FileInputStream(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_DATA_NAME));
typeDefStore.loadLegacyData(RelationshipCacheGenerator.get(typeRegistry), fs);
graph.importLegacyGraphSON(typeRegistry, fs);
} catch (Exception ex) {
LOG.error("Import failed!", ex);
throw new AtlasBaseException(ex);
......@@ -117,11 +128,13 @@ public class DataMigrationService implements Service {
}
private boolean performAccessChecks(String path) {
boolean ret = false;
final boolean ret;
if(StringUtils.isEmpty(path)) {
ret = false;
} else {
File f = new File(path);
ret = f.exists() && f.isDirectory() && f.canRead();
}
......@@ -137,17 +150,19 @@ public class DataMigrationService implements Service {
private void performInit() throws AtlasBaseException, AtlasException {
indexer.instanceIsActive();
storeInitializer.instanceIsActive();
processIncomingTypesDef(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_TYPESDEF_NAME));
}
@VisibleForTesting
void processIncomingTypesDef(File typesDefFile) throws AtlasBaseException {
private void processIncomingTypesDef(File typesDefFile) throws AtlasBaseException {
try {
AtlasImportResult result = new AtlasImportResult();
String jsonStr = FileUtils.readFileToString(typesDefFile);
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);
ImportTypeDefProcessor processor = new ImportTypeDefProcessor(typeDefStore, typeRegistry);
processor.processTypes(typesDef, result);
LOG.info(" types migrated: {}", result.getMetrics());
} catch (IOException e) {
LOG.error("processIncomingTypesDef: Could not process file: {}! Imported data may not be usable.", typesDefFile.getName());
......@@ -157,14 +172,5 @@ public class DataMigrationService implements Service {
private File getFileFromImportDirectory(String importDirectory, String fileName) {
return Paths.get(importDirectory, fileName).toFile();
}
@Override
public void run() {
try {
performImport();
} catch (AtlasBaseException e) {
LOG.error("Data Migration:", e);
}
}
}
}
......@@ -24,7 +24,6 @@ import static org.apache.atlas.repository.Constants.TYPE_CATEGORY_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.VERTEX_TYPE;
import java.io.InputStream;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
......@@ -49,7 +48,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Singleton;
......@@ -110,11 +108,6 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
}
@Override
public void loadLegacyData(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
getAtlasGraph().loadLegacyGraphSON(relationshipCache, fs);
}
AtlasGraph getAtlasGraph() { return atlasGraph; }
@VisibleForTesting
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ComplexAttributesTest extends MigrationBaseAsserts {
@Inject
public ComplexAttributesTest(AtlasGraph graph) {
super(graph);
}
@Test
public void verify() throws IOException, AtlasBaseException {
String STRUCT_TYPE = "struct_type";
String ENTITY_TYPE = "entity_type";
String ENTITY_WITH_COMPLEX_COLL_TYPE = "entity_with_complex_collection_attr";
final int EXPECTED_TOTAL_COUNT = 214;
final int EXPECTED_ENTITY_TYPE_COUNT = 16;
final int EXPECTED_STRUCT_TYPE_COUNT = 3;
final int EXPECTED_ENTITY_WITH_COMPLEX_COLL_TYPE_COUNT = 1;
runFileImporter("complex-attr_db");
assertTypeCountNameGuid(STRUCT_TYPE, EXPECTED_STRUCT_TYPE_COUNT,"", "");
assertTypeCountNameGuid(ENTITY_TYPE, EXPECTED_ENTITY_TYPE_COUNT, "", "");
assertTypeCountNameGuid(ENTITY_WITH_COMPLEX_COLL_TYPE, EXPECTED_ENTITY_WITH_COMPLEX_COLL_TYPE_COUNT, "", "");
assertEdgesWithLabel(getVertex(ENTITY_WITH_COMPLEX_COLL_TYPE, "").getEdges(AtlasEdgeDirection.OUT).iterator(),1, "__entity_with_complex_collection_attr.listOfEntities");
assertEdgesWithLabel(getVertex(ENTITY_WITH_COMPLEX_COLL_TYPE, "").getEdges(AtlasEdgeDirection.OUT).iterator(),9, "__entity_with_complex_collection_attr.mapOfStructs");
assertMigrationStatus(EXPECTED_TOTAL_COUNT);
}
}
......@@ -21,17 +21,13 @@ package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import static org.testng.Assert.assertEquals;
@Guice(modules = TestModules.TestOnlyModule.class)
public class HiveParititionTest extends MigrationBaseAsserts {
......@@ -41,7 +37,7 @@ public class HiveParititionTest extends MigrationBaseAsserts {
super(graph);
}
@Test(enabled = false)
@Test
public void fileImporterTest() throws IOException, AtlasBaseException {
final int EXPECTED_TOTAL_COUNT = 141;
final int EXPECTED_DB_COUNT = 1;
......@@ -50,8 +46,6 @@ public class HiveParititionTest extends MigrationBaseAsserts {
runFileImporter("parts_db");
assertPartitionKeyProperty(getVertex("hive_table", "t1"), 1);
assertPartitionKeyProperty(getVertex("hive_table", "tv1"), 1);
assertHiveVertices(EXPECTED_DB_COUNT, EXPECTED_TABLE_COUNT, EXPECTED_COLUMN_COUNT);
assertTypeCountNameGuid("hive_db", 1, "parts_db", "ae30d78b-51b4-42ab-9436-8d60c8f68b95");
......@@ -62,9 +56,4 @@ public class HiveParititionTest extends MigrationBaseAsserts {
assertMigrationStatus(EXPECTED_TOTAL_COUNT);
}
private void assertPartitionKeyProperty(AtlasVertex vertex, int expectedCount) {
List<String> keys = GraphHelper.getListProperty(vertex, "hive_table.partitionKeys");
assertEquals(keys.size(), expectedCount);
}
}
......@@ -35,7 +35,7 @@ public class HiveStocksTest extends MigrationBaseAsserts {
super(graph);
}
@Test(enabled = false)
@Test
public void migrateStocks() throws AtlasBaseException, IOException {
final int EXPECTED_TOTAL_COUNT = 188;
final int EXPECTED_DB_COUNT = 1;
......@@ -47,7 +47,6 @@ public class HiveStocksTest extends MigrationBaseAsserts {
assertHiveVertices(EXPECTED_DB_COUNT, EXPECTED_TABLE_COUNT, EXPECTED_COLUMN_COUNT);
assertTypeCountNameGuid("hive_db", 1, "stocks", "4e13b36b-9c54-4616-9001-1058221165d0");
assertTypeCountNameGuid("hive_table", 1, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774");
assertTypeAttribute("hive_table", 7, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774", "hive_table.columns");
assertTypeCountNameGuid("hive_column", 1, "high", "d72ce4fb-6f17-4e68-aa85-967366c9e891");
assertTypeCountNameGuid("hive_column", 1, "open", "788ba8fe-b7d8-41ba-84ef-c929732924ec");
assertTypeCountNameGuid("hive_column", 1, "dt", "643a0a71-0d97-477d-a43b-7ca433f85160");
......
......@@ -34,7 +34,6 @@ import org.testng.annotations.AfterClass;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
......@@ -48,10 +47,10 @@ public class MigrationBaseAsserts {
private final String R_GUID_PROPERTY_NAME = "_r__guid";
@Inject
private AtlasTypeDefStore typeDefStore;
protected AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
protected AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStoreInitializer storeInitializer;
......@@ -74,7 +73,7 @@ public class MigrationBaseAsserts {
}
}
private void loadTypesFromJson() throws IOException, AtlasBaseException {
protected void loadTypesFromJson() throws IOException, AtlasBaseException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
......@@ -83,7 +82,7 @@ public class MigrationBaseAsserts {
protected void runFileImporter(String directoryToImport) throws IOException, AtlasBaseException {
loadTypesFromJson();
String directoryName = TestResourceFileUtils.getDirectory(directoryToImport);
DataMigrationService.FileImporter fi = new DataMigrationService.FileImporter(typeDefStore, typeRegistry,
DataMigrationService.FileImporter fi = new DataMigrationService.FileImporter(graph, typeDefStore, typeRegistry,
storeInitializer, directoryName, indexer);
fi.run();
......@@ -152,14 +151,19 @@ public class MigrationBaseAsserts {
assertEquals(count, expectedItems, String.format("%s", edgeTypeName));
}
protected void assertTypeAttribute(String typeName, int expectedSize, String name, String guid, String propertyName) {
AtlasVertex v = getVertex(typeName, name);
String guidActual = GraphHelper.getGuid(v);
List list = (List) GraphHelper.getProperty(v, propertyName);
protected void assertEdgesWithLabel(Iterator<AtlasEdge> results, int startIdx, String edgeTypeName) {
int count = 0;
AtlasEdge e = null;
for (Iterator<AtlasEdge> it = results; it.hasNext() && count < startIdx; count++) {
e = it.next();
}
assertEquals(guidActual, guid);
assertNotNull(list);
assertEquals(list.size(), expectedSize);
assertNotNull(GraphHelper.getProperty(e, R_GUID_PROPERTY_NAME));
assertNotNull(GraphHelper.getProperty(e, "tagPropagation"));
if(StringUtils.isNotEmpty(edgeTypeName)) {
assertEquals(e.getLabel(), edgeTypeName, edgeTypeName);
}
}
protected void assertTypeCountNameGuid(String typeName, int expectedItems, String name, String guid) {
......
......@@ -23,29 +23,27 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.graphdb.janus.migration.RelationshipCacheGenerator;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.jcodings.util.Hash;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class RelationshipMappingTest {
public class RelationshipCacheGeneratorTest {
@Inject
private AtlasTypeDefStore typeDefStore;
......@@ -61,12 +59,23 @@ public class RelationshipMappingTest {
@Test
public void createLookup() {
Map<String, String> cache = RelationshipCacheGenerator.get(typeRegistry);
final String PROCESS_INPUT_KEY = "__Process.inputs";
final String PROCESS_OUTPUT_KEY = "__Process.outputs";
String ONE_TO_TWO_STR = ONE_TO_TWO.toString();
String TWO_TO_ONE_STR = TWO_TO_ONE.toString();
Map<String, RelationshipCacheGenerator.TypeInfo> cache = RelationshipCacheGenerator.get(typeRegistry);
assertEquals(cache.size(), getLegacyAttributeCount() - 1);
for (Map.Entry<String, String> entry : cache.entrySet()) {
for (Map.Entry<String, RelationshipCacheGenerator.TypeInfo> entry : cache.entrySet()) {
assertTrue(StringUtils.isNotEmpty(entry.getKey()));
assertTrue(entry.getKey().startsWith(Constants.INTERNAL_PROPERTY_KEY_PREFIX), entry.getKey());
}
assertEquals(cache.get(PROCESS_INPUT_KEY).getTypeName(), "dataset_process_inputs");
assertEquals(cache.get(PROCESS_INPUT_KEY).getPropagateTags(), ONE_TO_TWO_STR);
assertEquals(cache.get(PROCESS_OUTPUT_KEY).getTypeName(), "process_dataset_outputs");
assertEquals(cache.get(PROCESS_OUTPUT_KEY).getPropagateTags(), TWO_TO_ONE_STR);
}
private int getLegacyAttributeCount() {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.janus.migration.TypesWithCollectionsFinder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class TypesWithCollectionsFinderTest extends MigrationBaseAsserts {
@Inject
protected TypesWithCollectionsFinderTest(AtlasGraph graph) {
super(graph);
}
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
loadTypesFromJson();
}
@Test
public void fetchAll() {
Map<String, Map<String, List<String>>> typeAttrMap = TypesWithCollectionsFinder.getVertexPropertiesForCollectionAttributes(typeRegistry);
assertEquals(typeAttrMap.size(), 9);
assertProperties(typeAttrMap, "__AtlasUserProfile", "ARRAY", "__AtlasUserProfile.savedSearches");
assertProperties(typeAttrMap, "Process", "ARRAY", "Process.inputs");
assertProperties(typeAttrMap, "Process", "ARRAY", "Process.outputs");
assertProperties(typeAttrMap, "hdfs_path", "MAP_PRIMITIVE", "hdfs_path.extendedAttributes");
assertProperties(typeAttrMap, "hive_column_lineage", "ARRAY", "Process.inputs");
assertProperties(typeAttrMap, "hive_column_lineage", "ARRAY", "Process.outputs");
assertProperties(typeAttrMap, "hive_db", "MAP_PRIMITIVE", "hive_db.parameters");
assertProperties(typeAttrMap, "hive_process", "ARRAY", "Process.inputs");
assertProperties(typeAttrMap, "hive_process", "ARRAY", "Process.outputs");
assertProperties(typeAttrMap, "hive_storagedesc", "ARRAY", "hive_storagedesc.sortCols");
assertProperties(typeAttrMap, "hive_serde", "MAP_PRIMITIVE", "hive_serde.parameters");
assertProperties(typeAttrMap, "hive_table", "ARRAY", "hive_table.partitionKeys");
assertProperties(typeAttrMap, "hive_table", "ARRAY", "hive_table.columns");
assertProperties(typeAttrMap, "hive_table", "MAP_PRIMITIVE", "hive_table.parameters");
}
private void assertProperties(Map<String, Map<String, List<String>>> typeAttrMap, String typeName, String typeCategory, String propertyName) {
List<String> actualProperties = typeAttrMap.get(typeName).get(typeCategory);
assertTrue(actualProperties.contains(propertyName));
}
}
......@@ -35,7 +35,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.runner.LocalSolrRunner;
......@@ -82,6 +82,9 @@ public class AtlasEntityTestBase {
@Inject
private EntityGraphMapper graphMapper;
@Inject
protected AtlasGraph graph;
AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
@BeforeClass
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment