Commit 6369ab28 by Shwetha GS

added hive lineage test

parent 0dd414f2
......@@ -195,8 +195,8 @@ public class HiveMetaStoreBridge {
}
}
private String getTableName(String dbName, String tableName) {
return String.format("%s/%s.%s", clusterName, dbName.toLowerCase(), tableName.toLowerCase());
public static String getTableName(String clusterName, String dbName, String tableName) {
return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName);
}
/**
......@@ -211,7 +211,7 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for table {}.{}", dbName, tableName);
String typeName = HiveDataTypes.HIVE_TABLE.getName();
String entityName = getTableName(dbName, tableName);
String entityName = getTableName(clusterName, dbName, tableName);
String dslQuery = String.format("%s as t where name = '%s'", typeName, entityName);
return getEntityReferenceFromDSL(typeName, dslQuery);
}
......@@ -239,7 +239,7 @@ public class HiveMetaStoreBridge {
// dbName, clusterName);
String datasetType = MetadataServiceClient.DATA_SET_SUPER_TYPE;
String tableEntityName = getTableName(dbName, tableName);
String tableEntityName = getTableName(clusterName, dbName, tableName);
String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+ "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
......@@ -274,7 +274,8 @@ public class HiveMetaStoreBridge {
Table hiveTable = hiveClient.getTable(dbName, tableName);
tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
tableRef.set(HiveDataModelGenerator.NAME, getTableName(hiveTable.getDbName(), hiveTable.getTableName()));
tableRef.set(HiveDataModelGenerator.NAME,
getTableName(clusterName, hiveTable.getDbName(), hiveTable.getTableName()));
tableRef.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
tableRef.set("owner", hiveTable.getOwner());
......@@ -506,6 +507,6 @@ public class HiveMetaStoreBridge {
client.updateEntity(tableReferenceable.getId()._getId(), HiveDataModelGenerator.TABLE_NAME,
newTable.getTableName().toLowerCase());
client.updateEntity(tableReferenceable.getId()._getId(), HiveDataModelGenerator.NAME,
getTableName(newTable.getDbName(), newTable.getTableName()));
getTableName(clusterName, newTable.getDbName(), newTable.getTableName()));
}
}
......@@ -141,6 +141,9 @@ public class HiveHookIT {
Referenceable tableRef = dgiCLient.getEntity(tableId);
Assert.assertEquals(tableRef.get("tableType"), TableType.MANAGED_TABLE.name());
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableName(CLUSTER_NAME, DEFAULT_DB, tableName);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName);
final Id sdId = (Id) tableRef.get("sd");
Referenceable sdRef = dgiCLient.getEntity(sdId.id);
Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS),false);
......@@ -349,20 +352,28 @@ public class HiveHookIT {
}
}
@Test(enabled = false)
@Test
public void testLineage() throws Exception {
String table1 = createTable(false);
String db2 = createDatabase();
String table2 = tableName();
String db3 = createDatabase();
String table3 = tableName();
String query = String.format("create table %s.%s as select * from %s", db2, table2, table1);
runCommand(query);
query = String.format("create table %s.%s as select * from %s.%s", db3, table3, db2, table2);
runCommand(query);
String table1Id = assertTableIsRegistered(DEFAULT_DB, table1);
String table2Id = assertTableIsRegistered(db2, table2);
String datasetName = HiveMetaStoreBridge.getTableName(CLUSTER_NAME, db2, table2);
JSONObject response = dgiCLient.getInputGraph(datasetName);
JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
Assert.assertTrue(vertices.has(table1Id));
Assert.assertTrue(vertices.has(table2Id));
datasetName = HiveMetaStoreBridge.getTableName(CLUSTER_NAME, DEFAULT_DB, table1);
response = dgiCLient.getOutputGraph(datasetName);
vertices = response.getJSONObject("values").getJSONObject("vertices");
Assert.assertTrue(vertices.has(table1Id));
Assert.assertTrue(vertices.has(table2Id));
}
}
......@@ -64,7 +64,7 @@ public class MetadataServiceClient {
public static final String URI_ENTITIES = "entities";
public static final String URI_TRAITS = "traits";
public static final String URI_SEARCH = "discovery/search";
public static final String URI_LINEAGE = "lineage/hive";
public static final String URI_LINEAGE = "lineage/hive/table";
public static final String QUERY = "query";
public static final String QUERY_TYPE = "queryType";
......@@ -127,9 +127,12 @@ public class MetadataServiceClient {
SEARCH(BASE_URI + URI_SEARCH, HttpMethod.GET),
SEARCH_DSL(BASE_URI + URI_SEARCH + "/dsl", HttpMethod.GET),
SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET),
SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET);
SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET),
//Lineage operations
LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET),
LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET),
LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET);
private final String method;
private final String path;
......@@ -148,6 +151,16 @@ public class MetadataServiceClient {
}
}
/**
* Register the given type(meta model)
* @param typeAsJson type definition a jaon
* @return result json object
* @throws MetadataServiceException
*/
public JSONObject createType(String typeAsJson) throws MetadataServiceException {
return callAPI(API.CREATE_TYPE, typeAsJson);
}
public List<String> listTypes() throws MetadataServiceException {
try {
final JSONObject jsonObject = callAPI(API.LIST_TYPES, null);
......@@ -179,16 +192,6 @@ public class MetadataServiceClient {
}
/**
* Register the given type(meta model)
* @param typeAsJson type definition a jaon
* @return result json object
* @throws MetadataServiceException
*/
public JSONObject createType(String typeAsJson) throws MetadataServiceException {
return callAPI(API.CREATE_TYPE, typeAsJson);
}
/**
* Create the given entity
* @param entityAsJson entity(type instance) as json
* @return result json object
......@@ -294,6 +297,24 @@ public class MetadataServiceClient {
return callAPIWithResource(API.SEARCH_FULL_TEXT, resource);
}
public JSONObject getInputGraph(String datasetName) throws MetadataServiceException {
JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
try {
return response.getJSONObject(MetadataServiceClient.RESULTS);
} catch (JSONException e) {
throw new MetadataServiceException(e);
}
}
public JSONObject getOutputGraph(String datasetName) throws MetadataServiceException {
JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
try {
return response.getJSONObject(MetadataServiceClient.RESULTS);
} catch (JSONException e) {
throw new MetadataServiceException(e);
}
}
public String getRequestId(JSONObject json) throws MetadataServiceException {
try {
return json.getString(REQUEST_ID);
......
......@@ -29,7 +29,7 @@ public interface ITypeStore {
* @param typeSystem type system to persist
* @throws StorageException
*/
public void store(TypeSystem typeSystem) throws MetadataException;
void store(TypeSystem typeSystem) throws MetadataException;
/**
* Persist the given type in the type system - insert or update
......@@ -37,12 +37,12 @@ public interface ITypeStore {
* @param types types to persist
* @throws StorageException
*/
public void store(TypeSystem typeSystem, ImmutableList<String> types) throws MetadataException;
void store(TypeSystem typeSystem, ImmutableList<String> types) throws MetadataException;
/**
* Restore all type definitions
* @return List of persisted type definitions
* @throws org.apache.hadoop.metadata.MetadataException
*/
public TypesDef restore() throws MetadataException;
TypesDef restore() throws MetadataException;
}
......@@ -40,11 +40,14 @@ import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.EnumTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.TypeUtils;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
......@@ -114,32 +117,28 @@ public class DefaultMetadataService implements MetadataService {
return; // this is already registered
}
Map<String, IDataType> superTypes = new HashMap();
HierarchicalTypeDefinition<ClassType> superTypeDefinition =
HierarchicalTypeDefinition<ClassType> infraType =
TypesUtil.createClassTypeDef(MetadataServiceClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableList.<String>of(), NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
superTypes.put(MetadataServiceClient.INFRASTRUCTURE_SUPER_TYPE, typeSystem.defineClassType
(superTypeDefinition));
superTypeDefinition =
TypesUtil.createClassTypeDef(MetadataServiceClient.DATA_SET_SUPER_TYPE,
ImmutableList.<String>of(),
HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
.createClassTypeDef(MetadataServiceClient.DATA_SET_SUPER_TYPE, ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
superTypes.put(MetadataServiceClient.DATA_SET_SUPER_TYPE, typeSystem.defineClassType(superTypeDefinition));
superTypeDefinition =
TypesUtil.createClassTypeDef(MetadataServiceClient.PROCESS_SUPER_TYPE,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE,
new AttributeDefinition("inputs",
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(MetadataServiceClient.PROCESS_SUPER_TYPE, ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE, new AttributeDefinition("inputs",
DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
new Multiplicity(0, Integer.MAX_VALUE, false), false, null),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("outputs",
DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
new Multiplicity(0, Integer.MAX_VALUE, false), false, null)
);
superTypes.put(MetadataServiceClient.PROCESS_SUPER_TYPE, typeSystem.defineClassType(superTypeDefinition));
onTypesAddedToRepo(superTypes);
Multiplicity.OPTIONAL, false, null));
TypesDef typesDef = TypeUtils
.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(infraType, datasetType, processType));
createType(TypesSerialization.toJson(typesDef));
}
/**
......
......@@ -20,18 +20,18 @@ package org.apache.hadoop.metadata.query
import com.thinkaurelius.titan.core.TitanGraph
import org.apache.hadoop.metadata.query.Expressions._
import org.slf4j.{LoggerFactory, Logger}
object QueryProcessor {
val LOG : Logger = LoggerFactory.getLogger("org.apache.hadoop.metadata.query.QueryProcessor")
def evaluate(e: Expression, g: TitanGraph, gP : GraphPersistenceStrategies = GraphPersistenceStrategy1):
GremlinQueryResult = {
val e1 = validate(e)
val q = new GremlinTranslator(e1, gP).translate()
// println("---------------------")
// println("Query: " + e1)
// println("Expression Tree:\n" + e1.treeString)
// println("Gremlin Query: " + q.queryStr)
// println("---------------------")
LOG.debug("Query: " + e1)
LOG.debug("Expression Tree:\n" + e1.treeString)
LOG.debug("Gremlin Query: " + q.queryStr)
new GremlinEvaluator(q, gP, g).evaluate()
}
......
......@@ -99,13 +99,17 @@ public class GraphBackedTypeStoreTest {
List<StructTypeDefinition> structTypes = types.structTypesAsJavaList();
Assert.assertEquals(1, structTypes.size());
boolean clsTypeFound = false;
List<HierarchicalTypeDefinition<ClassType>> classTypes = types.classTypesAsJavaList();
Assert.assertEquals(3, classTypes.size());
for (HierarchicalTypeDefinition<ClassType> classType : classTypes) {
ClassType expectedType = ts.getDataType(ClassType.class, classType.typeName);
Assert.assertEquals(expectedType.immediateAttrs.size(), classType.attributeDefinitions.length);
Assert.assertEquals(expectedType.superTypes.size(), classType.superTypes.size());
if (classType.typeName.equals("Manager")) {
ClassType expectedType = ts.getDataType(ClassType.class, classType.typeName);
Assert.assertEquals(expectedType.immediateAttrs.size(), classType.attributeDefinitions.length);
Assert.assertEquals(expectedType.superTypes.size(), classType.superTypes.size());
clsTypeFound = true;
}
}
Assert.assertTrue("Manager type not restored", clsTypeFound);
//validate trait
List<HierarchicalTypeDefinition<TraitType>> traitTypes = types.traitTypesAsJavaList();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment