Commit 7660c9b2 by Shwetha GS

ATLAS-128 DSL - Add support for comparisons on list type (suma.shivaprasad via shwethags)

parent 539f2431
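At a glance: this change lets a DSL query compare a list-valued attribute against a list literal. A minimal example, taken from the tests in this diff:

    hive_partition as p where values = ['2015-01-01']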
......@@ -329,12 +329,16 @@ public class HiveHookIT {
String tableType = HiveDataTypes.HIVE_TABLE.getName();
LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
//todo replace with DSL
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', ['%s']).as('p')."
/* gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', ['%s']).as('p')."
+ "out('__%s.table').has('%s.tableName', '%s').out('__%s.db').has('%s.name', '%s')"
+ ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, value, typeName,
tableType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, CLUSTER_NAME);
assertEntityIsRegistered(gremlinQuery);
*/
String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
+ "db where name = '%s' and clusterName = '%s' select p", typeName, value,
tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
assertEntityIsRegistered(dslQuery);
}
private String assertEntityIsRegistered(final String query) throws Exception {
......
......@@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags)
ALL CHANGES:
ATLAS-128 DSL - Add support for comparisons on list type (suma.shivaprasad via shwethags)
ATLAS-168 Atlas UI - Max column in hive 4 (darshankumar89 via shwethags)
ATLAS-155 Images do not show up on the dashboard (darshankumar89 via shwethags)
ATLAS-134 Some defects found when reviewing the source code (ltfxyz via shwethags)
......
......@@ -18,6 +18,8 @@
package org.apache.atlas.discovery.graph;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanVertex;
import org.apache.atlas.AtlasException;
import org.apache.atlas.query.Expressions;
......@@ -32,6 +34,7 @@ import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructType;
......@@ -40,6 +43,7 @@ import org.apache.atlas.typesystem.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -106,11 +110,18 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
case PRIMITIVE:
case ENUM:
return dataType.convert(value, Multiplicity.OPTIONAL);
case ARRAY:
// todo
break;
DataTypes.ArrayType arrType = (DataTypes.ArrayType) dataType;
IDataType<?> elemType = arrType.getElemType();
ImmutableCollection.Builder result = ImmutableList.builder();
List list = (List) value;
for(Object listElement : list) {
Object collectionEntry = constructCollectionEntry(elemType, listElement);
if(collectionEntry != null) {
result.add(collectionEntry);
}
}
return (U)result.build();
case MAP:
// todo
break;
......@@ -161,6 +172,25 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
return null;
}
public <U> U constructCollectionEntry(IDataType<U> elementType, Object value) throws AtlasException {
switch (elementType.getTypeCategory()) {
case PRIMITIVE:
case ENUM:
return constructInstance(elementType, value);
//The array values in case of STRUCT, CLASS contain the edgeId of the outgoing edge that links to the referenced STRUCT, CLASS vertex
case STRUCT:
case CLASS:
String edgeId = (String) value;
return (U) metadataRepository.getGraphToInstanceMapper().getReferredEntity(edgeId, elementType);
case ARRAY:
case MAP:
case TRAIT:
return null;
default:
throw new UnsupportedOperationException("Load for type " + elementType + " in collections is not supported");
}
}
@Override
public String edgeLabel(TypeUtils.FieldInfo fInfo) {
return fInfo.reverseDataType() == null ? edgeLabel(fInfo.dataType(), fInfo.attrInfo()) :
......
......@@ -152,7 +152,7 @@ public interface MetadataRepository {
void deleteTrait(String guid, String traitNameToBeDeleted) throws RepositoryException;
/**
* Adds the property to the entity that corresponds to the GUID
* Adds or updates the property on the entity that corresponds to the GUID
* @param guid entity id
* @param property property name
* @param value property value
......
......@@ -77,7 +77,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
private static final Logger LOG = LoggerFactory.getLogger(GraphBackedMetadataRepository.class);
private static final String FULL_TEXT_DELIMITER = " ";
private static final String EDGE_LABEL_PREFIX = "__";
private final TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper();
......@@ -117,7 +116,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
public String getTraitLabel(IDataType<?> dataType, String traitName) {
return dataType.getName() + "." + traitName;
return getTraitLabel(dataType.getName(), traitName);
}
@Override
......@@ -137,6 +136,10 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
return EDGE_LABEL_PREFIX + typeName + "." + attrName;
}
public String getTraitLabel(String typeName, String attrName) {
return typeName + "." + attrName;
}
public String getEdgeLabel(ITypedInstance typedInstance, AttributeInfo aInfo) throws AtlasException {
IDataType dataType = typeSystem.getDataType(IDataType.class, typedInstance.getTypeName());
return getEdgeLabel(dataType, aInfo);
......@@ -263,8 +266,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
// add the trait instance as a new vertex
final String typeName = getTypeName(instanceVertex);
instanceToGraphMapper
.mapTraitInstanceToVertex(traitInstance, getIdFromVertex(typeName, instanceVertex), typeName,
.mapTraitInstanceToVertex(traitInstance, getIdFromVertex(typeName, instanceVertex), typeSystem.getDataType(ClassType.class, typeName),
instanceVertex, Collections.<Id, Vertex>emptyMap());
// update the traits in entity once adding trait instance is successful
......@@ -298,7 +302,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
final String entityTypeName = getTypeName(instanceVertex);
String relationshipLabel = getEdgeLabel(entityTypeName, traitNameToBeDeleted);
String relationshipLabel = getTraitLabel(entityTypeName, traitNameToBeDeleted);
Iterator<Edge> results = instanceVertex.getEdges(Direction.OUT, relationshipLabel).iterator();
if (results.hasNext()) { // there should only be one edge for this label
final Edge traitEdge = results.next();
......@@ -384,7 +388,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
return getQualifiedName(dataType, attributeInfo.name);
}
String getQualifiedName(IDataType dataType, String attributeName) throws AtlasException {
public static String getQualifiedName(IDataType dataType, String attributeName) throws AtlasException {
return dataType.getTypeCategory() == DataTypes.TypeCategory.STRUCT ? dataType.getName() + "." + attributeName
// else class or trait
: ((HierarchicalType) dataType).getQualifiedName(attributeName);
......@@ -683,7 +687,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
ITypedStruct traitInstance = (ITypedStruct) typedInstance.getTrait(traitName);
// add the attributes for the trait instance
mapTraitInstanceToVertex(traitInstance, typedInstance, instanceVertex,
mapTraitInstanceToVertex(traitInstance, typedInstance.getId(), classType, instanceVertex,
entityProcessor.idToVertexMap);
}
}
......@@ -870,15 +874,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
return structInstanceVertex;
}
private void mapTraitInstanceToVertex(ITypedStruct traitInstance, ITypedReferenceableInstance typedInstance,
Vertex parentInstanceVertex, Map<Id, Vertex> idToVertexMap) throws AtlasException {
// add a new vertex for the struct or trait instance
mapTraitInstanceToVertex(traitInstance, typedInstance.getId(), typedInstance.getTypeName(),
parentInstanceVertex, idToVertexMap);
}
private void mapTraitInstanceToVertex(ITypedStruct traitInstance, Id typedInstanceId,
String typedInstanceTypeName, Vertex parentInstanceVertex, Map<Id, Vertex> idToVertexMap)
IDataType entityType, Vertex parentInstanceVertex, Map<Id, Vertex> idToVertexMap)
throws AtlasException {
// add a new vertex for the struct or trait instance
final String traitName = traitInstance.getTypeName();
......@@ -892,7 +889,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
traitInstance.fieldMapping().fields, idToVertexMap, false);
// add an edge to the newly created vertex from the parent
String relationshipLabel = getEdgeLabel(typedInstanceTypeName, traitName);
String relationshipLabel = getTraitLabel(entityType, traitName);
GraphHelper.addEdge(titanGraph, parentInstanceVertex, traitInstanceVertex, relationshipLabel);
}
......@@ -1015,7 +1012,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
case CLASS:
String relationshipLabel = getEdgeLabel(typedInstance, attributeInfo);
Object idOrInstance = mapClassReferenceToVertex(instanceVertex, attributeInfo, relationshipLabel,
Object idOrInstance = mapVertexToClassReference(instanceVertex, attributeInfo, relationshipLabel,
attributeInfo.dataType());
typedInstance.set(attributeInfo.name, idOrInstance);
break;
......@@ -1025,7 +1022,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
}
private Object mapClassReferenceToVertex(Vertex instanceVertex, AttributeInfo attributeInfo,
private Object mapVertexToClassReference(Vertex instanceVertex, AttributeInfo attributeInfo,
String relationshipLabel, IDataType dataType) throws AtlasException {
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
Iterator<Edge> results = instanceVertex.getEdges(Direction.OUT, relationshipLabel).iterator();
......@@ -1090,7 +1087,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
(String) value);
case CLASS:
return mapClassReferenceToVertex(instanceVertex, attributeInfo, edgeLabel, elementType, (String) value);
return mapVertexToClassReference(instanceVertex, attributeInfo, edgeLabel, elementType, (String) value);
default:
break;
......@@ -1134,7 +1131,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
elemType.getName());
StructType structType = typeSystem.getDataType(StructType.class, elemType.getName());
ITypedStruct structInstance = structType.createInstance();
mapVertexToInstance(structInstanceVertex, structInstance, structType.fieldMapping().fields);
return structInstance;
}
......@@ -1146,7 +1142,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
return null;
}
private Object mapClassReferenceToVertex(Vertex instanceVertex, AttributeInfo attributeInfo,
private Object mapVertexToClassReference(Vertex instanceVertex, AttributeInfo attributeInfo,
String relationshipLabel, IDataType dataType, String edgeId) throws AtlasException {
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
for (Edge edge : instanceVertex.getEdges(Direction.OUT, relationshipLabel)) {
......@@ -1205,7 +1201,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
private void mapVertexToTraitInstance(Vertex instanceVertex, String typedInstanceTypeName, String traitName,
TraitType traitType, ITypedStruct traitInstance) throws AtlasException {
String relationshipLabel = getEdgeLabel(typedInstanceTypeName, traitName);
String relationshipLabel = getTraitLabel(typedInstanceTypeName, traitName);
LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel);
for (Edge edge : instanceVertex.getEdges(Direction.OUT, relationshipLabel)) {
final Vertex traitInstanceVertex = edge.getVertex(Direction.IN);
......@@ -1252,5 +1248,34 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
typedInstance.setDate(attributeInfo.name, new Date(dateVal));
}
}
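// Resolves the entity referenced by the given edge id (array properties store edge ids
// for struct/class elements): a struct is materialized into an ITypedStruct, a class
// reference is returned as its Id; other type categories are unsupported.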
public ITypedInstance getReferredEntity(String edgeId, IDataType<?> referredType) throws AtlasException {
final Edge edge = titanGraph.getEdge(edgeId);
if(edge != null) {
final Vertex referredVertex = edge.getVertex(Direction.IN);
if (referredVertex != null) {
switch (referredType.getTypeCategory()) {
case STRUCT:
LOG.debug("Found struct instance vertex {}, mapping to instance {} ", referredVertex,
referredType.getName());
StructType structType = (StructType)referredType;
ITypedStruct instance = structType.createInstance();
Map<String, AttributeInfo> fields = structType.fieldMapping().fields;
mapVertexToInstance(referredVertex, instance, fields);
return instance;
case CLASS:
//TODO isComposite handling for class loads
final String guid = referredVertex.getProperty(Constants.GUID_PROPERTY_KEY);
Id referenceId =
new Id(guid, referredVertex.<Integer>getProperty(Constants.VERSION_PROPERTY_KEY),
referredType.getName());
return referenceId;
default:
throw new UnsupportedOperationException("Loading " + referredType.getTypeCategory() + " is not supported");
}
}
}
return null;
}
}
}
......@@ -48,9 +48,9 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> {
*/
private static final String GRAPH_PREFIX = "atlas.graph";
private static TitanGraph graphInstance;
private static volatile TitanGraph graphInstance;
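// volatile is required for the double-checked locking in getGraphInstance() below:
// it guarantees a fully constructed graph instance is visible to all threads.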
private static Configuration getConfiguration() throws AtlasException {
public static Configuration getConfiguration() throws AtlasException {
Configuration configProperties = ApplicationProperties.get();
return ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
}
......@@ -84,10 +84,7 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> {
}
}
@Override
@Singleton
@Provides
public TitanGraph get() {
public static TitanGraph getGraphInstance() {
if(graphInstance == null) {
synchronized (TitanGraphProvider.class) {
if(graphInstance == null) {
......@@ -104,4 +101,11 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> {
}
return graphInstance;
}
@Override
@Singleton
@Provides
public TitanGraph get() {
return getGraphInstance();
}
}
......@@ -18,7 +18,11 @@
package org.apache.atlas.query
import java.util
import com.google.common.collect.ImmutableCollection
import org.apache.atlas.AtlasException
import org.apache.atlas.typesystem.ITypedInstance
import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory}
import org.apache.atlas.typesystem.types._
......@@ -468,6 +472,22 @@ object Expressions {
}
}
import scala.collection.JavaConversions._
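// A literal list value in a DSL query, e.g. ['2015-01-01']. The element literals are
// converted through the array type up front, and toString renders the [a,b,c] form
// that the Gremlin translator emits verbatim.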
case class ListLiteral[_](dataType: ArrayType, rawValue: List[Expressions.Literal[_]]) extends Expression with LeafNode {
val lc : java.util.List[Expressions.Literal[_]] = rawValue
val value = if (rawValue != null) dataType.convert(lc, Multiplicity.REQUIRED)
override def toString = value match {
case l: Seq[_] => l.mkString("[",",","]")
case c: ImmutableCollection[_] =>
c.asList.mkString("[",",","]")
case x =>
x.toString
}
}
def literal[T](typ: PrimitiveType[T], rawValue: Any) = new Literal[T](typ, rawValue)
def boolean(rawValue: Any) = literal(DataTypes.BOOLEAN_TYPE, rawValue)
......@@ -492,6 +512,12 @@ object Expressions {
def date(rawValue: Any) = literal(DataTypes.DATE_TYPE, rawValue)
def list[_ <: PrimitiveType[_]](listElements: List[Expressions.Literal[_]]) = {
listLiteral(TypeSystem.getInstance().defineArrayType(listElements.head.dataType), listElements)
}
def listLiteral[_ <: PrimitiveType[_]](typ: ArrayType, rawValue: List[Expressions.Literal[_]]) = new ListLiteral(typ, rawValue)
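A short sketch of what these builders produce for a parsed list literal (a hedged illustration using only the API defined above; the array's element type is inferred from the first literal):

    // For the DSL fragment ['2015-01-01', '2015-01-02'] the parser effectively builds:
    val elems = List(literal(DataTypes.STRING_TYPE, "2015-01-01"),
                     literal(DataTypes.STRING_TYPE, "2015-01-02"))
    list(elems) // same as listLiteral(TypeSystem.getInstance().defineArrayType(DataTypes.STRING_TYPE), elems)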
case class ArithmeticExpression(symbol: String,
left: Expression,
right: Expression)
......@@ -601,7 +627,9 @@ object Expressions {
s"datatype. Can not resolve due to unresolved children")
}
if(left.dataType == DataTypes.DATE_TYPE) {
if(left.dataType.getName.startsWith(DataTypes.ARRAY_TYPE_PREFIX)) {
left.dataType
} else if(left.dataType == DataTypes.DATE_TYPE) {
DataTypes.DATE_TYPE
} else if (left.dataType != DataTypes.STRING_TYPE || right.dataType != DataTypes.STRING_TYPE) {
TypeUtils.combinedType(left.dataType, right.dataType)
......@@ -651,7 +679,6 @@ object Expressions {
val GEN_COL_ALIAS_PREFIX = "_col"
case class SelectExpression(child: Expression, selectList: List[Expression]) extends Expression {
val children = List(child) ::: selectList
lazy val selectListWithAlias = selectList.zipWithIndex map {
case (s: AliasExpression, _) => s
......
......@@ -18,18 +18,23 @@
package org.apache.atlas.query
import java.util
import java.util.Date
import com.thinkaurelius.titan.core.TitanVertex
import com.tinkerpop.blueprints.Direction
import com.tinkerpop.blueprints.{Vertex, Direction}
import org.apache.atlas.AtlasException
import org.apache.atlas.query.Expressions.{ComparisonExpression, ExpressionException}
import org.apache.atlas.query.TypeUtils.FieldInfo
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository
import org.apache.atlas.typesystem.persistence.Id
import org.apache.atlas.typesystem.types.DataTypes._
import org.apache.atlas.typesystem.types._
import org.apache.atlas.typesystem.{ITypedInstance, ITypedReferenceableInstance}
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
* Represents the Bridge between the QueryProcessor and the Graph Persistence scheme used.
......@@ -186,13 +191,15 @@ object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
val superTypeAttributeName = "superTypeNames"
val idAttributeName = "guid"
def edgeLabel(dataType: IDataType[_], aInfo: AttributeInfo) = s"${dataType.getName}.${aInfo.name}"
def edgeLabel(dataType: IDataType[_], aInfo: AttributeInfo) = s"__${dataType.getName}.${aInfo.name}"
def edgeLabel(propertyName: String) = s"__${propertyName}"
val fieldPrefixInSelect = "it"
def traitLabel(cls: IDataType[_], traitName: String) = s"${cls.getName}.$traitName"
def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo) = aInfo.name
def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo) = GraphBackedMetadataRepository.getQualifiedName(dataType, aInfo.name)
def getIdFromVertex(dataTypeNm: String, v: TitanVertex): Id =
new Id(v.getId.toString, 0, dataTypeNm)
......@@ -209,6 +216,8 @@ object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
def constructInstance[U](dataType: IDataType[U], v: AnyRef): U = {
dataType.getTypeCategory match {
case DataTypes.TypeCategory.PRIMITIVE => dataType.convert(v, Multiplicity.OPTIONAL)
case DataTypes.TypeCategory.ARRAY =>
dataType.convert(v, Multiplicity.OPTIONAL)
case DataTypes.TypeCategory.STRUCT
if dataType.getName == TypeSystem.getInstance().getIdType.getName => {
val sType = dataType.asInstanceOf[StructType]
......@@ -278,7 +287,7 @@ object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
case DataTypes.TypeCategory.PRIMITIVE => loadPrimitiveAttribute(dataType, aInfo, i, v)
case DataTypes.TypeCategory.ENUM => loadEnumAttribute(dataType, aInfo, i, v)
case DataTypes.TypeCategory.ARRAY =>
throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
loadArrayAttribute(dataType, aInfo, i, v)
case DataTypes.TypeCategory.MAP =>
throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported")
case DataTypes.TypeCategory.STRUCT => loadStructAttribute(dataType, aInfo, i, v)
......@@ -314,9 +323,26 @@ object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
}
}
private def loadStructAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
private def loadArrayAttribute[T](dataType: IDataType[_], aInfo: AttributeInfo,
i: ITypedInstance, v: TitanVertex): Unit = {
val eLabel = edgeLabel(FieldInfo(dataType, aInfo, null))
import scala.collection.JavaConversions._
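// The vertex stores an array attribute as a java.util.List property keyed by the
// attribute name; each stored element is mapped back through mapVertexToCollectionEntry
// (primitives and enums pass through unchanged).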
val list: java.util.List[_] = v.getProperty(aInfo.name)
val arrayType: DataTypes.ArrayType = aInfo.dataType.asInstanceOf[ArrayType]
val values = new util.ArrayList[Any]
list.foreach( listElement =>
values += mapVertexToCollectionEntry(v, aInfo, arrayType.getElemType, i, listElement)
)
i.set(aInfo.name, values)
}
private def loadStructAttribute(dataType: IDataType[_], aInfo: AttributeInfo,
i: ITypedInstance, v: TitanVertex, edgeLbl: Option[String] = None): Unit = {
val eLabel = edgeLbl match {
case Some(x) => x
case None => edgeLabel(FieldInfo(dataType, aInfo, null))
}
val edges = v.getEdges(Direction.OUT, eLabel)
val sVertex = edges.iterator().next().getVertex(Direction.IN).asInstanceOf[TitanVertex]
if (aInfo.dataType().getTypeCategory == DataTypes.TypeCategory.STRUCT) {
......@@ -329,5 +355,22 @@ object GraphPersistenceStrategy1 extends GraphPersistenceStrategies {
i.set(aInfo.name, cInstance)
}
}
private def mapVertexToCollectionEntry(instanceVertex: TitanVertex, attributeInfo: AttributeInfo, elementType: IDataType[_], i: ITypedInstance, value: Any): Any = {
elementType.getTypeCategory match {
case DataTypes.TypeCategory.PRIMITIVE => value
case DataTypes.TypeCategory.ENUM => value
case DataTypes.TypeCategory.STRUCT =>
throw new UnsupportedOperationException(s"load for ${attributeInfo.dataType()} not supported")
case DataTypes.TypeCategory.TRAIT =>
throw new UnsupportedOperationException(s"load for ${attributeInfo.dataType()} not supported")
case DataTypes.TypeCategory.CLASS => //loadStructAttribute(elementType, attributeInfo, i, v)
throw new UnsupportedOperationException(s"load for ${attributeInfo.dataType()} not supported")
case _ =>
throw new UnsupportedOperationException(s"load for ${attributeInfo.dataType()} not supported")
}
}
}
......@@ -77,6 +77,7 @@ trait SelectExpressionHandling {
val l = ArrayBuffer[String]()
e.traverseUp {
case BackReference(alias, _, _) => l += alias
case ClassExpression(clsName) => l += clsName
}
l.toSet.toList
}
......@@ -140,14 +141,19 @@ class GremlinTranslator(expr: Expression,
}
val validateComparisonForm: PartialFunction[Expression, Unit] = {
case c@ComparisonExpression(_, left, right) =>
case c@ComparisonExpression(op, left, right) =>
if (!left.isInstanceOf[FieldExpression]) {
throw new GremlinTranslationException(c, s"lhs of comparison is not a field")
}
if (!right.isInstanceOf[Literal[_]]) {
if (!right.isInstanceOf[Literal[_]] && !right.isInstanceOf[ListLiteral[_]]) {
throw new GremlinTranslationException(c,
s"rhs of comparison is not a literal")
}
if(right.isInstanceOf[ListLiteral[_]] && (!op.equals("=") && !op.equals("!="))) {
throw new GremlinTranslationException(c,
s"operation not supported with list literal")
}
()
}
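To make the rule concrete (a hedged illustration; the accepted form appears in the DSL test data provider later in this diff):

    hive_partition as p where values = ['2015-01-01']    // accepted: = against a list literal
    hive_partition as p where values > ['2015-01-01']    // rejected: only = and != are allowed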
......@@ -201,7 +207,8 @@ class GremlinTranslator(expr: Expression,
typeTestExpression(clsName)
case TraitExpression(clsName) =>
typeTestExpression(clsName)
case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => {
case fe@FieldExpression(fieldName, fInfo, child)
if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE || fe.dataType.getTypeCategory == TypeCategory.ARRAY => {
val fN = "\"" + gPersistenceBehavior.fieldNameInVertex(fInfo.dataType, fInfo.attrInfo) + "\""
child match {
case Some(e) => s"${genQuery(e, inSelect)}.$fN"
......@@ -218,8 +225,7 @@ class GremlinTranslator(expr: Expression,
case None => step
}
}
case fe@FieldExpression(fieldName, fInfo, child)
if fInfo.traitName != null => {
case fe@FieldExpression(fieldName, fInfo, child) if fInfo.traitName != null => {
val direction = gPersistenceBehavior.instanceToTraitEdgeDirection
val edgeLbl = gPersistenceBehavior.edgeLabel(fInfo)
val step = s"""$direction("$edgeLbl")"""
......@@ -294,12 +300,20 @@ class GremlinTranslator(expr: Expression,
s"""out("${gPersistenceBehavior.traitLabel(clsExp.dataType, traitName)}")"""
case isTraitUnaryExpression(traitName, child) =>
s"""out("${gPersistenceBehavior.traitLabel(child.dataType, traitName)}")"""
case hasFieldLeafExpression(fieldName, Some(clsExp)) =>
s"""has("$fieldName")"""
case hasFieldLeafExpression(fieldName, clsExp) => clsExp match {
case Some(c: ClassExpression) => s"""has("${c.clsName}.$fieldName")"""
case _ => s"""has("$fieldName")"""
}
case hasFieldUnaryExpression(fieldName, child) =>
s"""${genQuery(child, inSelect)}.has("$fieldName")"""
case ArithmeticExpression(symb, left, right) => s"${genQuery(left, inSelect)} $symb ${genQuery(right, inSelect)}"
case l: Literal[_] => l.toString
case list: ListLiteral[_] => list.toString
case in@TraitInstanceExpression(child) => {
val direction = gPersistenceBehavior.traitToInstanceEdgeDirection
s"${genQuery(child, inSelect)}.$direction()"
......@@ -356,6 +370,7 @@ class GremlinTranslator(expr: Expression,
}
/*
* TODO
* Translation Issues:
* 1. back references in filters. For e.g. testBackreference: 'DB as db Table where (db.name = "Reporting")'
* this is translated to:
......
......@@ -34,6 +34,8 @@ trait QueryKeywords {
protected implicit def asParser(k: Keyword): Parser[String] = k.str
protected val LIST_LPAREN = Keyword("[")
protected val LIST_RPAREN = Keyword("]")
protected val LPAREN = Keyword("(")
protected val RPAREN = Keyword(")")
protected val EQ = Keyword("=")
......@@ -222,8 +224,11 @@ class QueryParser extends StandardTokenParsers with QueryKeywords with Expressio
def multiERight = (STAR | DIV) ~ atomE ^^ { case op ~ r => (op, r)}
def atomE = literal | identifier | LPAREN ~> expr <~ RPAREN | listLiteral
def atomE = literal | identifier | LPAREN ~> expr <~ RPAREN
def listLiteral = LIST_LPAREN ~ rep1sep(literal, COMMA) ~ LIST_RPAREN ^^ {
case lp ~ le ~ rp => list(le)
}
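In grammar terms the new production is roughly '[' literal (',' literal)* ']' with at least one element (rep1sep), and list(le) infers the array's element type from the first literal. Illustrative fragments that now parse as atoms:

    ['2015-01-01']
    [1, 2, 3]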
def identifier = rep1sep(ident, DOT) ^^ { l => l match {
case h :: Nil => id(h)
......
......@@ -112,7 +112,7 @@ object FieldValidator extends PartialFunction[Expression, Expression] {
case fe@FieldExpression(fNm, fInfo, Some(child)) if isSrc(child) =>
throw new ExpressionException(fe, s"srcType of field doesn't match input type")
case hasFieldUnaryExpression(fNm, child) if child.dataType == srcDataType =>
hasFieldLeafExpression(fNm)
hasFieldLeafExpression(fNm, Some(child))
case hF@hasFieldUnaryExpression(fNm, child) if isSrc(child) =>
throw new ExpressionException(hF, s"srcType of field doesn't match input type")
case isTraitUnaryExpression(fNm, child) if child.dataType == srcDataType =>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* Base Class to set up hive types and instances for tests
*/
@Guice(modules = RepositoryMetadataModule.class)
public class BaseHiveRepositoryTest {
@Inject
protected DefaultMetadataService metadataService;
@Inject
protected GraphBackedMetadataRepository repository;
@Inject
protected GraphProvider<TitanGraph> graphProvider;
protected void setUp() throws Exception {
setUpTypes();
new GraphBackedSearchIndexer(graphProvider);
setupInstances();
// TestUtils.dumpGraph(graphProvider.get());
}
protected void tearDown() throws Exception {
TypeSystem.getInstance().reset();
try {
graphProvider.get().shutdown();
} catch (Exception e) {
e.printStackTrace();
}
try {
TitanCleanup.clear(graphProvider.get());
} catch (Exception e) {
e.printStackTrace();
}
}
private void setUpTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
metadataService.createType(typesAsJSON);
}
private static final String DATABASE_TYPE = "hive_db";
private static final String HIVE_TABLE_TYPE = "hive_table";
private static final String COLUMN_TYPE = "hive_column";
private static final String HIVE_PROCESS_TYPE = "hive_process";
private static final String STORAGE_DESC_TYPE = "StorageDesc";
private static final String VIEW_TYPE = "View";
private static final String PARTITION_TYPE = "hive_partition";
TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
.createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
.createClassTypeDef(STORAGE_DESC_TYPE, null,
new AttributeDefinition("cols", String.format("array<%s>", COLUMN_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("location", DataTypes.STRING_TYPE),
attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.DATE_TYPE),
attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE,
Multiplicity.REQUIRED, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null));
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
attrDef("endTime", DataTypes.LONG_TYPE),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
.createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null));
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
};
HierarchicalTypeDefinition<ClassType> partClsDef =
new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null,
attributeDefinitions);
HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
return TypeUtils.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef),
ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
}
AttributeDefinition attrDef(String name, IDataType dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
return attrDef(name, dT, m, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
String reverseAttributeName) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dT);
return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
}
private void setupInstances() throws Exception {
Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
Referenceable sd =
storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id")));
List<Referenceable> salesFactColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
column("customer_id", "int", "customer id", "PII"),
column("sales", "double", "product id", "Metric"));
Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
List<Referenceable> timeDimColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
column("weekDay", "int", "week Day"));
Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
"Dimension");
Id reportingDB =
database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
Id salesFactDaily =
table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
List<Referenceable> productDimColumns = ImmutableList
.of(column("product_id", "int", "product id"), column("product_name", "string", "product name"),
column("brand_name", "int", "brand name"));
Id productDim =
table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
"Dimension");
view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
List<Referenceable> customerDimColumns = ImmutableList.of(column("customer_id", "int", "customer id", "PII"),
column("name", "string", "customer name", "PII"),
column("address", "string", "customer address", "PII"));
Id customerDim =
table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
"Dimension");
view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
Id salesFactMonthly =
table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
"Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily);
}
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
return createInstance(referenceable, clsType);
}
Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location);
referenceable.set("inputFormat", inputFormat);
referenceable.set("outputFormat", outputFormat);
referenceable.set("compressed", compressed);
referenceable.set("cols", columns);
return referenceable;
}
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);
return referenceable;
}
Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("temporary", false);
referenceable.set("createTime", new Date(System.currentTimeMillis()));
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
referenceable.set("sd", sd);
referenceable.set("columns", columns);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
return createInstance(referenceable, clsType);
}
Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
referenceable.set("queryGraph", queryGraph);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
return createInstance(referenceable, clsType);
}
Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("db", dbId);
referenceable.set("inputTables", inputTables);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
return createInstance(referenceable, clsType);
}
Id partition(List<String> values, Id table, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
referenceable.set("values", values);
referenceable.set("table", table);
ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
return createInstance(referenceable, clsType);
}
private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
// String entityJSON = InstanceSerialization.toJson(referenceable, true);
ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
String guid = repository.createEntities(typedInstance)[0];
// return the reference to created instance with guid
return new Id(guid, 0, referenceable.getTypeName());
}
}
......@@ -20,16 +20,11 @@ package org.apache.atlas.discovery;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.BaseHiveRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.query.HiveTitanSample;
import org.apache.atlas.query.QueryTestsUtils;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
......@@ -38,7 +33,6 @@ import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.commons.io.FileUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
......@@ -49,18 +43,13 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef;
import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef;
@Guice(modules = RepositoryMetadataModule.class)
public class GraphBackedDiscoveryServiceTest {
public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
@Inject
private GraphProvider<TitanGraph> graphProvider;
......@@ -73,12 +62,8 @@ public class GraphBackedDiscoveryServiceTest {
@BeforeClass
public void setUp() throws Exception {
super.setUp();
TypeSystem typeSystem = TypeSystem.getInstance();
typeSystem.reset();
QueryTestsUtils.setupTypes();
setupSampleData();
TestUtils.defineDeptEmployeeTypes(typeSystem);
Referenceable hrDept = TestUtils.createDeptEg1(typeSystem);
......@@ -88,42 +73,9 @@ public class GraphBackedDiscoveryServiceTest {
repositoryService.createEntities(hrDept2);
}
private void setupSampleData() throws ScriptException {
TitanGraph titanGraph = graphProvider.get();
ScriptEngineManager manager = new ScriptEngineManager();
ScriptEngine engine = manager.getEngineByName("gremlin-groovy");
Bindings bindings = engine.createBindings();
bindings.put("g", titanGraph);
String hiveGraphFile = FileUtils.getTempDirectory().getPath() + File.separator + System.nanoTime() + ".gson";
System.out.println("hiveGraphFile = " + hiveGraphFile);
HiveTitanSample.writeGson(hiveGraphFile);
bindings.put("hiveGraphFile", hiveGraphFile);
engine.eval("g.loadGraphSON(hiveGraphFile)", bindings);
titanGraph.commit();
System.out.println("*******************Graph Dump****************************");
for (Vertex vertex : titanGraph.getVertices()) {
System.out.println(GraphHelper.vertexString(vertex));
}
for (Edge edge : titanGraph.getEdges()) {
System.out.println(GraphHelper.edgeString(edge));
}
System.out.println("*******************Graph Dump****************************");
}
@AfterClass
public void tearDown() throws Exception {
TypeSystem.getInstance().reset();
graphProvider.get().shutdown();
try {
TitanCleanup.clear(graphProvider.get());
} catch(Exception e) {
e.printStackTrace();
}
super.tearDown();
}
@Test
......@@ -176,54 +128,81 @@ public class GraphBackedDiscoveryServiceTest {
@DataProvider(name = "dslQueriesProvider")
private Object[][] createDSLQueries() {
return new String[][]{
{"from DB"}, {"DB"}, {"DB where DB.name=\"Reporting\""}, {"DB DB.name = \"Reporting\""},
{"DB where DB.name=\"Reporting\" select name, owner"}, {"DB has name"}, {"DB, Table"},
{"DB is JdbcAccess"},
return new Object[][]{
{"from hive_db", 2},
{"hive_db", 2},
{"hive_db where hive_db.name=\"Reporting\"", 1},
{"hive_db hive_db.name = \"Reporting\"", 1},
{"hive_db where hive_db.name=\"Reporting\" select name, owner", 1},
{"hive_db has name", 2},
{"hive_db, hive_table", 6},
{"View is JdbcAccess", 2},
{"hive_db as db1, hive_table where db1.name = \"Reporting\"", 0}, //Not working - ATLAS-145
// - Final working query -> discoveryService.searchByGremlin("L:{_var_0 = [] as Set;g.V().has(\"__typeName\", \"hive_db\").fill(_var_0);g.V().has(\"__superTypeNames\", \"hive_db\").fill(_var_0);_var_0._().as(\"db1\").in(\"__hive_table.db\").back(\"db1\").and(_().has(\"hive_db.name\", T.eq, \"Reporting\")).toList()}")
/*
{"DB, LoadProcess has name"},
{"DB as db1, Table where db1.name = \"Reporting\""},
{"DB where DB.name=\"Reporting\" and DB.createTime < " + System.currentTimeMillis()},
{"hive_db, hive_process has name"}, //Invalid query
{"hive_db where hive_db.name=\"Reporting\" and hive_db.createTime < " + System.currentTimeMillis()}
*/
{"from Table"}, {"Table"}, {"Table is Dimension"}, {"Column where Column isa PII"},
{"View is Dimension"},
/*{"Column where Column isa PII select Column.name"},*/
{"Column select Column.name"}, {"Column select name"}, {"Column where Column.name=\"customer_id\""},
{"from Table select Table.name"}, {"DB where (name = \"Reporting\")"},
{"DB where (name = \"Reporting\") select name as _col_0, owner as _col_1"},
{"DB where DB is JdbcAccess"}, {"DB where DB has name"}, {"DB Table"}, {"DB where DB has name"},
{"DB as db1 Table where (db1.name = \"Reporting\")"},
{"DB where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "},
{"Table where (name = \"sales_fact\" and created > \"2014-01-01\" ) select name as _col_0, created as _col_1 "},
{"Table where (name = \"sales_fact\" and created > \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, created as _col_1 "},
{"from hive_table", 6},
{"hive_table", 6},
{"hive_table isa Dimension", 3},
{"hive_column where hive_column isa PII", 6},
{"View is Dimension" , 2},
// {"hive_column where hive_column isa PII select hive_column.name", 6}, //Not working - ATLAS-175
{"hive_column select hive_column.name", 27},
{"hive_column select name", 27},
{"hive_column where hive_column.name=\"customer_id\"", 4},
{"from hive_table select hive_table.name", 6},
{"hive_db where (name = \"Reporting\")", 1},
{"hive_db where (name = \"Reporting\") select name as _col_0, owner as _col_1", 1},
{"hive_db where hive_db is JdbcAccess", 0}, //Not supposed to work
{"hive_db hive_table", 6},
{"hive_db where hive_db has name", 2},
{"hive_db as db1 hive_table where (db1.name = \"Reporting\")", 0}, //Not working -> ATLAS-145
{"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 ", 1},
{"hive_table where (name = \"sales_fact\" and createTime > \"2014-01-01\" ) select name as _col_0, createTime as _col_1 ", 1},
{"hive_table where (name = \"sales_fact\" and createTime >= \"2014-12-11T02:35:58.440Z\" ) select name as _col_0, createTime as _col_1 ", 1},
/*
todo: does not work
{"DB where (name = \"Reporting\") and ((createTime + 1) > 0)"},
{"DB as db1 Table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") select db1.name
todo: does not work - ATLAS-146
{"hive_db where (name = \"Reporting\") and ((createTime + 1) > 0)"},
{"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") select db1.name
as dbName, tab.name as tabName"},
{"DB as db1 Table as tab where ((db1.createTime + 1) > 0) or (db1.name = \"Reporting\") select db1.name
{"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) or (db1.name = \"Reporting\") select db1.name
as dbName, tab.name as tabName"},
{"DB as db1 Table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner
{"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner
select db1.name as dbName, tab.name as tabName"},
{"DB as db1 Table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner
{"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner
select db1.name as dbName, tab.name as tabName"},
*/
// trait searches
{"Dimension"},
/*{"Fact"}, - todo: does not work*/
{"JdbcAccess"}, {"ETL"}, {"Metric"}, {"PII"},
{"Dimension", 5},
{"JdbcAccess", 2},
{"ETL", 2},
{"Metric", 5},
{"PII", 6},
/* Lineage queries are fired through ClosureQuery and are tested through HiveLineageJerseyResourceIt in webapp module.
Commenting out the queries below since DSL-to-Gremlin parsing/translation fails for lineage queries when array types are
used within loop expressions, as is the case with DataSet.inputs and outputs.
// Lineage
{"Table LoadProcess outputTable"}, {"Table loop (LoadProcess outputTable)"},
{"Table as _loop0 loop (LoadProcess outputTable) withPath"},
{"Table as src loop (LoadProcess outputTable) as dest select src.name as srcTable, dest.name as "
+ "destTable withPath"},
{"Table as t, sd, Column as c where t.name=\"sales_fact\" select c.name as colName, c.dataType as "
+ "colType"},
{"Table where name='sales_fact', db where name='Reporting'"}};
*/
// {"hive_table as t, sd, hive_column as c where t.name=\"sales_fact\" select c.name as colName, c.dataType as "
// + "colType", 0}, //Not working - ATLAS-145 and ATLAS-166
{"hive_table where name='sales_fact', db where name='Sales'", 1},
{"hive_table where name='sales_fact', db where name='Reporting'", 0},
{"hive_partition as p where values = ['2015-01-01']", 1},
// {"StorageDesc select cols", 6} //Not working since loading of lists needs to be fixed yet
};
}
@Test(dataProvider = "dslQueriesProvider")
public void testSearchByDSLQueries(String dslQuery) throws Exception {
public void testSearchByDSLQueries(String dslQuery, Integer expectedNumRows) throws Exception {
System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery);
Assert.assertNotNull(jsonResults);
......@@ -242,7 +221,7 @@ public class GraphBackedDiscoveryServiceTest {
JSONArray rows = results.getJSONArray("rows");
Assert.assertNotNull(rows);
Assert.assertTrue(rows.length() >= 0); // some queries may not have any results
Assert.assertEquals(rows.length(), expectedNumRows.intValue()); // some queries are expected to return 0 rows
System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows");
}
......
......@@ -18,32 +18,10 @@
package org.apache.atlas.discovery;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.BaseHiveRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.repository.EntityNotFoundException;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.services.DefaultMetadataService;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert;
......@@ -60,37 +38,22 @@ import java.util.List;
* Unit tests for Hive LineageService.
*/
@Guice(modules = RepositoryMetadataModule.class)
public class HiveLineageServiceTest {
public class HiveLineageServiceTest extends BaseHiveRepositoryTest {
@Inject
private GraphBackedDiscoveryService discoveryService;
@Inject
private DefaultMetadataService metadataService;
@Inject
private GraphProvider<TitanGraph> graphProvider;
@Inject
private HiveLineageService hiveLineageService;
@BeforeClass
public void setUp() throws Exception {
setUpTypes();
setupInstances();
// TestUtils.dumpGraph(graphProvider.get());
super.setUp();
}
@AfterClass
public void tearDown() throws Exception {
TypeSystem.getInstance().reset();
graphProvider.get().shutdown();
try {
TitanCleanup.clear(graphProvider.get());
} catch (Exception e) {
e.printStackTrace();
}
super.tearDown();
}
@DataProvider(name = "dslQueriesProvider")
......@@ -296,248 +259,4 @@ public class HiveLineageServiceTest {
hiveLineageService.getSchema("blah");
Assert.fail();
}
private void setUpTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
metadataService.createType(typesAsJSON);
}
private static final String DATABASE_TYPE = "hive_db";
private static final String HIVE_TABLE_TYPE = "hive_table";
private static final String COLUMN_TYPE = "hive_column";
private static final String HIVE_PROCESS_TYPE = "hive_process";
private static final String STORAGE_DESC_TYPE = "StorageDesc";
private static final String VIEW_TYPE = "View";
private TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
.createClassTypeDef(STORAGE_DESC_TYPE, null, attrDef("location", DataTypes.STRING_TYPE),
attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
.createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE),
attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
// todo - uncomment this, something is broken
// new AttributeDefinition("sd", STORAGE_DESC_TYPE,
// Multiplicity.REQUIRED, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null));
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
attrDef("endTime", DataTypes.LONG_TYPE),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
.createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
Multiplicity.COLLECTION, false, null));
HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
return TypeUtils.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef),
ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef));
}
AttributeDefinition attrDef(String name, IDataType dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
return attrDef(name, dT, m, false, null);
}
AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
String reverseAttributeName) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(dT);
return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
}
private void setupInstances() throws Exception {
Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
Referenceable sd =
storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true);
List<Referenceable> salesFactColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
column("customer_id", "int", "customer id", "PII"),
column("sales", "double", "product id", "Metric"));
Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
List<Referenceable> timeDimColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
column("weekDay", "int", "week Day"));
Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
"Dimension");
Id reportingDB =
database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
Id salesFactDaily =
table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
List<Referenceable> productDimColumns = ImmutableList
.of(column("product_id", "int", "product id"), column("product_name", "string", "product name"),
column("brand_name", "int", "brand name"));
Id productDim =
table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
"Dimension");
view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
List<Referenceable> customerDimColumns = ImmutableList.of(column("customer_id", "int", "customer id", "PII"),
column("name", "string", "customer name", "PII"),
column("address", "string", "customer address", "PII"));
Id customerDim =
table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
"Dimension");
view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
Id salesFactMonthly =
table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
"Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
}
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
return createInstance(referenceable);
}
Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location);
referenceable.set("inputFormat", inputFormat);
referenceable.set("outputFormat", outputFormat);
referenceable.set("compressed", compressed);
return referenceable;
}
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);
return referenceable;
}
Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("temporary", false);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
// todo - uncomment this, something is broken
// referenceable.set("sd", sd);
referenceable.set("columns", columns);
return createInstance(referenceable);
}
Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
referenceable.set("queryGraph", queryGraph);
return createInstance(referenceable);
}
Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("db", dbId);
referenceable.set("inputTables", inputTables);
return createInstance(referenceable);
}
private Id createInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
System.out.println("creating instance of type " + typeName);
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
JSONArray jsonArray = new JSONArray();
jsonArray.put(entityJSON);
String response = metadataService.createEntities(jsonArray.toString());
String guid = new JSONArray(response).getString(0);
System.out.println("created instance for type " + typeName + ", guid: " + guid);
// return the reference to created instance with guid
return new Id(guid, 0, referenceable.getTypeName());
}
}
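(For reference: a minimal sketch of driving the new list-type comparison through the DSL, in the style of the Scala query tests below; the Partition type, graph g and strategy gp come from those test fixtures.)

    val p = new QueryParser
    val e = p("Partition as p where values = ['2015-01-01']").right.get
    val r = QueryProcessor.evaluate(e, g, gp)
    println(r.toJson)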
......@@ -101,12 +101,12 @@ public class GraphBackedMetadataRepositoryTest {
try {
//TODO - Fix failure during shutdown while using BDB
graphProvider.get().shutdown();
} catch(Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
try {
TitanCleanup.clear(graphProvider.get());
} catch(Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
}
......
......@@ -40,6 +40,7 @@ import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
......@@ -147,7 +148,7 @@ public class GraphRepoMapperScaleTest {
searchWithOutIndex("hive_table.name", "bar-999");
searchWithIndex("hive_table.name", "bar-999");
searchWithIndex("hive_table.created", Compare.GREATER_THAN_EQUAL, TestUtils.TEST_DATE_IN_LONG);
searchWithIndex("hive_table.created", Compare.GREATER_THAN_EQUAL, TestUtils.TEST_DATE_IN_LONG, 1000);
for (int index = 500; index < 600; index++) {
searchWithIndex("hive_table.name", "bar-" + index);
......@@ -185,7 +186,7 @@ public class GraphRepoMapperScaleTest {
}
}
private void searchWithIndex(String key, Predicate searchPredicate, Object value) {
private void searchWithIndex(String key, Predicate searchPredicate, Object value, int expectedResults) {
TitanGraph graph = graphProvider.get();
long start = System.currentTimeMillis();
int count = 0;
......@@ -197,6 +198,7 @@ public class GraphRepoMapperScaleTest {
} finally {
System.out.println("Search on [" + key + "=" + value + "] returned results: " + count + ", took " + (
System.currentTimeMillis() - start) + " ms");
Assert.assertEquals(count, expectedResults);
}
}
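// Review note: the new count assertion runs inside the finally block, so a traversal that
// throws mid-iteration will also fire assertEquals on a partial count and can mask the
// original exception.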
......
......@@ -19,8 +19,12 @@
package org.apache.atlas.query
import com.thinkaurelius.titan.core.TitanGraph
import com.thinkaurelius.titan.core.util.TitanCleanup
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
import org.apache.atlas.query.Expressions._
import org.apache.atlas.repository.graph.{TitanGraphProvider, GraphBackedMetadataRepository}
import org.apache.atlas.typesystem.types.TypeSystem
import org.junit.Test
import org.junit.runner.RunWith
import org.scalatest.Matchers._
import org.scalatest._
......@@ -30,46 +34,332 @@ import org.scalatest.junit.JUnitRunner
class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
var g: TitanGraph = null
var gp: GraphPersistenceStrategies = null;
var gProvider: TitanGraphProvider = null;
override def beforeAll() {
TypeSystem.getInstance().reset()
QueryTestsUtils.setupTypes
g = QueryTestsUtils.setupTestGraph
gProvider = new TitanGraphProvider();
gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider))
g = QueryTestsUtils.setupTestGraph(gProvider)
}
override def afterAll() {
g.shutdown()
try {
TitanCleanup.clear(g);
} catch {
case ex: Exception =>
print("Could not clear the graph ", ex);
}
}
test("testClass") {
val r = QueryProcessor.evaluate(_class("DB"), g)
validateJson(r, "{\n \"query\":\"DB\",\n \"dataType\":{\n \"superTypes\":[\n \n ],\n \"hierarchicalMetaTypeName\":\"org.apache.atlas.typesystem.types.ClassType\",\n \"typeName\":\"DB\",\n \"attributeDefinitions\":[\n {\n \"name\":\"name\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"owner\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"createTime\",\n \"dataTypeName\":\"int\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"DB\",\n \"$id$\":{\n \"id\":\"256\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"owner\":\"John ETL\",\n \"name\":\"Sales\",\n \"createTime\":1000\n },\n {\n \"$typeName$\":\"DB\",\n \"$id$\":{\n \"id\":\"7424\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"owner\":\"Jane BI\",\n \"name\":\"Reporting\",\n \"createTime\":1500\n }\n ]\n}")
val r = QueryProcessor.evaluate(_class("DB"), g, gp)
validateJson(r, """{
| "query": "DB",
| "dataType": {
| "superTypes": [
|
| ],
| "hierarchicalMetaTypeName": "org.apache.atlas.typesystem.types.ClassType",
| "typeName": "DB",
| "attributeDefinitions": [
| {
| "name": "name",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "owner",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "createTime",
| "dataTypeName": "int",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "clusterName",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| }
| ]
| },
| "rows": [
| {
| "$typeName$": "DB",
| "$id$": {
| "$typeName$": "DB",
| "version": 0
| },
| "owner": "John ETL",
| "name": "Sales",
| "createTime": 1000,
| "clusterName": "test"
| },
| {
| "$typeName$": "DB",
| "$id$": {
| "$typeName$": "DB",
| "version": 0
| },
| "owner": "Jane BI",
| "name": "Reporting",
| "createTime": 1500,
| "clusterName": "test"
| }
| ]
| }""".stripMargin)
}
test("testName") {
val r = QueryProcessor.evaluate(_class("DB").field("name"), g)
val r = QueryProcessor.evaluate(_class("DB").field("name"), g, gp)
validateJson(r, "{\n \"query\":\"DB.name\",\n \"dataType\":\"string\",\n \"rows\":[\n \"Sales\",\n \"Reporting\"\n ]\n}")
}
test("testFilter") {
var r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))), g)
validateJson(r, "{\n \"query\":\"DB where (name = \\\"Reporting\\\")\",\n \"dataType\":{\n \"superTypes\":[\n \n ],\n \"hierarchicalMetaTypeName\":\"org.apache.atlas.typesystem.types.ClassType\",\n \"typeName\":\"DB\",\n \"attributeDefinitions\":[\n {\n \"name\":\"name\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"owner\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"createTime\",\n \"dataTypeName\":\"int\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"DB\",\n \"$id$\":{\n \"id\":\"7424\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"owner\":\"Jane BI\",\n \"name\":\"Reporting\",\n \"createTime\":1500\n }\n ]\n}")
var r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))), g, gp)
validateJson(r, """{
| "query": "DB where (name = \"Reporting\")",
| "dataType": {
| "superTypes": [],
| "hierarchicalMetaTypeName": "org.apache.atlas.typesystem.types.ClassType",
| "typeName": "DB",
| "attributeDefinitions": [
| {
| "name": "name",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "owner",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "createTime",
| "dataTypeName": "int",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "clusterName",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| }
| ]
| },
| "rows": [
| {
| "$typeName$": "DB",
| "$id$": {
| "$typeName$": "DB",
| "version": 0
| },
| "owner": "Jane BI",
| "name": "Reporting",
| "createTime": 1500,
| "clusterName": "test"
| }
| ]
|}""".stripMargin);
}
test("testFilter2") {
var r = QueryProcessor.evaluate(_class("DB").where(id("DB").field("name").`=`(string("Reporting"))), g)
validateJson(r, "{\n \"query\":\"DB where (name = \\\"Reporting\\\")\",\n \"dataType\":{\n \"superTypes\":[\n \n ],\n \"hierarchicalMetaTypeName\":\"org.apache.atlas.typesystem.types.ClassType\",\n \"typeName\":\"DB\",\n \"attributeDefinitions\":[\n {\n \"name\":\"name\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"owner\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"createTime\",\n \"dataTypeName\":\"int\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"DB\",\n \"$id$\":{\n \"id\":\"7424\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"owner\":\"Jane BI\",\n \"name\":\"Reporting\",\n \"createTime\":1500\n }\n ]\n}")
var r = QueryProcessor.evaluate(_class("DB").where(id("DB").field("name").`=`(string("Reporting"))), g, gp)
validateJson(r, """{
| "query": "DB where (name = \"Reporting\")",
| "dataType": {
| "superTypes": [],
| "hierarchicalMetaTypeName": "org.apache.atlas.typesystem.types.ClassType",
| "typeName": "DB",
| "attributeDefinitions": [
| {
| "name": "name",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "owner",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "createTime",
| "dataTypeName": "int",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "clusterName",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| }
| ]
| },
| "rows": [
| {
| "$typeName$": "DB",
| "$id$": {
| "$typeName$": "DB",
| "version": 0
| },
| "owner": "Jane BI",
| "name": "Reporting",
| "createTime": 1500,
| "clusterName": "test"
| }
| ]
|}""".stripMargin);
}
test("testSelect") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))).
select(id("name"), id("owner")), g)
validateJson(r, "{\n \"query\":\"DB where (name = \\\"Reporting\\\") as _src1 select _src1.name as _col_0, _src1.owner as _col_1\",\n \"dataType\":{\n \"typeName\":\"__tempQueryResultStruct1\",\n \"attributeDefinitions\":[\n {\n \"name\":\"_col_0\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"_col_1\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"__tempQueryResultStruct1\",\n \"_col_1\":\"Jane BI\",\n \"_col_0\":\"Reporting\"\n }\n ]\n}")
select(id("name"), id("owner")), g, gp)
validateJson(r, """{
| "query": "DB where (name = \"Reporting\") as _src1 select _src1.name as _col_0, _src1.owner as _col_1",
| "dataType": {
| "typeName": "__tempQueryResultStruct1",
| "attributeDefinitions": [
| {
| "name": "_col_0",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "_col_1",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| }
| ]
| },
| "rows": [
| {
| "$typeName$": "__tempQueryResultStruct1",
| "_col_1": "Jane BI",
| "_col_0": "Reporting"
| }
| ]
|}""".stripMargin);
}
test("testIsTrait") {
val r = QueryProcessor.evaluate(_class("Table").where(isTrait("Dimension")), g)
val r = QueryProcessor.evaluate(_class("Table").where(isTrait("Dimension")), g, gp)
validateJson(r, """{
| "query":"Table where Table is Dimension",
| "dataType":{
......@@ -107,7 +397,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| {
| "name":"sd",
| "dataTypeName":"StorageDesc",
| "dataTypeName":"StorageDescriptor",
| "multiplicity":{
| "lower":1,
| "upper":1,
......@@ -142,7 +432,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -164,7 +454,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -186,7 +476,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -205,7 +495,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
}
test("testhasField") {
val r = QueryProcessor.evaluate(_class("DB").where(hasField("name")), g)
val r = QueryProcessor.evaluate(_class("DB").where(hasField("name")), g, gp)
validateJson(r, """{
| "query":"DB where DB has name",
| "dataType":{
......@@ -253,6 +543,19 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| "isUnique":false,
| "isIndexable":true,
| "reverseAttributeName":null
| },
| {
| "name":"clusterName",
| "dataTypeName":"string",
| "multiplicity":{
| "lower":0,
| "upper":1,
| "isUnique":false
| },
| "isComposite":false,
| "isUnique":false,
| "isIndexable":true,
| "reverseAttributeName":null
| }
| ]
| },
......@@ -265,7 +568,8 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "owner":"John ETL",
| "name":"Sales",
| "createTime":1000
| "createTime":1000,
| "clusterName":"test"
| },
| {
| "$typeName$":"DB",
......@@ -275,14 +579,15 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "owner":"Jane BI",
| "name":"Reporting",
| "createTime":1500
| "createTime":1500,
| "clusterName":"test"
| }
| ]
|}""".stripMargin)
}
test("testFieldReference") {
val r = QueryProcessor.evaluate(_class("DB").field("Table"), g)
val r = QueryProcessor.evaluate(_class("DB").field("Table"), g, gp)
validateJson(r, """{
| "query":"DB Table",
| "dataType":{
......@@ -318,7 +623,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| {
| "name":"sd",
| "dataTypeName":"StorageDesc",
| "dataTypeName":"StorageDescriptor",
| "multiplicity":{
| "lower":1,
| "upper":1,
......@@ -353,7 +658,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -370,7 +675,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -392,7 +697,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -414,7 +719,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -436,7 +741,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -453,7 +758,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -468,27 +773,103 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
test("testBackReference") {
val r = QueryProcessor.evaluate(
_class("DB").as("db").field("Table").where(id("db").field("name").`=`(string("Reporting"))), g)
_class("DB").as("db").field("Table").where(id("db").field("name").`=`(string("Reporting"))), g, gp)
validateJson(r, null)
}
test("testArith") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting"))).
select(id("name"), id("createTime") + int(1)), g)
select(id("name"), id("createTime") + int(1)), g, gp)
validateJson(r, "{\n \"query\":\"DB where (name = \\\"Reporting\\\") as _src1 select _src1.name as _col_0, (_src1.createTime + 1) as _col_1\",\n \"dataType\":{\n \"typeName\":\"__tempQueryResultStruct3\",\n \"attributeDefinitions\":[\n {\n \"name\":\"_col_0\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"_col_1\",\n \"dataTypeName\":\"int\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"__tempQueryResultStruct3\",\n \"_col_1\":1501,\n \"_col_0\":\"Reporting\"\n }\n ]\n}")
}
test("testComparisonLogical") {
val r = QueryProcessor.evaluate(_class("DB").where(id("name").`=`(string("Reporting")).
and(id("createTime") > int(0))), g)
validateJson(r, "{\n \"query\":\"DB where (name = \\\"Reporting\\\") and (createTime > 0)\",\n \"dataType\":{\n \"superTypes\":[\n \n ],\n \"hierarchicalMetaTypeName\":\"org.apache.atlas.typesystem.types.ClassType\",\n \"typeName\":\"DB\",\n \"attributeDefinitions\":[\n {\n \"name\":\"name\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"owner\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"createTime\",\n \"dataTypeName\":\"int\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"DB\",\n \"$id$\":{\n \"id\":\"7424\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"owner\":\"Jane BI\",\n \"name\":\"Reporting\",\n \"createTime\":1500\n }\n ]\n}")
and(id("createTime") > int(0))), g, gp)
validateJson(r, """{
| "query": "DB where (name = \"Reporting\") and (createTime > 0)",
| "dataType": {
| "superTypes": [
|
| ],
| "hierarchicalMetaTypeName": "org.apache.atlas.typesystem.types.ClassType",
| "typeName": "DB",
| "attributeDefinitions": [
| {
| "name": "name",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "owner",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "createTime",
| "dataTypeName": "int",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| },
| {
| "name": "clusterName",
| "dataTypeName": "string",
| "multiplicity": {
| "lower": 0,
| "upper": 1,
| "isUnique": false
| },
| "isComposite": false,
| "isUnique": false,
| "isIndexable": true,
| "reverseAttributeName": null
| }
| ]
| },
| "rows": [
| {
| "$typeName$": "DB",
| "$id$": {
| "$typeName$": "DB",
| "version": 0
| },
| "owner": "Jane BI",
| "name": "Reporting",
| "createTime": 1500,
| "clusterName": "test"
| }
| ]
|}""".stripMargin);
}
test("testJoinAndSelect1") {
val r = QueryProcessor.evaluate(
_class("DB").as("db1").where(id("name").`=`(string("Sales"))).field("Table").as("tab").
where((isTrait("Dimension"))).
select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g, gp
)
validateJson(r, "{\n \"query\":\"DB as db1 where (name = \\\"Sales\\\") Table as tab where DB as db1 where (name = \\\"Sales\\\") Table as tab is Dimension as _src1 select db1.name as dbName, tab.name as tabName\",\n \"dataType\":{\n \"typeName\":\"__tempQueryResultStruct5\",\n \"attributeDefinitions\":[\n {\n \"name\":\"dbName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"tabName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"__tempQueryResultStruct5\",\n \"dbName\":\"Sales\",\n \"tabName\":\"product_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct5\",\n \"dbName\":\"Sales\",\n \"tabName\":\"time_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct5\",\n \"dbName\":\"Sales\",\n \"tabName\":\"customer_dim\"\n }\n ]\n}")
}
......@@ -497,7 +878,7 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
val r = QueryProcessor.evaluate(
_class("DB").as("db1").where((id("db1").field("createTime") > int(0))
.or(id("name").`=`(string("Reporting")))).field("Table").as("tab")
.select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
.select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g, gp
)
validateJson(r, "{\n \"query\":\"DB as db1 where (db1.createTime > 0) or (name = \\\"Reporting\\\") Table as tab select db1.name as dbName, tab.name as tabName\",\n \"dataType\":{\n \"typeName\":\"__tempQueryResultStruct6\",\n \"attributeDefinitions\":[\n {\n \"name\":\"dbName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"tabName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"__tempQueryResultStruct6\",\n \"dbName\":\"Sales\",\n \"tabName\":\"sales_fact\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct6\",\n \"dbName\":\"Sales\",\n \"tabName\":\"product_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct6\",\n \"dbName\":\"Sales\",\n \"tabName\":\"time_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct6\",\n \"dbName\":\"Sales\",\n \"tabName\":\"customer_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct6\",\n \"dbName\":\"Reporting\",\n \"tabName\":\"sales_fact_daily_mv\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct6\",\n \"dbName\":\"Reporting\",\n \"tabName\":\"sales_fact_monthly_mv\"\n }\n ]\n}")
}
......@@ -507,30 +888,176 @@ class GremlinTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
_class("DB").as("db1").where((id("db1").field("createTime") > int(0))
.and(id("db1").field("name").`=`(string("Reporting")))
.or(id("db1").hasField("owner"))).field("Table").as("tab")
.select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g
.select(id("db1").field("name").as("dbName"), id("tab").field("name").as("tabName")), g, gp
)
validateJson(r, "{\n \"query\":\"DB as db1 where (db1.createTime > 0) and (db1.name = \\\"Reporting\\\") or DB as db1 has owner Table as tab select db1.name as dbName, tab.name as tabName\",\n \"dataType\":{\n \"typeName\":\"__tempQueryResultStruct7\",\n \"attributeDefinitions\":[\n {\n \"name\":\"dbName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"tabName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"sales_fact\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"product_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"time_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"customer_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Reporting\",\n \"tabName\":\"sales_fact_daily_mv\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Reporting\",\n \"tabName\":\"sales_fact_monthly_mv\"\n }\n ]\n}")
validateJson(r, "{\n \"query\":\"DB as db1 where (db1.createTime > 0) and (db1.name = \\\"Reporting\\\") or db1 has owner Table as tab select db1.name as dbName, tab.name as tabName\",\n \"dataType\":{\n \"typeName\":\"__tempQueryResultStruct7\",\n \"attributeDefinitions\":[\n {\n \"name\":\"dbName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"tabName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"sales_fact\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"product_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"time_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Sales\",\n \"tabName\":\"customer_dim\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Reporting\",\n \"tabName\":\"sales_fact_daily_mv\"\n },\n {\n \"$typeName$\":\"__tempQueryResultStruct7\",\n \"dbName\":\"Reporting\",\n \"tabName\":\"sales_fact_monthly_mv\"\n }\n ]\n}")
}
test("testJoinAndSelect4") {
val r = QueryProcessor.evaluate(
_class("DB").as("db1").where(id("name").`=`(string("Sales"))).field("Table").as("tab").
where((isTrait("Dimension"))).
select(id("db1").as("dbO"), id("tab").field("name").as("tabName")), g
select(id("db1").as("dbO"), id("tab").field("name").as("tabName")), g, gp
)
validateJson(r, "{\n \"query\":\"DB as db1 where (name = \\\"Sales\\\") Table as tab where DB as db1 where (name = \\\"Sales\\\") Table as tab is Dimension as _src1 select db1 as dbO, tab.name as tabName\",\n \"dataType\":{\n \"typeName\":\"\",\n \"attributeDefinitions\":[\n {\n \"name\":\"dbO\",\n \"dataTypeName\":\"DB\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"tabName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"\",\n \"dbO\":{\n \"id\":\"256\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"tabName\":\"product_dim\"\n },\n {\n \"$typeName$\":\"\",\n \"dbO\":{\n \"id\":\"256\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"tabName\":\"time_dim\"\n },\n {\n \"$typeName$\":\"\",\n \"dbO\":{\n \"id\":\"256\",\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"tabName\":\"customer_dim\"\n }\n ]\n}")
validateJson(r, "{\n \"query\":\"DB as db1 where (name = \\\"Sales\\\") Table as tab where DB as db1 where (name = \\\"Sales\\\") Table as tab is Dimension as _src1 select db1 as dbO, tab.name as tabName\",\n \"dataType\":{\n \"typeName\":\"\",\n \"attributeDefinitions\":[\n {\n \"name\":\"dbO\",\n \"dataTypeName\":\"DB\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n },\n {\n \"name\":\"tabName\",\n \"dataTypeName\":\"string\",\n \"multiplicity\":{\n \"lower\":0,\n \"upper\":1,\n \"isUnique\":false\n },\n \"isComposite\":false,\n \"isUnique\":false,\n \"isIndexable\":true,\n \"reverseAttributeName\":null\n }\n ]\n },\n \"rows\":[\n {\n \"$typeName$\":\"\",\n \"dbO\":{\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"tabName\":\"product_dim\"\n },\n {\n \"$typeName$\":\"\",\n \"dbO\":{\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"tabName\":\"time_dim\"\n },\n {\n \"$typeName$\":\"\",\n \"dbO\":{\n \"$typeName$\":\"DB\",\n \"version\":0\n },\n \"tabName\":\"customer_dim\"\n }\n ]\n}")
}
test("testArrayComparision") {
val p = new QueryParser
val e = p("Partition as p where values = ['2015-01-01']," +
" table where name = 'sales_fact_daily_mv'," +
" db where name = 'Reporting' and clusterName = 'test' select p").right.get
val r = QueryProcessor.evaluate(e, g, gp)
validateJson(r, """{
| "query":"Partition as p where (values = [\"2015-01-01\"]) table where (name = \"sales_fact_daily_mv\") db where (name = \"Reporting\") and (clusterName = \"test\") as _src1 select p as _col_0",
| "dataType":{
| "typeName":"__tempQueryResultStruct2",
| "attributeDefinitions":[
| {
| "name":"_col_0",
| "dataTypeName":"Partition",
| "multiplicity":{
| "lower":0,
| "upper":1,
| "isUnique":false
| },
| "isComposite":false,
| "isUnique":false,
| "isIndexable":true,
| "reverseAttributeName":null
| }
| ]
| },
| "rows":[
| {
| "$typeName$":"__tempQueryResultStruct2",
| "_col_0":{
| "$typeName$":"Partition",
| "version":0
| }
| }
| ]
|}""".stripMargin)
}
test("testArrayComparisionWithSelectOnArray") {
val p = new QueryParser
val e = p("Partition as p where values = ['2015-01-01']," +
" table where name = 'sales_fact_daily_mv'," +
" db where name = 'Reporting' and clusterName = 'test' select p.values").right.get
val r = QueryProcessor.evaluate(e, g, gp)
validateJson(r,
"""{
| "query":"Partition as p where (values = [\"2015-01-01\"]) table where (name = \"sales_fact_daily_mv\") db where (name = \"Reporting\") and (clusterName = \"test\") as _src1 select p.values as _col_0",
| "dataType":{
| "typeName":"__tempQueryResultStruct2",
| "attributeDefinitions":[
| {
| "name":"_col_0",
| "dataTypeName":"array<string>",
| "multiplicity":{
| "lower":0,
| "upper":1,
| "isUnique":false
| },
| "isComposite":false,
| "isUnique":false,
| "isIndexable":true,
| "reverseAttributeName":null
| }
| ]
| },
| "rows":[
| {
| "$typeName$":"__tempQueryResultStruct2",
| "_col_0":[
| "2015-01-01"
| ]
| }
| ]
|}
""".stripMargin)
}
test("testArrayInWhereClause") {
val p = new QueryParser
val e = p("Partition as p where values = ['2015-01-01']").right.get
val r = QueryProcessor.evaluate(e, g, gp)
validateJson(r, """{
| "query":"Partition as p where (values = [\"2015-01-01\"])",
| "dataType":{
| "superTypes":[
|
| ],
| "hierarchicalMetaTypeName":"org.apache.atlas.typesystem.types.ClassType",
| "typeName":"Partition",
| "attributeDefinitions":[
| {
| "name":"values",
| "dataTypeName":"array<string>",
| "multiplicity":{
| "lower":1,
| "upper":1,
| "isUnique":false
| },
| "isComposite":false,
| "isUnique":false,
| "isIndexable":true,
| "reverseAttributeName":null
| },
| {
| "name":"table",
| "dataTypeName":"Table",
| "multiplicity":{
| "lower":1,
| "upper":1,
| "isUnique":false
| },
| "isComposite":false,
| "isUnique":false,
| "isIndexable":true,
| "reverseAttributeName":null
| }
| ]
| },
| "rows":[
| {
| "$typeName$":"Partition",
| "$id$":{
| "$typeName$":"Partition",
| "version":0
| },
| "values":[
| "2015-01-01"
| ],
| "table":{
| "$typeName$":"Table",
| "version":0
| }
| }
| ]
|}""".stripMargin)
}
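// The .right.get calls above throw a bare NoSuchElementException when a query fails to
// parse; a sketch of the same evaluation with an explicit match (same g and gp as the
// surrounding tests):
//   val q = new QueryParser
//   q("Partition as p where values = ['2015-01-01']") match {
//     case Right(expr) => println(QueryProcessor.evaluate(expr, g, gp).toJson)
//     case Left(err)   => fail(s"parse failed: $err")
//   }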
test("testArrayWithStruct") {
// val p = new QueryParser
// val e = p("from LoadProcess select inputTables").right.get
// val r = QueryProcessor.evaluate(e, g)
val r = QueryProcessor.evaluate(_class("LoadProcess").field("inputTables"), g, gp)
validateJson(r)
}
test("testNegativeInvalidType") {
val p = new QueryParser
val e = p("from blah").right.get
an [ExpressionException] should be thrownBy QueryProcessor.evaluate(e, g)
an[ExpressionException] should be thrownBy QueryProcessor.evaluate(e, g, gp)
}
test("Bug37860") {
test("testJoinAndSelect5") {
val p = new QueryParser
val e = p("Table as t where name = 'sales_fact' db where name = 'Sales' and owner = 'John ETL' select t").right.get
val r = QueryProcessor.evaluate(e, g)
val r = QueryProcessor.evaluate(e, g, gp)
validateJson(r)
}
}
......@@ -19,7 +19,10 @@
package org.apache.atlas.query
import com.thinkaurelius.titan.core.TitanGraph
import com.thinkaurelius.titan.core.util.TitanCleanup
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
import org.apache.atlas.query.Expressions._
import org.apache.atlas.repository.graph.{TitanGraphProvider, GraphBackedMetadataRepository}
import org.apache.atlas.typesystem.types.TypeSystem
import org.junit.runner.RunWith
import org.scalatest._
......@@ -29,15 +32,25 @@ import org.scalatest.junit.JUnitRunner
class GremlinTest2 extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
var g: TitanGraph = null
var gProvider: TitanGraphProvider = null;
var gp: GraphPersistenceStrategies = null;
override def beforeAll() {
TypeSystem.getInstance().reset()
QueryTestsUtils.setupTypes
g = QueryTestsUtils.setupTestGraph
gProvider = new TitanGraphProvider();
gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider))
g = QueryTestsUtils.setupTestGraph(gProvider)
}
override def afterAll() {
g.shutdown()
try {
TitanCleanup.clear(g);
} catch {
case ex: Exception =>
print("Could not clear the graph ", ex);
}
}
test("testTraitSelect") {
......
......@@ -24,7 +24,8 @@ import java.util.{Date, UUID}
import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import com.thinkaurelius.titan.core.TitanGraph
import com.typesafe.config.ConfigFactory
import org.apache.atlas.repository.Constants
import org.apache.atlas.repository.graph.TitanGraphProvider
import org.apache.atlas.TestUtils
import org.apache.commons.io.FileUtils
......@@ -39,33 +40,41 @@ object HiveTitanSample {
val _id: String
def id = _id
val version = 0
val guid = s"""${UUID.randomUUID()}""".stripMargin
val __version = 0
val __guid = UUID.randomUUID().toString
def addEdge(to: Vertex, label: String, edges: ArrayBuffer[String]): Unit = {
def addEdge(to: Vertex, label: String, edges: ArrayBuffer[String]): Int = {
val edgeId = nextEdgeId.incrementAndGet();
edges +=
s"""{"_id" : "${nextEdgeId.incrementAndGet()}", "_type" : "edge", "_inV" : "${to.id}", "_outV" : "$id", "_label" : "$label"}"""
s"""{"_id" : "${edgeId}", "_type" : "edge", "_inV" : "${to.id}", "_outV" : "$id", "_label" : "$label"}"""
edgeId
}
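// addEdge now returns the generated edge id so that struct-valued list properties
// (see toGSon below) can record the ids of the edges backing each list element.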
def toGSon(vertices: ArrayBuffer[String],
edges: ArrayBuffer[String]): Unit = {
val sb = new StringBuilder
sb.append( s"""{"typeName" : "${this.getClass.getSimpleName}", "_type" : "vertex"""")
sb.append( s"""{"${Constants.ENTITY_TYPE_PROPERTY_KEY}" : "${this.getClass.getSimpleName}", "_type" : "vertex"""")
this.getClass.getDeclaredFields filter (_.getName != "traits") foreach { f =>
f.setAccessible(true)
val fV = f.get(this)
val convertedVal = fV match {
case _: String => s""""$fV""""
case ls: List[_] if isPrimitiveType(ls) =>
s"""["${ls.mkString(",")}"]"""
case d: Date => d.getTime
case _ => fV
}
convertedVal match {
case x: Vertex => addEdge(x, s"${this.getClass.getSimpleName}.${f.getName}", edges)
case l: List[_] => l.foreach(x => addEdge(x.asInstanceOf[Vertex],
s"${this.getClass.getSimpleName}.${f.getName}", edges))
case x: Vertex => addEdge(x, s"__${this.getClass.getSimpleName}.${f.getName}", edges)
case l: List[_] => val edgeList = l.map(x =>
s""""${addEdge(x.asInstanceOf[Vertex], s"__${this.getClass.getSimpleName}.${f.getName}", edges)}""""
)
if (l.head.isInstanceOf[Struct]) {
sb.append( s""", "${this.getClass.getSimpleName}.${f.getName}" : ${edgeList.mkString("[", ",", "]")}""")
}
case _ => sb.append( s""", "${f.getName}" : $convertedVal""")
sb.append( s""", "${this.getClass.getSimpleName}.${f.getName}" : $convertedVal""")
}
......@@ -77,13 +86,29 @@ object HiveTitanSample {
if (traits.isDefined) {
val fV = traits.get.map(_.getClass.getSimpleName).mkString(",")
sb.append( s""", "traitNames" : "$fV"""")
sb.append( s""", "${Constants.TRAIT_NAMES_PROPERTY_KEY}" : "$fV"""")
}
}
sb.append("}")
vertices += sb.toString()
}
def isPrimitiveType(ls: List[_]): Boolean = {
ls.head match {
case _: String => true
case _: Byte => true
case _: Short => true
case _: Int => true
case _: Long => true
case _: Float => true
case _: Double => true
case _: BigDecimal => true
case _: BigInt => true
case _: Boolean => true
case _ => false
}
}
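// Serialization split used by toGSon above: a list of primitives is written inline as a
// single vertex property, e.g. "Partition.values" : ["2015-01-01"], which is the shape the
// DSL list comparison in a where clause matches against; a list of Structs is written as
// edges, with the backing edge ids stored as an array on the owning vertex. (Note: the
// mkString quoting collapses a multi-element primitive list into one string, so it only
// round-trips cleanly for the single-element lists used in this sample.)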
}
trait Trait extends Vertex
......@@ -118,12 +143,22 @@ object HiveTitanSample {
case class ETL(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
case class DB(name: String, owner: String, createTime: Int, traits: Option[List[Trait]] = None,
case class DB(name: String, owner: String, createTime: Int, clusterName: String, traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class StorageDescriptor(inputFormat: String, outputFormat: String,
case class HiveOrder(col: String, order: Int,
_id: String = "" + nextVertexId.incrementAndGet()) extends Struct
case class StorageDescriptor(inputFormat: String, outputFormat: String,
sortCols: List[Struct], _id: String = "" + nextVertexId.incrementAndGet()) extends Struct {
override def toGSon(vertices: ArrayBuffer[String],
edges: ArrayBuffer[String]): Unit = {
sortCols.foreach(_.toGSon(vertices, edges))
super.toGSon(vertices, edges)
}
}
case class Column(name: String, dataType: String, sd: StorageDescriptor,
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
......@@ -133,7 +168,7 @@ object HiveTitanSample {
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class TableDef(name: String, db: DB, inputFormat: String, outputFormat: String,
case class TableDef(name: String, db: DB, sd: StorageDescriptor,
columns: List[(String, String, Option[List[Trait]])],
traits: Option[List[Trait]] = None,
created: Option[Date] = None) {
......@@ -141,7 +176,7 @@ object HiveTitanSample {
case Some(x) => x
case None => new Date(TestUtils.TEST_DATE_IN_LONG)
}
val sd = StorageDescriptor(inputFormat, outputFormat)
val colDefs = columns map { c =>
Column(c._1, c._2, sd, c._3)
}
......@@ -157,6 +192,9 @@ object HiveTitanSample {
}
}
case class Partition(values: List[String], table: Table, traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
case class LoadProcess(name: String, inputTables: List[Vertex],
outputTable: Vertex,
traits: Option[List[Trait]] = None,
......@@ -166,11 +204,11 @@ object HiveTitanSample {
traits: Option[List[Trait]] = None,
_id: String = "" + nextVertexId.incrementAndGet()) extends Instance
val salesDB = DB("Sales", "John ETL", 1000)
val salesDB = DB("Sales", "John ETL", 1000, "test")
val salesFact = TableDef("sales_fact",
salesDB,
"TextInputFormat",
"TextOutputFormat",
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("time_id", "int", None),
("product_id", "int", None),
......@@ -180,8 +218,8 @@ object HiveTitanSample {
))
val productDim = TableDef("product_dim",
salesDB,
"TextInputFormat",
"TextOutputFormat",
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("product_id", 0))),
List(
("product_id", "int", None),
("product_name", "string", None),
......@@ -190,8 +228,8 @@ object HiveTitanSample {
Some(List(Dimension())))
val timeDim = TableDef("time_dim",
salesDB,
"TextInputFormat",
"TextOutputFormat",
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("time_id", 0))),
List(
("time_id", "int", None),
("dayOfYear", "int", None),
......@@ -200,8 +238,8 @@ object HiveTitanSample {
Some(List(Dimension())))
val customerDim = TableDef("customer_dim",
salesDB,
"TextInputFormat",
"TextOutputFormat",
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("customer_id", "int", None),
("name", "int", None),
......@@ -209,11 +247,11 @@ object HiveTitanSample {
),
Some(List(Dimension())))
val reportingDB = DB("Reporting", "Jane BI", 1500)
val reportingDB = DB("Reporting", "Jane BI", 1500, "test")
val salesFactDaily = TableDef("sales_fact_daily_mv",
reportingDB,
"TextInputFormat",
"TextOutputFormat",
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("time_id", "int", None),
("product_id", "int", None),
......@@ -235,8 +273,8 @@ object HiveTitanSample {
val salesFactMonthly = TableDef("sales_fact_monthly_mv",
reportingDB,
"TextInputFormat",
"TextOutputFormat",
StorageDescriptor("TextInputFormat",
"TextOutputFormat", List(HiveOrder("customer_id", 0))),
List(
("time_id", "int", None),
("product_id", "int", None),
......@@ -247,6 +285,8 @@ object HiveTitanSample {
List(salesFactDaily.tablDef), salesFactMonthly.tablDef,
Some(List(ETL())))
val salesDailyPartition = Partition(List("2015-01-01"), salesFactDaily.tablDef)
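// Single-partition fixture backing the array-comparison tests in GremlinTest
// ("Partition as p where values = ['2015-01-01']").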
val vertices: ArrayBuffer[String] = new ArrayBuffer[String]()
val edges: ArrayBuffer[String] = new ArrayBuffer[String]()
......@@ -264,6 +304,7 @@ object HiveTitanSample {
customerDimView.toGSon(vertices, edges)
salesFactMonthly.toGSon(vertices, edges)
loadSalesFactMonthly.toGSon(vertices, edges)
salesDailyPartition.toGSon(vertices, edges)
def toGSon(): String = {
s"""{
......@@ -328,9 +369,7 @@ object HiveTitanSample {
object TestApp extends App with GraphUtils {
var conf = ConfigFactory.load()
conf = conf.getConfig("graphRepo")
val g: TitanGraph = titanGraph(conf)
val g: TitanGraph = TitanGraphProvider.getGraphInstance
val manager: ScriptEngineManager = new ScriptEngineManager
val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
val bindings: Bindings = engine.createBindings
......
......@@ -19,7 +19,10 @@
package org.apache.atlas.query
import com.thinkaurelius.titan.core.TitanGraph
import com.thinkaurelius.titan.core.util.TitanCleanup
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
import org.apache.atlas.query.Expressions._
import org.apache.atlas.repository.graph.{GraphBackedMetadataRepository, TitanGraphProvider}
import org.apache.atlas.typesystem.types.TypeSystem
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
......@@ -29,21 +32,31 @@ import org.scalatest.{Assertions, BeforeAndAfterAll, FunSuite}
class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinTest {
var g: TitanGraph = null
var gProvider: TitanGraphProvider = null;
var gp: GraphPersistenceStrategies = null;
override def beforeAll() {
TypeSystem.getInstance().reset()
QueryTestsUtils.setupTypes
g = QueryTestsUtils.setupTestGraph
gProvider = new TitanGraphProvider();
gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider))
g = QueryTestsUtils.setupTestGraph(gProvider)
}
override def afterAll() {
g.shutdown()
try {
TitanCleanup.clear(g);
} catch {
case ex: Exception =>
print("Could not clear the graph ", ex);
}
}
val PREFIX_SPACES_REGEX = ("\\n\\s*").r
test("testInputTables") {
val r = QueryProcessor.evaluate(_class("LoadProcess").field("inputTables"), g)
val r = QueryProcessor.evaluate(_class("LoadProcess").field("inputTables"), g, gp)
val x = r.toJson
validateJson(r,"""{
| "query":"LoadProcess inputTables",
......@@ -82,7 +95,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| {
| "name":"sd",
| "dataTypeName":"StorageDesc",
| "dataTypeName":"StorageDescriptor",
| "multiplicity":{
| "lower":1,
| "upper":1,
......@@ -117,7 +130,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -134,7 +147,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -156,7 +169,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -171,12 +184,12 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
}
test("testLoadProcessOut") {
val r = QueryProcessor.evaluate(_class("Table").field("LoadProcess").field("outputTable"), g)
val r = QueryProcessor.evaluate(_class("Table").field("LoadProcess").field("outputTable"), g, gp)
validateJson(r, null)
}
test("testLineageAll") {
val r = QueryProcessor.evaluate(_class("Table").loop(id("LoadProcess").field("outputTable")), g)
val r = QueryProcessor.evaluate(_class("Table").loop(id("LoadProcess").field("outputTable")), g, gp)
validateJson(r, """{
| "query":"Table as _loop0 loop (LoadProcess outputTable)",
| "dataType":{
......@@ -214,7 +227,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| {
| "name":"sd",
| "dataTypeName":"StorageDesc",
| "dataTypeName":"StorageDescriptor",
| "multiplicity":{
| "lower":1,
| "upper":1,
......@@ -244,13 +257,12 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| {
| "$typeName$":"Table",
| "$id$":{
| "id":"9216",
| "$typeName$":"Table",
| "version":0
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -267,7 +279,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -284,7 +296,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -301,7 +313,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -318,7 +330,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -333,7 +345,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
test("testLineageAllSelect") {
val r = QueryProcessor.evaluate(_class("Table").as("src").loop(id("LoadProcess").field("outputTable")).as("dest").
select(id("src").field("name").as("srcTable"), id("dest").field("name").as("destTable")), g)
select(id("src").field("name").as("srcTable"), id("dest").field("name").as("destTable")), g, gp)
validateJson(r, """{
"query":"Table as src loop (LoadProcess outputTable) as dest select src.name as srcTable, dest.name as destTable",
"dataType":{
......@@ -398,7 +410,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
}
test("testLineageFixedDepth") {
val r = QueryProcessor.evaluate(_class("Table").loop(id("LoadProcess").field("outputTable"), int(1)), g)
val r = QueryProcessor.evaluate(_class("Table").loop(id("LoadProcess").field("outputTable"), int(1)), g, gp)
validateJson(r, """{
| "query":"Table as _loop0 loop (LoadProcess outputTable) times 1",
| "dataType":{
......@@ -436,7 +448,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| {
| "name":"sd",
| "dataTypeName":"StorageDesc",
| "dataTypeName":"StorageDescriptor",
| "multiplicity":{
| "lower":1,
| "upper":1,
......@@ -471,7 +483,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -488,7 +500,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......@@ -505,7 +517,7 @@ class LineageQueryTest extends FunSuite with BeforeAndAfterAll with BaseGremlinT
| },
| "created":"2014-12-11T02:35:58.440Z",
| "sd":{
| "$typeName$":"StorageDesc",
| "$typeName$":"StorageDescriptor",
| "version":0
| },
| "db":{
......
......@@ -27,9 +27,7 @@ class ParserTest extends BaseTest {
@Before
override def setup {
super.setup
QueryTestsUtils.setupTypes
}
@Test def testFrom: Unit = {
......@@ -98,4 +96,13 @@ class ParserTest extends BaseTest {
)
}
@Test def testList: Unit = {
val p = new QueryParser
println(p(
"Partition as p where values = ['2015-01-01']," +
" table where name = 'tableoq8ty'," +
" db where name = 'default' and clusterName = 'test'").right.get.toString
)
}
}
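The testList case above drives the new list-literal grammar end to end. QueryParser.apply returns an Either (the test reaches in with `.right.get`), so exploratory code may prefer a pattern match; a minimal sketch, using only the parser API visible above:

val p = new QueryParser
// Hedged sketch: consume the parse result without .right.get
p("Partition as p where values = ['2015-01-01'] select p") match {
  case Right(expr) => println(expr)                  // parsed Expression tree
  case Left(err)   => println(s"parse failed: $err")
}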
......@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList
import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}
import com.tinkerpop.blueprints.Vertex
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.atlas.repository.graph.TitanGraphProvider
import org.apache.atlas.typesystem.types._
import org.apache.commons.configuration.{Configuration, ConfigurationException, MapConfiguration}
import org.apache.commons.io.FileUtils
......@@ -49,9 +50,9 @@ trait GraphUtils {
}
def titanGraph(conf: Config) = {
def titanGraph(conf: Configuration) = {
try {
val g = TitanFactory.open(getConfiguration(conf))
val g = TitanFactory.open(conf)
val mgmt = g.getManagementSystem
val typname = mgmt.makePropertyKey("typeName").dataType(classOf[String]).make()
mgmt.buildIndex("byTypeName", classOf[Vertex]).addKey(typname).buildCompositeIndex()
......@@ -79,30 +80,44 @@ object QueryTestsUtils extends GraphUtils {
Array(
attrDef("name", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.INT_TYPE)
attrDef("createTime", DataTypes.INT_TYPE),
attrDef("clusterName", DataTypes.STRING_TYPE)
))
def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDesc", null,
def hiveOrderDef = new StructTypeDefinition("HiveOrder",
Array(
attrDef("col", DataTypes.STRING_TYPE),
attrDef("order", DataTypes.INT_TYPE)
))
def storageDescClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "StorageDescriptor", null,
Array(
attrDef("inputFormat", DataTypes.STRING_TYPE),
attrDef("outputFormat", DataTypes.STRING_TYPE)
attrDef("outputFormat", DataTypes.STRING_TYPE),
new AttributeDefinition("sortCols", DataTypes.arrayTypeName("HiveOrder"), Multiplicity.REQUIRED, false, null)
))
def columnClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Column", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
attrDef("dataType", DataTypes.STRING_TYPE),
new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null)
new AttributeDefinition("sd", "StorageDescriptor", Multiplicity.REQUIRED, false, null)
))
def tblClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Table", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("db", "DB", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", "StorageDesc", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", "StorageDescriptor", Multiplicity.REQUIRED, false, null),
attrDef("created", DataTypes.DATE_TYPE)
))
def partitionClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "Partition", null,
Array(
new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("table", "Table", Multiplicity.REQUIRED, false, null)
))
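With `values` declared as array<string> and `table` a required reference, this Partition type is exactly the shape the new list comparison targets. A hedged end-to-end sketch, assuming the `g` and `gp` wiring from LineageQueryTest's beforeAll:

// Parse the list-comparison DSL, then evaluate it against the test graph;
// QueryProcessor.evaluate(expr, g, gp) and r.toJson appear in the tests above.
val parsed = new QueryParser()("Partition as p where values = ['2015-01-01'] select p")
parsed.right.foreach { expr =>
  val r = QueryProcessor.evaluate(expr, g, gp)
  println(r.toJson)
}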
def loadProcessClsDef = new HierarchicalTypeDefinition[ClassType](classOf[ClassType], "LoadProcess", null,
Array(
attrDef("name", DataTypes.STRING_TYPE),
......@@ -128,19 +143,17 @@ object QueryTestsUtils extends GraphUtils {
Array[AttributeDefinition]())
TypeSystem.getInstance().defineTypes(ImmutableList.of[EnumTypeDefinition],
ImmutableList.of[StructTypeDefinition],
ImmutableList.of[StructTypeDefinition](hiveOrderDef),
ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
metricTraitDef, etlTraitDef, jdbcTraitDef),
ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
loadProcessClsDef, viewClsDef))
partitionClsDef, loadProcessClsDef, viewClsDef))
()
}
def setupTestGraph: TitanGraph = {
var conf = ConfigFactory.load()
conf = conf.getConfig("graphRepo")
val g = titanGraph(conf)
def setupTestGraph(gp: TitanGraphProvider): TitanGraph = {
val g = gp.get
val manager: ScriptEngineManager = new ScriptEngineManager
val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
val bindings: Bindings = engine.createBindings
......
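setupTestGraph now takes the provider instead of loading its own config. The full wiring, as assembled in LineageQueryTest's beforeAll above, reduces to this sketch:

// The provider supplies the TitanGraph; the persistence strategy wraps a
// repository built over the same provider, so query evaluation and storage agree.
val gProvider = new TitanGraphProvider()
val gp = new DefaultGraphPersistenceStrategy(
  new GraphBackedMetadataRepository(gProvider))
val g: TitanGraph = QueryTestsUtils.setupTestGraph(gProvider)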
......@@ -48,9 +48,9 @@ public class DataTypes {
public static BigDecimalType BIGDECIMAL_TYPE = new BigDecimalType();
public static DateType DATE_TYPE = new DateType();
public static StringType STRING_TYPE = new StringType();
static String ARRAY_TYPE_PREFIX = "array<";
public static String ARRAY_TYPE_PREFIX = "array<";
static String ARRAY_TYPE_SUFFIX = ">";
static String MAP_TYPE_PREFIX = "map<";
public static String MAP_TYPE_PREFIX = "map<";
static String MAP_TYPE_SUFFIX = ">";
public static String arrayTypeName(String elemTypeName) {
......
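Widening ARRAY_TYPE_PREFIX and MAP_TYPE_PREFIX to public lets the Scala test utilities build collection type names directly. Assuming arrayTypeName simply brackets the element name with the prefix and suffix constants (its body is elided above), usage reduces to:

// Hedged examples; the expected strings assume plain prefix + name + suffix concatenation.
DataTypes.arrayTypeName("HiveOrder")                   // "array<HiveOrder>"
DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName) // "array<string>"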