Commit 76a20170 by Ashutosh Mestry Committed by Madhan Neethiraj

ATLAS-2717: updated migration-import to handle legacy types that reference classification

parent 5dfbd08b
......@@ -18,7 +18,6 @@
package org.apache.atlas.repository.graphdb;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
......@@ -28,9 +27,7 @@ import javax.script.ScriptException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.groovy.GroovyExpression;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
/**
* Represents a graph.
......@@ -320,8 +317,4 @@ public interface AtlasGraph<V, E> {
* @return
*/
boolean isMultiProperty(String name);
void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException;
MigrationStatus getMigrationStatus();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasTypeRegistry;
import java.io.InputStream;
public interface GraphDBMigrator {
AtlasTypesDef getScrubbedTypesDef(String jsonStr);
void importData(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException;
MigrationStatus getMigrationStatus();
}
......@@ -25,7 +25,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.groovy.GroovyExpression;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
......@@ -37,7 +36,6 @@ import org.apache.atlas.repository.graphdb.GremlinVersion;
import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery;
import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
......@@ -65,7 +63,6 @@ import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashSet;
......@@ -346,16 +343,6 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
return multiProperties.contains(propertyName);
}
@Override
public void importLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException {
AtlasJanusGraphDatabase.loadLegacyGraphSON(typeRegistry, fs);
}
@Override
public MigrationStatus getMigrationStatus() {
return AtlasJanusGraphDatabase.getMigrationStatus();
}
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) {
return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)).collect(Collectors.toList());
}
......
......@@ -20,18 +20,12 @@ package org.apache.atlas.repository.graphdb.janus;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase;
import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader;
import org.apache.atlas.repository.graphdb.janus.migration.ReaderStatusManager;
import org.apache.atlas.repository.graphdb.janus.migration.ElementProcessors;
import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.configuration.Configuration;
......@@ -45,7 +39,6 @@ import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
......@@ -231,36 +224,4 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
return ret;
}
public static void loadLegacyGraphSON(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
LOG.info("Starting loadLegacyGraphSON...");
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "loadLegacyGraphSON");
}
AtlasGraphSONReader legacyGraphSONReader = AtlasGraphSONReader.build().
relationshipCache(new ElementProcessors(typeRegistry)).
schemaDB(getGraphInstance()).
bulkLoadingDB(getBulkLoadingGraphInstance()).
create();
legacyGraphSONReader.readGraph(fs);
} catch (Exception ex) {
LOG.error("Error loading loadLegacyGraphSON2", ex);
throw new AtlasBaseException(ex);
} finally {
AtlasPerfTracer.log(perf);
LOG.info("Done! loadLegacyGraphSON.");
}
}
public static MigrationStatus getMigrationStatus() {
return ReaderStatusManager.get(getGraphInstance());
}
}
......@@ -36,8 +36,10 @@ import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY
import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.graphdb.janus.migration.TypesDefScrubber.*;
public class ElementProcessors {
private static final Logger LOG = LoggerFactory.getLogger(ElementProcessors.class);
......@@ -47,22 +49,29 @@ public class ElementProcessors {
public static final String NON_PRIMITIVE_ARRAY_CATEGORY = "ARRAY";
private static final String[] NON_PRIMITIVE_KEYS = { ElementProcessors.NON_PRIMITIVE_ARRAY_CATEGORY };
private final Map<String, RelationshipCacheGenerator.TypeInfo> relationshipLookup;
private final Map<String, Map<String, List<String>>> postProcessMap;
private final Map<String, RelationshipCacheGenerator.TypeInfo> relationshipLookup;
private final Map<String, Map<String, List<String>>> postProcessMap;
private final Map<String, ClassificationToStructDefName> traitToTypeMap;
private final NonPrimitiveListPropertyProcessor nonPrimitiveListPropertyProcessor = new NonPrimitiveListPropertyProcessor();
private final NonPrimitiveMapPropertyProcessor nonPrimitiveMapPropertyProcessor = new NonPrimitiveMapPropertyProcessor();
private final PrimitiveMapPropertyProcessor primitiveMapPropertyProcessor = new PrimitiveMapPropertyProcessor();
private final EdgeCollectionPropertyProcessor edgeCollectionPropertyProcessor = new EdgeCollectionPropertyProcessor();
private final EdgeRelationshipPropertyProcessor edgeRelationshipPropertyProcessor = new EdgeRelationshipPropertyProcessor();
private final EdgeTraitTypesPropertyProcessor edgeTraitTypesPropertyProcessor = new EdgeTraitTypesPropertyProcessor();
public ElementProcessors(AtlasTypeRegistry typeRegistry) {
this(RelationshipCacheGenerator.get(typeRegistry), TypesWithCollectionsFinder.getVertexPropertiesForCollectionAttributes(typeRegistry));
public ElementProcessors(AtlasTypeRegistry typeRegistry, TypesDefScrubber scrubber) {
this(RelationshipCacheGenerator.get(typeRegistry),
TypesWithCollectionsFinder.getVertexPropertiesForCollectionAttributes(typeRegistry),
scrubber.getTraitToTypeMap());
}
ElementProcessors(Map<String, RelationshipCacheGenerator.TypeInfo> lookup, Map<String, Map<String, List<String>>> postProcessMap) {
ElementProcessors(Map<String, RelationshipCacheGenerator.TypeInfo> lookup,
Map<String, Map<String, List<String>>> postProcessMap,
Map<String, ClassificationToStructDefName> traitToTypeMap) {
this.relationshipLookup = lookup;
this.postProcessMap = postProcessMap;
this.traitToTypeMap = traitToTypeMap;
}
public static String[] getNonPrimitiveCategoryKeys() {
......@@ -240,8 +249,39 @@ public class ElementProcessors {
}
}
private class EdgeTraitTypesPropertyProcessor {
private void update(String label, Vertex in) {
if (traitToTypeMap.size() == 0) {
return;
}
if (!in.property(ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
return;
}
String typeName = (String) in.property(ENTITY_TYPE_PROPERTY_KEY).value();
String key = label;
if (!traitToTypeMap.containsKey(key)) {
key = StringUtils.substringBeforeLast(key, ".");
if(!traitToTypeMap.containsKey(key)) {
return;
}
}
if (!traitToTypeMap.get(key).getTypeName().equals(typeName)) {
return;
}
in.property(ENTITY_TYPE_PROPERTY_KEY, traitToTypeMap.get(key).getLegacyTypeName());
}
}
private class EdgeRelationshipPropertyProcessor {
public String update(Vertex in, Vertex out, Object edgeId, String label, Map<String, Object> props) {
edgeTraitTypesPropertyProcessor.update(label, in);
if(addRelationshipTypeForClassification(in, out, label, props)) {
label = Constants.CLASSIFICATION_LABEL;
} else {
......@@ -266,12 +306,12 @@ public class ElementProcessors {
}
private boolean addRelationshipTypeForClassification(Vertex in, Vertex out, String label, Map<String, Object> props) {
if (in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
String inTypeName = (String) in.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
if (in.property(ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
String inTypeName = (String) in.property(ENTITY_TYPE_PROPERTY_KEY).value();
if (StringUtils.isNotEmpty(inTypeName)) {
if (inTypeName.equals(label)) {
props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, inTypeName);
props.put(ENTITY_TYPE_PROPERTY_KEY, inTypeName);
addEntityGuidToTrait(in, out);
......@@ -302,7 +342,7 @@ public class ElementProcessors {
String typeName = getRelationshipTypeName(edgeLabel);
if (StringUtils.isNotEmpty(typeName)) {
props.put(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName);
props.put(ENTITY_TYPE_PROPERTY_KEY, typeName);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not find relationship type for: {}", edgeLabel);
......@@ -343,11 +383,11 @@ public class ElementProcessors {
}
private String[] getNonPrimitiveArrayFromLabel(Vertex v, String edgeId, String label) {
if (!v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
if (!v.property(ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
return null;
}
String typeName = (String) v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
String typeName = (String) v.property(ENTITY_TYPE_PROPERTY_KEY).value();
String propertyName = StringUtils.remove(label, Constants.INTERNAL_PROPERTY_KEY_PREFIX);
if(!containsNonPrimitiveCollectionProperty(typeName, propertyName, NON_PRIMITIVE_ARRAY_CATEGORY)) {
......@@ -368,11 +408,11 @@ public class ElementProcessors {
// this method extracts:
// key: what remains of the legacy label string when '__' and type name are removed
private String[] getNonPrimitiveMapKeyFromLabel(Vertex v, String label) {
if (!v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
if (!v.property(ENTITY_TYPE_PROPERTY_KEY).isPresent()) {
return null;
}
String typeName = (String) v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value();
String typeName = (String) v.property(ENTITY_TYPE_PROPERTY_KEY).value();
if(!postProcessMap.containsKey(typeName)) {
return null;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import static org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase.getBulkLoadingGraphInstance;
import static org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase.getGraphInstance;
@Component
public class GraphDBGraphSONMigrator implements GraphDBMigrator {
private static final Logger LOG = LoggerFactory.getLogger(GraphDBMigrator.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GraphDBMigrator");
private final TypesDefScrubber typesDefStrubberForMigrationImport = new TypesDefScrubber();
@Override
public AtlasTypesDef getScrubbedTypesDef(String jsonStr) {
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);
return typesDefStrubberForMigrationImport.scrub(typesDef);
}
@Override
public void importData(AtlasTypeRegistry typeRegistry, InputStream fs) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
LOG.info("Starting loadLegacyGraphSON...");
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "loadLegacyGraphSON");
}
AtlasGraphSONReader legacyGraphSONReader = AtlasGraphSONReader.build().
relationshipCache(new ElementProcessors(typeRegistry, typesDefStrubberForMigrationImport)).
schemaDB(getGraphInstance()).
bulkLoadingDB(getBulkLoadingGraphInstance()).
create();
legacyGraphSONReader.readGraph(fs);
} catch (Exception ex) {
LOG.error("Error loading loadLegacyGraphSON2", ex);
throw new AtlasBaseException(ex);
} finally {
AtlasPerfTracer.log(perf);
LOG.info("Done! loadLegacyGraphSON.");
}
}
@Override
public MigrationStatus getMigrationStatus() {
return ReaderStatusManager.get(getGraphInstance());
}
}
......@@ -21,12 +21,15 @@ package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigDecimalType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasBigIntegerType;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgeFeatures;
import org.apache.tinkerpop.gremlin.structure.Graph.Features.VertexFeatures;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -63,7 +66,17 @@ class GraphSONUtility {
for (Map.Entry<String, Object> entry : props.entrySet()) {
try {
vertex.property(vertexFeatures.getCardinality(entry.getKey()), entry.getKey(), entry.getValue());
final Cardinality cardinality = vertexFeatures.getCardinality(entry.getKey());
final String key = entry.getKey();
final Object val = entry.getValue();
if ((cardinality == Cardinality.list || cardinality == Cardinality.set) && (val instanceof Collection)) {
for (Object elem : (Collection) val) {
vertex.property(key, elem);
}
} else {
vertex.property(key, val);
}
} catch (IllegalArgumentException ex) {
schemaUpdate = getSchemaUpdateMap(schemaUpdate);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX;
public class TypesDefScrubber {
private static final Logger LOG = LoggerFactory.getLogger(TypesDefScrubber.class);
public static final String LEGACY_TYPE_NAME_PREFIX = "legacy";
private final Map<String, ClassificationToStructDefName> edgeLabelToClassificationToStructDefMap = new HashMap<>();
private final Map<String, Integer> classificationIndexMap = new HashMap<>();
private AtlasTypesDef typesDef;
public TypesDefScrubber() {
}
public AtlasTypesDef scrub(AtlasTypesDef typesDef) {
this.typesDef = typesDef;
display("incoming: ", typesDef);
createClassificationNameIndexMap(typesDef.getClassificationDefs());
for (AtlasStructDef structDef : new ArrayList<>(typesDef.getStructDefs())) { // work on copy of typesDef.getStructDefs(), as the list is modified by checkAndUpdate()
checkAndUpdate(structDef);
}
for (AtlasEntityDef entityDef : typesDef.getEntityDefs()) {
checkAndUpdate(entityDef);
}
display("scrubbed: ", typesDef);
return typesDef;
}
public Map<String, ClassificationToStructDefName> getTraitToTypeMap() {
return edgeLabelToClassificationToStructDefMap;
}
public static String getEdgeLabel(String typeName, String attributeName) {
return String.format("%s%s.%s", Constants.INTERNAL_PROPERTY_KEY_PREFIX, typeName, attributeName);
}
public static String getLegacyTypeNameForStructDef(String name) {
return String.format("%s_%s", LEGACY_TYPE_NAME_PREFIX, name);
}
private void display(String s, AtlasTypesDef typesDef) {
if(LOG.isDebugEnabled()) {
LOG.debug(s + "{}", typesDef.toString());
}
}
private void checkAndUpdate(AtlasStructDef structDef) {
for (AtlasAttributeDef attrDef : structDef.getAttributeDefs()) {
String attrTypeName = getAttributeTypeName(attrDef.getTypeName());
if (classificationIndexMap.containsKey(attrTypeName)) {
ClassificationToStructDefName pair = createLegacyStructDefFromClassification(attrTypeName);
if (pair != null) {
updateAttributeWithNewType(pair.getTypeName(), pair.getLegacyTypeName(), attrDef);
addStructDefToTypesDef(structDef.getName(), attrDef.getName(), pair);
LOG.info("scrubbed: {}:{} -> {}", structDef.getName(), attrDef.getName(), attrDef.getTypeName());
}
}
}
}
private String getAttributeTypeName(String typeName) {
if (AtlasTypeUtil.isArrayType(typeName)) {
int startIdx = ATLAS_TYPE_ARRAY_PREFIX.length();
int endIdx = typeName.length() - ATLAS_TYPE_ARRAY_SUFFIX.length();
String elementTypeName = typeName.substring(startIdx, endIdx).trim();
return elementTypeName;
} else if (AtlasTypeUtil.isMapType(typeName)) {
int startIdx = ATLAS_TYPE_MAP_PREFIX.length();
int endIdx = typeName.length() - ATLAS_TYPE_MAP_SUFFIX.length();
String[] keyValueTypes = typeName.substring(startIdx, endIdx).split(ATLAS_TYPE_MAP_KEY_VAL_SEP, 2);
String valueTypeName = keyValueTypes.length > 1 ? keyValueTypes[1].trim() : null;
return valueTypeName;
}
return typeName;
}
private void updateAttributeWithNewType(String oldTypeName, String newTypeName, AtlasAttributeDef ad) {
if(StringUtils.isEmpty(newTypeName)) {
return;
}
String str = ad.getTypeName().replace(oldTypeName, newTypeName);
ad.setTypeName(str);
}
private ClassificationToStructDefName createLegacyStructDefFromClassification(String typeName) {
AtlasClassificationDef classificationDef = getClassificationDefByName(typeName);
if (classificationDef == null) {
return null;
}
AtlasStructDef structDef = getStructDefFromClassificationDef(classificationDef);
addStructDefToTypesDef(structDef);
return new ClassificationToStructDefName(classificationDef.getName(), structDef.getName());
}
private void addStructDefToTypesDef(AtlasStructDef structDef) {
for (AtlasStructDef sDef : typesDef.getStructDefs()) {
if (StringUtils.equals(sDef.getName(), structDef.getName())) {
return;
}
}
typesDef.getStructDefs().add(structDef);
}
private void addStructDefToTypesDef(String typeName, String attributeName, ClassificationToStructDefName pair) {
String key = getEdgeLabel(typeName, attributeName);
edgeLabelToClassificationToStructDefMap.put(key, pair);
}
private AtlasClassificationDef getClassificationDefByName(String name) {
if (classificationIndexMap.containsKey(name)) {
return typesDef.getClassificationDefs().get(classificationIndexMap.get(name));
}
return null;
}
private AtlasStructDef getStructDefFromClassificationDef(AtlasClassificationDef classificationDef) {
String legacyTypeName = getLegacyTypeNameForStructDef(classificationDef.getName());
return new AtlasStructDef(legacyTypeName, classificationDef.getDescription(), classificationDef.getTypeVersion(),
getDefaultAttributeDefsIfNecessary(classificationDef.getAttributeDefs()));
}
private List<AtlasAttributeDef> getDefaultAttributeDefsIfNecessary(List<AtlasAttributeDef> attributeDefs) {
return attributeDefs.isEmpty() ? Collections.singletonList(new AtlasAttributeDef("name", "string")) : attributeDefs;
}
private void createClassificationNameIndexMap(List<AtlasClassificationDef> classificationDefs) {
for (int i = 0; i < classificationDefs.size(); i++) {
AtlasClassificationDef classificationDef = classificationDefs.get(i);
classificationIndexMap.put(classificationDef.getName(), i);
}
}
public static class ClassificationToStructDefName extends TypesUtil.Pair<String, String> {
public ClassificationToStructDefName(String typeName, String legacyTypeName) {
super(typeName, legacyTypeName);
}
public String getTypeName() {
return left;
}
public String getLegacyTypeName() {
return right;
}
}
}
......@@ -43,7 +43,7 @@ public class BaseUtils {
private static final String resourcesDirRelativePath = "/src/test/resources/";
private String resourceDir;
protected final ElementProcessors emptyRelationshipCache = new ElementProcessors(new HashMap<>(), new HashMap<>());
protected final ElementProcessors emptyRelationshipCache = new ElementProcessors(new HashMap<>(), new HashMap<>(), new HashMap<>());
protected GraphSONUtility graphSONUtility;
protected JsonNode getJsonNodeFromFile(String s) {
......
......@@ -28,7 +28,6 @@ import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
......@@ -44,7 +43,12 @@ import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAG
import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class GraphSONUtilityTest extends BaseUtils {
......@@ -81,7 +85,7 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void dataNodeReadAndVertexAddedToGraph() throws IOException {
public void dataNodeReadAndVertexAddedToGraph() {
JsonNode entityNode = getCol1();
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
......@@ -95,7 +99,7 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void typeNodeReadAndVertexNotAddedToGraph() throws IOException {
public void typeNodeReadAndVertexNotAddedToGraph() {
JsonNode entityNode = getDbType();
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(emptyRelationshipCache);
......@@ -105,8 +109,8 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void updateNonPrimitiveArrayProperty() throws IOException {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getNonPrimitiveArray());
public void updateNonPrimitiveArrayProperty() {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getNonPrimitiveArray(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
......@@ -118,12 +122,19 @@ public class GraphSONUtilityTest extends BaseUtils {
Map<String, String> list = (Map<String, String>) v.property(HIVE_TABLE_COLUMNS_RELATIONSHIP).value();
assertEquals(list.size(), 2);
List superTypeNames = (List) v.property("__superTypeNames").value();
assertNotNull(superTypeNames);
assertEquals(superTypeNames.size(), 3);
assertEquals(superTypeNames.get(0), "Asset");
assertEquals(superTypeNames.get(1), "DataSet");
assertEquals(superTypeNames.get(2), "Referenceable");
}
@Test
public void updatePrimitiveMapProperty() {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMapPrimitive());
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMapPrimitive(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
......@@ -154,11 +165,10 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void edgeReadAndArrayIndexAdded() throws IOException {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap());
public void edgeReadAndArrayIndexAdded() {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
Map<String, Object> m = null;
addVertexToGraph(tg, gu, getDBV(), getTableV(), getCol1(), getCol2());
addEdgeToGraph(tg, gu, new MappedElementCache(), getEdgeCol(), getEdgeCol2());
......@@ -185,13 +195,13 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void nonPrimitiveMap_Removed() throws IOException {
public void nonPrimitiveMap_Removed() {
Set<String> actualKeys = new HashSet<String>() {{
add("col3");
add("col4");
}};
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap());
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
......@@ -218,9 +228,9 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void tagAssociated_NewAttributesAdded() throws IOException {
public void tagAssociated_NewAttributesAdded() {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap());
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
......@@ -247,8 +257,8 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void processEdge_PropagateSetTo_NONE() throws IOException {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap());
public void processEdge_PropagateSetTo_NONE() {
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
......@@ -275,12 +285,12 @@ public class GraphSONUtilityTest extends BaseUtils {
}
@Test
public void processEdge_PropagateSetTo_ONE_TO_TWO() throws IOException {
public void processEdge_PropagateSetTo_ONE_TO_TWO() {
Map<String, RelationshipCacheGenerator.TypeInfo> typeCache = new HashMap<String, RelationshipCacheGenerator.TypeInfo>() {{
put("__Process.inputs", new RelationshipCacheGenerator.TypeInfo("dataset_process_inputs", AtlasRelationshipDef.PropagateTags.TWO_TO_ONE));
}};
ElementProcessors elementProcessors = new ElementProcessors(typeCache, getPostProcessMap());
ElementProcessors elementProcessors = new ElementProcessors(typeCache, getPostProcessMap(), new HashMap<>());
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
......@@ -297,6 +307,41 @@ public class GraphSONUtilityTest extends BaseUtils {
}
}
@Test
public void entitiesWithTypesAsTraits() {
final String expectedLegacyTypeName = "traitprayivofx4";
final String expectedModifiedLegacyTypeName = "legacy_" + expectedLegacyTypeName;
Map<String, TypesDefScrubber.ClassificationToStructDefName> typesAsTraits = new HashMap<String, TypesDefScrubber.ClassificationToStructDefName>() {{
put("__createComplexTraitTypeTestprayivofx4.complexTrait",
new TypesDefScrubber.ClassificationToStructDefName("traitprayivofx4",
"legacy_traitprayivofx4"));
}};
ElementProcessors elementProcessors = new ElementProcessors(new HashMap<>(), getPostProcessMap(), typesAsTraits);
TinkerGraph tg = TinkerGraph.open();
GraphSONUtility gu = new GraphSONUtility(elementProcessors);
JsonNode nd = getJsonNodeFromFile("entity-with-trait-type.json");
addVertexToGraph(tg, gu, nd.get("vertices").get(0), nd.get("vertices").get(1));
addEdgeToGraph(tg, gu, new MappedElementCache(), nd.get("edges").get(0));
boolean asserted = false;
Iterator<Vertex> vertices = tg.vertices();
while(vertices.hasNext()) {
Vertex v = vertices.next();
String typeName = v.property(Constants.ENTITY_TYPE_PROPERTY_KEY).value().toString();
if(typeName.contains("traitprayivofx4")) {
assertEquals(typeName, expectedModifiedLegacyTypeName);
asserted = true;
}
}
assertTrue(asserted, "Condition was not met");
}
private Map<String, Map<String, List<String>>> getPostProcessMap() {
Map<String, Map<String, List<String>>> map = new HashMap<>();
map.put(HIVE_TABLE_TYPE, new HashMap<>());
......
{
"mode": "EXTENDED",
"vertices": [
{
"__typeName": {
"type": "string",
"value": "createComplexTraitTypeTestprayivofx4"
},
"createComplexTraitTypeTestprayivofx4.description": {
"type": "string",
"value": "this is a entity that has attribute created using legacy trait"
},
"_id": 66644,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "traitprayivofx4"
},
"traitprayivofx4.name": {
"type": "string",
"value": "legacy"
},
"_id": 82232,
"_type": "vertex"
}
],
"edges": [
{
"_id": "5f3ep0-cv4-qln9-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 82232,
"_label": "__createComplexTraitTypeTestprayivofx4.complexTrait"
}
]
}
......@@ -21,7 +21,7 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,22 +33,21 @@ import javax.inject.Singleton;
@Singleton
public class MigrationProgressService {
private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class);
public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs";
public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs";
@VisibleForTesting
static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs
static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs
private final long cacheValidity;
private final AtlasGraph graph;
private final GraphDBMigrator migrator;
private MigrationStatus cachedStatus;
private long cacheExpirationTime = 0;
@Inject
public MigrationProgressService(Configuration configuration, AtlasGraph graph) {
this.graph = graph;
this.cacheValidity = (configuration != null) ?
configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) :
DEFAULT_CACHE_TTL_IN_SECS;
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) {
this.migrator = migrator;
this.cacheValidity = (configuration != null) ? configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : DEFAULT_CACHE_TTL_IN_SECS;
}
public MigrationStatus getStatus() {
......@@ -57,8 +56,9 @@ public class MigrationProgressService {
private MigrationStatus fetchStatus() {
long currentTime = System.currentTimeMillis();
if(resetCache(currentTime)) {
cachedStatus = graph.getMigrationStatus();
cachedStatus = migrator.getMigrationStatus();
}
return cachedStatus;
......@@ -66,6 +66,7 @@ public class MigrationProgressService {
private boolean resetCache(long currentTime) {
boolean ret = cachedStatus == null || currentTime > cacheExpirationTime;
if(ret) {
cacheExpirationTime = currentTime + cacheValidity;
}
......
......@@ -22,11 +22,10 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.impexp.ImportTypeDefProcessor;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.AtlasException;
......@@ -56,11 +55,11 @@ public class DataMigrationService implements Service {
private final Thread thread;
@Inject
public DataMigrationService(AtlasGraph graph, AtlasTypeDefStore typeDefStore, Configuration configuration,
public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration,
GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer,
AtlasTypeRegistry typeRegistry) {
this.configuration = configuration;
this.thread = new Thread(new FileImporter(graph, typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
this.thread = new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
}
@Override
......@@ -83,17 +82,17 @@ public class DataMigrationService implements Service {
}
public static class FileImporter implements Runnable {
private final AtlasGraph graph;
private final GraphDBMigrator migrator;
private final AtlasTypeDefStore typeDefStore;
private final String importDirectory;
private final GraphBackedSearchIndexer indexer;
private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeDefStoreInitializer storeInitializer;
public FileImporter(AtlasGraph graph, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
public FileImporter(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
AtlasTypeDefStoreInitializer storeInitializer,
String directoryName, GraphBackedSearchIndexer indexer) {
this.graph = graph;
this.migrator = migrator;
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.storeInitializer = storeInitializer;
......@@ -120,7 +119,7 @@ public class DataMigrationService implements Service {
FileInputStream fs = new FileInputStream(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_DATA_NAME));
graph.importLegacyGraphSON(typeRegistry, fs);
migrator.importData(typeRegistry, fs);
} catch (Exception ex) {
LOG.error("Import failed!", ex);
throw new AtlasBaseException(ex);
......@@ -158,7 +157,7 @@ public class DataMigrationService implements Service {
try {
AtlasImportResult result = new AtlasImportResult();
String jsonStr = FileUtils.readFileToString(typesDefFile);
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);
AtlasTypesDef typesDef = migrator.getScrubbedTypesDef(jsonStr);
ImportTypeDefProcessor processor = new ImportTypeDefProcessor(typeDefStore, typeRegistry);
processor.processTypes(typesDef, result);
......
......@@ -38,6 +38,8 @@ import org.apache.atlas.repository.audit.EntityAuditListenerV2;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.graphdb.janus.migration.GraphDBGraphSONMigrator;
import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.ogm.profiles.AtlasSavedSearchDTO;
import org.apache.atlas.repository.ogm.profiles.AtlasUserProfileDTO;
......@@ -150,6 +152,7 @@ public class TestModules {
bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton();
bind(BulkImporter.class).to(BulkImporterImpl.class).asEagerSingleton();
bind(GraphDBMigrator.class).to(GraphDBGraphSONMigrator.class).asEagerSingleton();
//Add EntityAuditListener as EntityChangeListener
Multibinder<EntityChangeListener> entityChangeListenerBinder =
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -32,8 +33,8 @@ import java.io.IOException;
public class ComplexAttributesTest extends MigrationBaseAsserts {
@Inject
public ComplexAttributesTest(AtlasGraph graph) {
super(graph);
public ComplexAttributesTest(AtlasGraph graph, GraphDBMigrator migrator) {
super(graph, migrator);
}
@Test
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -33,8 +34,8 @@ import java.io.IOException;
public class HiveParititionTest extends MigrationBaseAsserts {
@Inject
public HiveParititionTest(AtlasGraph graph) {
super(graph);
public HiveParititionTest(AtlasGraph graph, GraphDBMigrator migrator) {
super(graph, migrator);
}
@Test
......
......@@ -22,6 +22,7 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -31,8 +32,8 @@ import java.io.IOException;
public class HiveStocksTest extends MigrationBaseAsserts {
@Inject
public HiveStocksTest(AtlasGraph graph) {
super(graph);
public HiveStocksTest(AtlasGraph graph, GraphDBMigrator migrator) {
super(graph, migrator);
}
@Test
......
......@@ -42,9 +42,12 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class MigrationBaseAsserts {
private static final String TYPE_NAME_PROPERTY = "__typeName";
private static final String R_GUID_PROPERTY_NAME = "_r__guid";
protected static final String ASSERT_NAME_PROPERTY = "Asset.name";
private final String TYPE_NAME_PROPERTY = "__typeName";
private final String R_GUID_PROPERTY_NAME = "_r__guid";
private final GraphDBMigrator migrator;
private final AtlasGraph graph;
@Inject
protected AtlasTypeDefStore typeDefStore;
......@@ -58,10 +61,9 @@ public class MigrationBaseAsserts {
@Inject
private GraphBackedSearchIndexer indexer;
protected AtlasGraph graph;
protected MigrationBaseAsserts(AtlasGraph graph) {
this.graph = graph;
protected MigrationBaseAsserts(AtlasGraph graph, GraphDBMigrator migrator) {
this.graph = graph;
this.migrator = migrator;
}
@AfterClass
......@@ -82,7 +84,7 @@ public class MigrationBaseAsserts {
protected void runFileImporter(String directoryToImport) throws IOException, AtlasBaseException {
loadTypesFromJson();
String directoryName = TestResourceFileUtils.getDirectory(directoryToImport);
DataMigrationService.FileImporter fi = new DataMigrationService.FileImporter(graph, typeDefStore, typeRegistry,
DataMigrationService.FileImporter fi = new DataMigrationService.FileImporter(migrator, typeDefStore, typeRegistry,
storeInitializer, directoryName, indexer);
fi.run();
......
......@@ -39,17 +39,10 @@ public class MigrationProgressServiceTest {
private final long increment = 1001l;
private final String statusSuccess = ReaderStatusManager.STATUS_SUCCESS;
private static class AtlasTinkerGraph {
public static AtlasGraph create(TinkerGraph tg) {
AtlasGraph g = mock(AtlasGraph.class);
when(g.getMigrationStatus()).thenAnswer(invocation -> ReaderStatusManager.get(tg));
return g;
}
public static AtlasGraph create() {
return create(TinkerGraph.open());
}
private GraphDBMigrator createMigrator(TinkerGraph tg) {
GraphDBMigrator gdm = mock(GraphDBMigrator.class);
when(gdm.getMigrationStatus()).thenAnswer(invocation -> ReaderStatusManager.get(tg));
return gdm;
}
@Test
......@@ -91,7 +84,7 @@ public class MigrationProgressServiceTest {
}
private MigrationProgressService getMigrationStatusForTest(Configuration cfg, TinkerGraph tg) {
return new MigrationProgressService(cfg, AtlasTinkerGraph.create(tg));
return new MigrationProgressService(cfg, createMigrator(tg));
}
@Test
......
......@@ -23,6 +23,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.type.AtlasBuiltInTypes;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
......@@ -38,8 +39,8 @@ import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class PathTest extends MigrationBaseAsserts {
@Inject
public PathTest(AtlasGraph graph) {
super(graph);
public PathTest(AtlasGraph graph, GraphDBMigrator migrator) {
super(graph, migrator);
}
@Test
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.janus.migration.TypesDefScrubber;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.io.FileUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
import static org.apache.atlas.repository.graphdb.janus.migration.TypesDefScrubber.LEGACY_TYPE_NAME_PREFIX;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class TypesDefScrubberTest {
private static final String resourcesDirRelativePath = "/src/test/resources/";
private final String LEGACY_TYPESDEF_JSON = "legacy-typesdef.json";
private String resourceDir;
@BeforeClass
public void setup() {
resourceDir = System.getProperty("user.dir") + resourcesDirRelativePath;
}
protected AtlasTypesDef getTypesDefFromFile(String s) {
File f = new File(getFilePath(s));
try {
return AtlasType.fromJson(FileUtils.readFileToString(f), AtlasTypesDef.class);
} catch (IOException e) {
throw new SkipException("getTypesDefFromFile: " + s, e);
}
}
protected String getFilePath(String fileName) {
return Paths.get(resourceDir, fileName).toString();
}
@Test
public void performScrub() {
TypesDefScrubber typesDefScrubber = new TypesDefScrubber();
AtlasTypesDef td = getTypesDefFromFile(LEGACY_TYPESDEF_JSON);
int traitPrayIndex = 1;
int vendorPIIIndex = 2;
int financeIndex = 3;
int classificationTraitPrayIndex = 0;
int classificationVendorPiiIndex = 2;
int classificationFinancendex = 3;
String expectedTraitPrayStructName = TypesDefScrubber.getLegacyTypeNameForStructDef(td.getClassificationDefs().get(classificationTraitPrayIndex).getName());
String expectedVendorPIIStructName = TypesDefScrubber.getLegacyTypeNameForStructDef(td.getClassificationDefs().get(classificationVendorPiiIndex).getName());
String expectedFinanceStructName = TypesDefScrubber.getLegacyTypeNameForStructDef(td.getClassificationDefs().get(classificationFinancendex).getName());
assertNewTypesDef(typesDefScrubber.scrub(td), traitPrayIndex, vendorPIIIndex, financeIndex, expectedTraitPrayStructName, expectedVendorPIIStructName, expectedFinanceStructName);
assertTraitMap(typesDefScrubber, td, classificationTraitPrayIndex, expectedTraitPrayStructName, 0);
assertTraitMap(typesDefScrubber, td, classificationVendorPiiIndex, expectedVendorPIIStructName, 1);
assertTraitMap(typesDefScrubber, td, classificationFinancendex, expectedFinanceStructName, 2);
}
private void assertTraitMap(TypesDefScrubber typesDefScrubber, AtlasTypesDef td, int classificationIndex, String expectedStructName, int attrIndex) {
String label = typesDefScrubber.getEdgeLabel(td.getEntityDefs().get(0).getName(), td.getEntityDefs().get(0).getAttributeDefs().get(attrIndex).getName());
assertTrue(typesDefScrubber.getTraitToTypeMap().containsKey(label));
assertEquals(typesDefScrubber.getTraitToTypeMap().get(label).getTypeName(), td.getClassificationDefs().get(classificationIndex).getName());
assertEquals(typesDefScrubber.getTraitToTypeMap().get(label).getLegacyTypeName(), expectedStructName);
}
private void assertTraitMap(Map<String,TypesDefScrubber.ClassificationToStructDefName> traitToTypeMap, AtlasTypesDef td) {
}
private void assertNewTypesDef(AtlasTypesDef newTypes, int traitPrayIndex, int vendorPIIIndex, int financeIndex, String expectedTraitPrayStructName, String expectedVendorPIIStructName, String expectedFinanceStructName) {
assertNotNull(newTypes);
assertEquals(newTypes.getStructDefs().size(), 4);
assertTrue(newTypes.getStructDefs().get(traitPrayIndex).getName().contains(LEGACY_TYPE_NAME_PREFIX));
assertTrue(newTypes.getStructDefs().get(vendorPIIIndex).getName().contains(LEGACY_TYPE_NAME_PREFIX));
assertTrue(newTypes.getStructDefs().get(financeIndex).getName().contains(LEGACY_TYPE_NAME_PREFIX));
assertEquals(newTypes.getStructDefs().get(traitPrayIndex).getName(), expectedTraitPrayStructName);
assertEquals(newTypes.getStructDefs().get(vendorPIIIndex).getName(), expectedVendorPIIStructName);
assertEquals(newTypes.getStructDefs().get(financeIndex).getName(), expectedFinanceStructName);
assertEquals(newTypes.getStructDefs().get(1).getAttributeDefs().size(), 1);
assertEquals(newTypes.getEntityDefs().get(0).getAttributeDefs().get(0).getTypeName(), expectedTraitPrayStructName);
assertEquals(newTypes.getEntityDefs().get(0).getAttributeDefs().get(1).getTypeName(), String.format("array<%s>", expectedVendorPIIStructName));
assertEquals(newTypes.getEntityDefs().get(0).getAttributeDefs().get(2).getTypeName(), String.format("map<String,%s>", expectedFinanceStructName));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
@Guice(modules = TestModules.TestOnlyModule.class)
public class TypesWithClassificationTest extends MigrationBaseAsserts {
@Inject
public TypesWithClassificationTest(AtlasGraph graph, GraphDBMigrator migrator) {
super(graph, migrator);
}
@Test
public void verify() throws IOException, AtlasBaseException {
int EXPECTED_TOTAL_COUNT = 60;
String ENTITY_TYPE = "ComplexTraitType";
String LEGACY_TYPE_TRAIT = "legacy_traitprayivofx4";
String LEGACY_TYPE_VENDOR_PII = "legacy_VENDOR_PII";
String LEGACY_TYPE_FINANCE = "legacy_FINANCE";
runFileImporter("classification_defs");
assertTypeCountNameGuid(ENTITY_TYPE, 1,"", "");
assertTypeCountNameGuid(LEGACY_TYPE_TRAIT, 1, "", "");
assertTypeCountNameGuid(LEGACY_TYPE_VENDOR_PII, 3, "", "");
assertTypeCountNameGuid(LEGACY_TYPE_FINANCE, 2, "", "");
assertEdgesWithLabel(getVertex(ENTITY_TYPE, "").getEdges(AtlasEdgeDirection.OUT).iterator(),1, "__ComplexTraitType.vendors");
assertEdgesWithLabel(getVertex(ENTITY_TYPE, "").getEdges(AtlasEdgeDirection.OUT).iterator(),4, "__ComplexTraitType.finance");
assertEdgesWithLabel(getVertex(ENTITY_TYPE, "").getEdges(AtlasEdgeDirection.OUT).iterator(),6, "__ComplexTraitType.complexTrait");
assertMigrationStatus(EXPECTED_TOTAL_COUNT);
}
}
......@@ -22,6 +22,7 @@ import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.graphdb.janus.migration.TypesWithCollectionsFinder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
......@@ -37,8 +38,8 @@ import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class TypesWithCollectionsFinderTest extends MigrationBaseAsserts {
@Inject
protected TypesWithCollectionsFinderTest(AtlasGraph graph) {
super(graph);
protected TypesWithCollectionsFinderTest(AtlasGraph graph, GraphDBMigrator migrator) {
super(graph, migrator);
}
@BeforeClass
......
{
"mode": "EXTENDED",
"vertices": [
{
"__typeName": {
"type": "string",
"value": "ComplexTraitType"
},
"ComplexTraitType.description": {
"type": "string",
"value": "this is a entity that has attribute created using legacy trait"
},
"ComplexTraitType.vendors": {
"type": "list",
"value": [
{
"type": "string",
"value": "5f3ep0-cv4-aaaa-pkw"
},
{
"type": "string",
"value": "5f3ep0-cv4-bbbb-pkw"
},
{
"type": "string",
"value": "5f3ep0-cv4-cccc-pkw"
}
]
},
"ComplexTraitType.finance": {
"type": "list",
"value": [
{
"type": "string",
"value": "key1"
},
{
"type": "string",
"value": "key2"
}
]
},
"_id": 66644,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "traitprayivofx4"
},
"traitprayivofx4.name": {
"type": "string",
"value": "legacy"
},
"_id": 82232,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "VENDOR_PII"
},
"VENDOR_PII.name": {
"type": "string",
"value": "legacy"
},
"_id": 90000,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "VENDOR_PII"
},
"VENDOR_PII.name": {
"type": "string",
"value": "legacy"
},
"_id": 90001,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "VENDOR_PII"
},
"VENDOR_PII.name": {
"type": "string",
"value": "legacy"
},
"_id": 90002,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "FINANCE"
},
"FINANCE.name": {
"type": "string",
"value": "legacy"
},
"_id": 80001,
"_type": "vertex"
},
{
"__typeName": {
"type": "string",
"value": "FINANCE"
},
"FINANCE.name": {
"type": "string",
"value": "legacy"
},
"_id": 80002,
"_type": "vertex"
}
],
"edges": [
{
"_id": "5f3ep0-cv4-qln9-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 82232,
"_label": "__ComplexTraitType.complexTrait"
},
{
"_id": "5f3ep0-cv4-aaaa-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 90000,
"_label": "__ComplexTraitType.vendors"
},
{
"_id": "5f3ep0-cv4-bbbb-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 90001,
"_label": "__ComplexTraitType.vendors"
},
{
"_id": "5f3ep0-cv4-cccc-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 90002,
"_label": "__ComplexTraitType.vendors"
},
{
"_id": "5f3ep0-cv4-wwww-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 80001,
"_label": "__ComplexTraitType.finance.key1"
},
{
"_id": "5f3ep0-cv4-xxxx-pkw",
"_type": "edge",
"_outV": 66644,
"_inV": 80002,
"_label": "__ComplexTraitType.finance.key2"
}
]
}
{
"enumDefs": [
],
"structDefs": [
{
"category": "STRUCT",
"guid": "42d9a0d1-8aba-406a-9437-9c2155d8fe02",
"createdBy": "root",
"updatedBy": "root",
"createTime": 1522693757121,
"updateTime": 1522693757121,
"version": 1,
"name": "order",
"description": "order",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "order",
"typeName": "int",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
}
]
}
],
"classificationDefs": [
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da4be",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "traitprayivofx4",
"description": "traitprayivofx4",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
},
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da444",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "PII",
"description": "PII",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
},
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da555",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "VENDOR_PII",
"description": "PII",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
},
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da666",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "FINANCE",
"description": "FINANCE",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
}
],
"entityDefs": [
{
"category": "ENTITY",
"guid": "2f9b17b6-8a7d-495f-9778-8676912cb621",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902193007,
"updateTime": 1526902193007,
"version": 1,
"name": "ComplexTraitType",
"description": "ComplexTraitType",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "complexTrait",
"typeName": "traitprayivofx4",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
},
{
"name": "vendors",
"typeName": "array<VENDOR_PII>",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
},
{
"name": "finance",
"typeName": "map<string,FINANCE>",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
},
{
"name": "description",
"typeName": "string",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
}
],
"superTypes": [],
"subTypes": []
}
]
}
{
"enumDefs": [
],
"structDefs": [
{
"category": "STRUCT",
"guid": "42d9a0d1-8aba-406a-9437-9c2155d8fe02",
"createdBy": "root",
"updatedBy": "root",
"createTime": 1522693757121,
"updateTime": 1522693757121,
"version": 1,
"name": "order",
"description": "order",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "order",
"typeName": "int",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
}
]
}
],
"classificationDefs": [
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da4be",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "traitprayivofx4",
"description": "traitprayivofx4",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
},
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da444",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "PII",
"description": "PII",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
},
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da555",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "VENDOR_PII",
"description": "VENDOR_PII",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
},
{
"category": "CLASSIFICATION",
"guid": "c21bfb98-fdbd-4ae6-9dd8-0818079da666",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902192324,
"updateTime": 1526902192324,
"version": 1,
"name": "FINANCE",
"description": "FINANCE",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"subTypes": []
}
],
"entityDefs": [
{
"category": "ENTITY",
"guid": "2f9b17b6-8a7d-495f-9778-8676912cb621",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1526902193007,
"updateTime": 1526902193007,
"version": 1,
"name": "createComplexTraitTypeTestprayivofx4",
"description": "createComplexTraitTypeTestprayivofx4",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "complexTrait",
"typeName": "traitprayivofx4",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
},
{
"name": "vendors",
"typeName": "array<VENDOR_PII>",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
},
{
"name": "finance_assets",
"typeName": "map<String,FINANCE>",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
},
{
"name": "description",
"typeName": "string",
"isOptional": false,
"cardinality": "SINGLE",
"valuesMinCount": 1,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false
}
],
"superTypes": [],
"subTypes": []
}
]
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment