Commit b65dd91c by Shwetha GS

ATLAS-713 Entity lineage based on entity id (shwethags)

parent 857561a3
...@@ -90,7 +90,8 @@ public class AtlasClient { ...@@ -90,7 +90,8 @@ public class AtlasClient {
public static final String URI_ENTITY = "entities"; public static final String URI_ENTITY = "entities";
public static final String URI_ENTITY_AUDIT = "audit"; public static final String URI_ENTITY_AUDIT = "audit";
public static final String URI_SEARCH = "discovery/search"; public static final String URI_SEARCH = "discovery/search";
public static final String URI_LINEAGE = "lineage/hive/table"; public static final String URI_NAME_LINEAGE = "lineage/hive/table";
public static final String URI_LINEAGE = "lineage/";
public static final String URI_TRAITS = "traits"; public static final String URI_TRAITS = "traits";
public static final String QUERY = "query"; public static final String QUERY = "query";
...@@ -416,7 +417,12 @@ public class AtlasClient { ...@@ -416,7 +417,12 @@ public class AtlasClient {
SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK), SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK),
SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK), SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK),
//Lineage operations //Lineage operations based on dataset name
NAME_LINEAGE_INPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
NAME_LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
NAME_LINEAGE_SCHEMA(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
//Lineage operations based on entity id of the dataset
LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK), LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK), LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK); LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK);
...@@ -988,7 +994,7 @@ public class AtlasClient { ...@@ -988,7 +994,7 @@ public class AtlasClient {
} }
public JSONObject getInputGraph(String datasetName) throws AtlasServiceException { public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph"); JSONObject response = callAPI(API.NAME_LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
try { try {
return response.getJSONObject(AtlasClient.RESULTS); return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) { } catch (JSONException e) {
...@@ -997,7 +1003,34 @@ public class AtlasClient { ...@@ -997,7 +1003,34 @@ public class AtlasClient {
} }
public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException { public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph"); JSONObject response = callAPI(API.NAME_LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
throw new AtlasServiceException(e);
}
}
public JSONObject getInputGraphForEntity(String entityId) throws AtlasServiceException {
JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, entityId, "/inputs/graph");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
throw new AtlasServiceException(e);
}
}
public JSONObject getOutputGraphForEntity(String datasetId) throws AtlasServiceException {
JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/outputs/graph");
try {
return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) {
throw new AtlasServiceException(e);
}
}
public JSONObject getSchemaForEntity(String datasetId) throws AtlasServiceException {
JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/schema");
try { try {
return response.getJSONObject(AtlasClient.RESULTS); return response.getJSONObject(AtlasClient.RESULTS);
} catch (JSONException e) { } catch (JSONException e) {
......
...@@ -23,7 +23,7 @@ define(['require', ...@@ -23,7 +23,7 @@ define(['require',
'use strict'; 'use strict';
var VLineage = VBaseModel.extend({ var VLineage = VBaseModel.extend({
urlRoot: Globals.baseURL + 'api/atlas/lineage/hive/table/assetName/outputs/graph', urlRoot: Globals.baseURL + 'api/atlas/lineage/assetName/outputs/graph',
defaults: {}, defaults: {},
...@@ -36,7 +36,7 @@ define(['require', ...@@ -36,7 +36,7 @@ define(['require',
this.bindErrorEvents(); this.bindErrorEvents();
}, },
toString: function() { toString: function() {
return this.get('name'); return this.get('id');
}, },
}, {}); }, {});
return VLineage; return VLineage;
......
...@@ -22,7 +22,7 @@ define(['require', ...@@ -22,7 +22,7 @@ define(['require',
], function(require, Globals, VBaseModel) { ], function(require, Globals, VBaseModel) {
'use strict'; 'use strict';
var VSchema = VBaseModel.extend({ var VSchema = VBaseModel.extend({
urlRoot: Globals.baseURL + '/api/atlas/lineage/hive/table/log_fact_daily_mv/schema', urlRoot: Globals.baseURL + '/api/atlas/lineage/log_fact_daily_mv/schema',
defaults: {}, defaults: {},
...@@ -35,7 +35,7 @@ define(['require', ...@@ -35,7 +35,7 @@ define(['require',
this.bindErrorEvents(); this.bindErrorEvents();
}, },
toString: function() { toString: function() {
return this.get('name'); return this.get('id');
}, },
}, {}); }, {});
return VSchema; return VSchema;
......
...@@ -92,7 +92,7 @@ define(['require', ...@@ -92,7 +92,7 @@ define(['require',
this.renderEntityDetailTableLayoutView(); this.renderEntityDetailTableLayoutView();
this.renderTagTableLayoutView(tagGuid); this.renderTagTableLayoutView(tagGuid);
this.renderLineageLayoutView(tagGuid); this.renderLineageLayoutView(tagGuid);
this.renderSchemaLayoutView(); this.renderSchemaLayoutView(tagGuid);
}, this); }, this);
}, },
onRender: function() {}, onRender: function() {},
...@@ -120,17 +120,17 @@ define(['require', ...@@ -120,17 +120,17 @@ define(['require',
require(['views/graph/LineageLayoutView'], function(LineageLayoutView) { require(['views/graph/LineageLayoutView'], function(LineageLayoutView) {
that.RLineageLayoutView.show(new LineageLayoutView({ that.RLineageLayoutView.show(new LineageLayoutView({
globalVent: that.globalVent, globalVent: that.globalVent,
assetName: that.name, assetName: tagGuid,
guid: tagGuid guid: tagGuid
})); }));
}); });
}, },
renderSchemaLayoutView: function() { renderSchemaLayoutView: function(tagGuid) {
var that = this; var that = this;
require(['views/schema/SchemaLayoutView'], function(SchemaLayoutView) { require(['views/schema/SchemaLayoutView'], function(SchemaLayoutView) {
that.RSchemaTableLayoutView.show(new SchemaLayoutView({ that.RSchemaTableLayoutView.show(new SchemaLayoutView({
globalVent: that.globalVent, globalVent: that.globalVent,
name: that.name, name: tagGuid,
vent: that.vent vent: that.vent
})); }));
}); });
......
...@@ -56,8 +56,8 @@ define(['require', ...@@ -56,8 +56,8 @@ define(['require',
this.inputCollection = new VLineageList(); this.inputCollection = new VLineageList();
this.outputCollection = new VLineageList(); this.outputCollection = new VLineageList();
this.entityModel = new VEntity(); this.entityModel = new VEntity();
this.inputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/inputs/graph"; this.inputCollection.url = "/api/atlas/lineage/" + this.assetName + "/inputs/graph";
this.outputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/outputs/graph"; this.outputCollection.url = "/api/atlas/lineage/" + this.assetName + "/outputs/graph";
this.bindEvents(); this.bindEvents();
this.fetchGraphData(); this.fetchGraphData();
this.data = {}; this.data = {};
......
...@@ -73,7 +73,7 @@ define(['require', ...@@ -73,7 +73,7 @@ define(['require',
initialize: function(options) { initialize: function(options) {
_.extend(this, _.pick(options, 'globalVent', 'name', 'vent')); _.extend(this, _.pick(options, 'globalVent', 'name', 'vent'));
this.schemaCollection = new VSchemaList([], {}); this.schemaCollection = new VSchemaList([], {});
this.schemaCollection.url = "/api/atlas/lineage/hive/table/" + this.name + "/schema"; this.schemaCollection.url = "/api/atlas/lineage/" + this.name + "/schema";
this.commonTableOptions = { this.commonTableOptions = {
collection: this.schemaCollection, collection: this.schemaCollection,
includeFilter: false, includeFilter: false,
......
...@@ -63,15 +63,9 @@ atlas.kafka.auto.commit.enable=false ...@@ -63,15 +63,9 @@ atlas.kafka.auto.commit.enable=false
######### Hive Lineage Configs ######### ######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
## Schema ## Schema
atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
atlas.lineage.hive.table.schema.query.Table=Table where name='%s'\, columns atlas.lineage.schema.query.Table=Table where __guid='%s'\, columns
## Server port configuration ## Server port configuration
#atlas.server.http.port=21000 #atlas.server.http.port=21000
......
...@@ -21,6 +21,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -21,6 +21,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-713 Entity lineage based on entity id (shwethags)
ATLAS-736 UI - BUG :: displaying timestamp values for hive_db description (kevalbhatt18 via yhemanth) ATLAS-736 UI - BUG :: displaying timestamp values for hive_db description (kevalbhatt18 via yhemanth)
ATLAS-784 Configure config.store.uri for Falcon hook IT (yhemanth) ATLAS-784 Configure config.store.uri for Falcon hook IT (yhemanth)
ATLAS-645 FieldMapping.output() results in stack overflow when instances reference each other (dkantor via shwethags) ATLAS-645 FieldMapping.output() results in stack overflow when instances reference each other (dkantor via shwethags)
......
...@@ -26,7 +26,7 @@ import com.google.inject.throwingproviders.ThrowingProviderBinder; ...@@ -26,7 +26,7 @@ import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DiscoveryService; import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.HiveLineageService; import org.apache.atlas.discovery.DataSetLineageService;
import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
...@@ -83,7 +83,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { ...@@ -83,7 +83,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the DiscoveryService interface to an implementation // bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton(); bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
bindAuditRepository(binder()); bindAuditRepository(binder());
......
...@@ -20,19 +20,19 @@ package org.apache.atlas.discovery; ...@@ -20,19 +20,19 @@ package org.apache.atlas.discovery;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.GraphTransaction; import org.apache.atlas.GraphTransaction;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.query.Expressions;
import org.apache.atlas.query.GremlinQueryResult; import org.apache.atlas.query.GremlinQueryResult;
import org.apache.atlas.query.HiveLineageQuery; import org.apache.atlas.query.InputLineageClosureQuery;
import org.apache.atlas.query.HiveWhereUsedQuery; import org.apache.atlas.query.OutputLineageClosureQuery;
import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance; import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -47,35 +47,29 @@ import javax.inject.Singleton; ...@@ -47,35 +47,29 @@ import javax.inject.Singleton;
* Hive implementation of Lineage service interface. * Hive implementation of Lineage service interface.
*/ */
@Singleton @Singleton
public class HiveLineageService implements LineageService { public class DataSetLineageService implements LineageService {
private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class); private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageService.class);
private static final Option<List<String>> SELECT_ATTRIBUTES = private static final Option<List<String>> SELECT_ATTRIBUTES =
Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"})); Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
public static final String SELECT_INSTANCE_GUID = "__guid";
public static final String HIVE_TABLE_SCHEMA_QUERY_PREFIX = "atlas.lineage.hive.table.schema.query."; public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query.";
private static final String HIVE_TABLE_TYPE_NAME; private static final String HIVE_PROCESS_TYPE_NAME = "Process";
private static final String HIVE_PROCESS_TYPE_NAME; private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME; private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
private static final String HIVE_TABLE_EXISTS_QUERY; private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
private static final String DATASET_NAME_EXISTS_QUERY =
AtlasClient.DATA_SET_SUPER_TYPE + " where name = '%s' and __state = 'ACTIVE'";
private static final Configuration propertiesConf; private static final Configuration propertiesConf;
static { static {
// todo - externalize this using type system - dog food
try { try {
propertiesConf = ApplicationProperties.get(); propertiesConf = ApplicationProperties.get();
HIVE_TABLE_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.table.type.name", "DataSet");
HIVE_PROCESS_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.process.type.name", "Process");
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.inputs.name", "inputs");
HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.outputs.name", "outputs");
HIVE_TABLE_EXISTS_QUERY = propertiesConf.getString("atlas.lineage.hive.table.exists.query",
"from " + HIVE_TABLE_TYPE_NAME + " where name=\"%s\"");
} catch (AtlasException e) { } catch (AtlasException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
...@@ -87,136 +81,135 @@ public class HiveLineageService implements LineageService { ...@@ -87,136 +81,135 @@ public class HiveLineageService implements LineageService {
private final GraphBackedDiscoveryService discoveryService; private final GraphBackedDiscoveryService discoveryService;
@Inject @Inject
HiveLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository, DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
GraphBackedDiscoveryService discoveryService) throws DiscoveryException { GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
this.titanGraph = graphProvider.get(); this.titanGraph = graphProvider.get();
this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
this.discoveryService = discoveryService; this.discoveryService = discoveryService;
} }
/** /**
* Return the lineage outputs for the given tableName. * Return the lineage outputs graph for the given datasetName.
* *
* @param tableName tableName * @param datasetName datasetName
* @return Lineage Outputs as JSON * @return Outputs Graph as JSON
*/ */
@Override @Override
@GraphTransaction @GraphTransaction
public String getOutputs(String tableName) throws AtlasException { public String getOutputsGraph(String datasetName) throws AtlasException {
LOG.info("Fetching lineage outputs for tableName={}", tableName); LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
ParamChecker.notEmpty(tableName, "table name cannot be null"); ParamChecker.notEmpty(datasetName, "dataset name");
validateTableExists(tableName); ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
return getOutputsGraphForId(datasetInstance.getId()._getId());
HiveWhereUsedQuery outputsQuery =
new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
Expressions.Expression expression = outputsQuery.expr();
LOG.debug("Expression is [" + expression.toString() + "]");
try {
return discoveryService.evaluate(expression).toJson();
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
}
} }
/** /**
* Return the lineage outputs graph for the given tableName. * Return the lineage inputs graph for the given tableName.
* *
* @param tableName tableName * @param tableName tableName
* @return Outputs Graph as JSON * @return Inputs Graph as JSON
*/ */
@Override @Override
@GraphTransaction @GraphTransaction
public String getOutputsGraph(String tableName) throws AtlasException { public String getInputsGraph(String tableName) throws AtlasException {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName); LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
ParamChecker.notEmpty(tableName, "table name cannot be null"); ParamChecker.notEmpty(tableName, "table name");
validateTableExists(tableName); ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName);
return getInputsGraphForId(datasetInstance.getId()._getId());
HiveWhereUsedQuery outputsQuery =
new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
return outputsQuery.graph().toInstanceJson();
} }
/**
* Return the lineage inputs for the given tableName.
*
* @param tableName tableName
* @return Lineage Inputs as JSON
*/
@Override @Override
@GraphTransaction public String getInputsGraphForEntity(String guid) throws AtlasException {
public String getInputs(String tableName) throws AtlasException { LOG.info("Fetching lineage inputs graph for entity={}", guid);
LOG.info("Fetching lineage inputs for tableName={}", tableName); ParamChecker.notEmpty(guid, "Entity id");
ParamChecker.notEmpty(tableName, "table name cannot be null"); validateDatasetExists(guid);
validateTableExists(tableName); return getInputsGraphForId(guid);
}
HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, private String getInputsGraphForId(String guid) {
InputLineageClosureQuery
inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID,
guid, HIVE_PROCESS_TYPE_NAME,
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
return inputsQuery.graph().toInstanceJson();
Expressions.Expression expression = inputsQuery.expr();
LOG.debug("Expression is [" + expression.toString() + "]");
try {
return discoveryService.evaluate(expression).toJson();
} catch (Exception e) { // unable to catch ExpressionException
throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
}
} }
/**
* Return the lineage inputs graph for the given tableName.
*
* @param tableName tableName
* @return Inputs Graph as JSON
*/
@Override @Override
@GraphTransaction public String getOutputsGraphForEntity(String guid) throws AtlasException {
public String getInputsGraph(String tableName) throws AtlasException { LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
LOG.info("Fetching lineage inputs graph for tableName={}", tableName); ParamChecker.notEmpty(guid, "Entity id");
ParamChecker.notEmpty(tableName, "table name cannot be null"); validateDatasetExists(guid);
validateTableExists(tableName); return getOutputsGraphForId(guid);
}
HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME, private String getOutputsGraphForId(String guid) {
HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), OutputLineageClosureQuery outputsQuery =
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME,
return inputsQuery.graph().toInstanceJson(); HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
return outputsQuery.graph().toInstanceJson();
} }
/** /**
* Return the schema for the given tableName. * Return the schema for the given tableName.
* *
* @param tableName tableName * @param datasetName tableName
* @return Schema as JSON * @return Schema as JSON
*/ */
@Override @Override
@GraphTransaction @GraphTransaction
public String getSchema(String tableName) throws AtlasException { public String getSchema(String datasetName) throws AtlasException {
LOG.info("Fetching schema for tableName={}", tableName); ParamChecker.notEmpty(datasetName, "table name");
ParamChecker.notEmpty(tableName, "table name cannot be null"); LOG.info("Fetching schema for tableName={}", datasetName);
String typeName = validateTableExists(tableName); ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
}
private String getSchemaForId(String typeName, String guid) throws DiscoveryException {
final String schemaQuery = final String schemaQuery =
String.format(propertiesConf.getString(HIVE_TABLE_SCHEMA_QUERY_PREFIX + typeName), tableName); String.format(propertiesConf.getString(DATASET_SCHEMA_QUERY_PREFIX + typeName), guid);
return discoveryService.searchByDSL(schemaQuery); return discoveryService.searchByDSL(schemaQuery);
} }
@Override
public String getSchemaForEntity(String guid) throws AtlasException {
ParamChecker.notEmpty(guid, "Entity id");
LOG.info("Fetching schema for entity guid={}", guid);
String typeName = validateDatasetExists(guid);
return getSchemaForId(typeName, guid);
}
/** /**
* Validate if indeed this is a table type and exists. * Validate if indeed this is a table type and exists.
* *
* @param tableName table name * @param datasetName table name
*/ */
private String validateTableExists(String tableName) throws AtlasException { private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName); final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery); GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
if (!(queryResult.rows().length() > 0)) { if (!(queryResult.rows().length() > 0)) {
throw new EntityNotFoundException(tableName + " does not exist"); throw new EntityNotFoundException(datasetName + " does not exist");
}
return (ReferenceableInstance)queryResult.rows().apply(0);
}
/**
* Validate if indeed this is a table type and exists.
*
* @param guid entity id
*/
private String validateDatasetExists(String guid) throws AtlasException {
final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
if (!(queryResult.rows().length() > 0)) {
throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
} }
ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0); ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
return referenceable.getTypeName(); return referenceable.getTypeName();
} }
} }
...@@ -256,21 +256,21 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery { ...@@ -256,21 +256,21 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery {
* @param persistenceStrategy as needed to evaluate the Closure Query. * @param persistenceStrategy as needed to evaluate the Closure Query.
* @param g as needed to evaluate the Closure Query. * @param g as needed to evaluate the Closure Query.
*/ */
case class HiveLineageQuery(tableTypeName : String, case class InputLineageClosureQuery(tableTypeName : String,
tableName : String, attributeToSelectInstance : String,
ctasTypeName : String, tableName : String,
ctasInputTableAttribute : String, ctasTypeName : String,
ctasOutputTableAttribute : String, ctasInputTableAttribute : String,
depth : Option[Int], ctasOutputTableAttribute : String,
selectAttributes : Option[List[String]], depth : Option[Int],
withPath : Boolean, selectAttributes : Option[List[String]],
persistenceStrategy: GraphPersistenceStrategies, withPath : Boolean,
g: TitanGraph persistenceStrategy: GraphPersistenceStrategies,
g: TitanGraph
) extends SingleInstanceClosureQuery[String] { ) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName val closureType : String = tableTypeName
val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName val instanceValue = tableName
...@@ -296,21 +296,21 @@ case class HiveLineageQuery(tableTypeName : String, ...@@ -296,21 +296,21 @@ case class HiveLineageQuery(tableTypeName : String,
* @param persistenceStrategy as needed to evaluate the Closure Query. * @param persistenceStrategy as needed to evaluate the Closure Query.
* @param g as needed to evaluate the Closure Query. * @param g as needed to evaluate the Closure Query.
*/ */
case class HiveWhereUsedQuery(tableTypeName : String, case class OutputLineageClosureQuery(tableTypeName : String,
tableName : String, attributeToSelectInstance : String,
ctasTypeName : String, tableName : String,
ctasInputTableAttribute : String, ctasTypeName : String,
ctasOutputTableAttribute : String, ctasInputTableAttribute : String,
depth : Option[Int], ctasOutputTableAttribute : String,
selectAttributes : Option[List[String]], depth : Option[Int],
withPath : Boolean, selectAttributes : Option[List[String]],
persistenceStrategy: GraphPersistenceStrategies, withPath : Boolean,
g: TitanGraph persistenceStrategy: GraphPersistenceStrategies,
g: TitanGraph
) extends SingleInstanceClosureQuery[String] { ) extends SingleInstanceClosureQuery[String] {
val closureType : String = tableTypeName val closureType : String = tableTypeName
val attributeToSelectInstance = "name"
val attributeTyp = DataTypes.STRING_TYPE val attributeTyp = DataTypes.STRING_TYPE
val instanceValue = tableName val instanceValue = tableName
......
...@@ -55,7 +55,7 @@ import java.util.List; ...@@ -55,7 +55,7 @@ import java.util.List;
* Base Class to set up hive types and instances for tests * Base Class to set up hive types and instances for tests
*/ */
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
public class BaseHiveRepositoryTest { public class BaseRepositoryTest {
@Inject @Inject
protected MetadataService metadataService; protected MetadataService metadataService;
...@@ -94,8 +94,8 @@ public class BaseHiveRepositoryTest { ...@@ -94,8 +94,8 @@ public class BaseHiveRepositoryTest {
metadataService.createType(typesAsJSON); metadataService.createType(typesAsJSON);
} }
private static final String DATABASE_TYPE = "hive_db"; protected static final String DATABASE_TYPE = "hive_db";
private static final String HIVE_TABLE_TYPE = "hive_table"; protected static final String HIVE_TABLE_TYPE = "hive_table";
private static final String COLUMN_TYPE = "hive_column"; private static final String COLUMN_TYPE = "hive_column";
private static final String HIVE_PROCESS_TYPE = "hive_process"; private static final String HIVE_PROCESS_TYPE = "hive_process";
private static final String STORAGE_DESC_TYPE = "StorageDesc"; private static final String STORAGE_DESC_TYPE = "StorageDesc";
...@@ -104,7 +104,8 @@ public class BaseHiveRepositoryTest { ...@@ -104,7 +104,8 @@ public class BaseHiveRepositoryTest {
TypesDef createTypeDefinitions() { TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE), .createClassTypeDef(DATABASE_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE), attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE)); attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
...@@ -127,8 +128,7 @@ public class BaseHiveRepositoryTest { ...@@ -127,8 +128,7 @@ public class BaseHiveRepositoryTest {
attrDef("temporary", DataTypes.BOOLEAN_TYPE), attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null), new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
// todo - uncomment this, something is broken // todo - uncomment this, something is broken
new AttributeDefinition("sd", STORAGE_DESC_TYPE, new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
Multiplicity.REQUIRED, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE), new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null)); Multiplicity.COLLECTION, true, null));
...@@ -285,7 +285,7 @@ public class BaseHiveRepositoryTest { ...@@ -285,7 +285,7 @@ public class BaseHiveRepositoryTest {
return createInstance(referenceable, clsType); return createInstance(referenceable, clsType);
} }
Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns) protected Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
throws Exception { throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE); Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location); referenceable.set("location", location);
...@@ -297,7 +297,7 @@ public class BaseHiveRepositoryTest { ...@@ -297,7 +297,7 @@ public class BaseHiveRepositoryTest {
return referenceable; return referenceable;
} }
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception { protected Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name); referenceable.set("name", name);
referenceable.set("dataType", dataType); referenceable.set("dataType", dataType);
...@@ -306,7 +306,7 @@ public class BaseHiveRepositoryTest { ...@@ -306,7 +306,7 @@ public class BaseHiveRepositoryTest {
return referenceable; return referenceable;
} }
Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType, protected Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception { List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name); referenceable.set("name", name);
...@@ -327,12 +327,12 @@ public class BaseHiveRepositoryTest { ...@@ -327,12 +327,12 @@ public class BaseHiveRepositoryTest {
return createInstance(referenceable, clsType); return createInstance(referenceable, clsType);
} }
Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables, protected Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames) String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
throws Exception { throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set(AtlasClient.NAME, name); referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set("qualifiedName", name);
referenceable.set("description", description); referenceable.set("description", description);
referenceable.set("user", user); referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis()); referenceable.set("startTime", System.currentTimeMillis());
......
...@@ -18,9 +18,16 @@ ...@@ -18,9 +18,16 @@
package org.apache.atlas.discovery; package org.apache.atlas.discovery;
import org.apache.atlas.BaseHiveRepositoryTest; import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasException;
import org.apache.atlas.BaseRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.collections.ArrayStack;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert; import org.testng.Assert;
...@@ -31,18 +38,24 @@ import org.testng.annotations.Guice; ...@@ -31,18 +38,24 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.Arrays;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
/** /**
* Unit tests for Hive LineageService. * Unit tests for Hive LineageService.
*/ */
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
public class HiveLineageServiceTest extends BaseHiveRepositoryTest { public class DataSetLineageServiceTest extends BaseRepositoryTest {
@Inject @Inject
private DiscoveryService discoveryService; private DiscoveryService discoveryService;
@Inject @Inject
private HiveLineageService hiveLineageService; private DataSetLineageService lineageService;
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -100,66 +113,55 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest { ...@@ -100,66 +113,55 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest {
public void testSearchByDSLQueries(String dslQuery) throws Exception { public void testSearchByDSLQueries(String dslQuery) throws Exception {
System.out.println("Executing dslQuery = " + dslQuery); System.out.println("Executing dslQuery = " + dslQuery);
String jsonResults = discoveryService.searchByDSL(dslQuery); String jsonResults = discoveryService.searchByDSL(dslQuery);
Assert.assertNotNull(jsonResults); assertNotNull(jsonResults);
JSONObject results = new JSONObject(jsonResults); JSONObject results = new JSONObject(jsonResults);
Assert.assertEquals(results.length(), 3); Assert.assertEquals(results.length(), 3);
System.out.println("results = " + results); System.out.println("results = " + results);
Object query = results.get("query"); Object query = results.get("query");
Assert.assertNotNull(query); assertNotNull(query);
JSONObject dataType = results.getJSONObject("dataType"); JSONObject dataType = results.getJSONObject("dataType");
Assert.assertNotNull(dataType); assertNotNull(dataType);
String typeName = dataType.getString("typeName"); String typeName = dataType.getString("typeName");
Assert.assertNotNull(typeName); assertNotNull(typeName);
JSONArray rows = results.getJSONArray("rows"); JSONArray rows = results.getJSONArray("rows");
Assert.assertNotNull(rows); assertNotNull(rows);
Assert.assertTrue(rows.length() >= 0); // some queries may not have any results Assert.assertTrue(rows.length() >= 0); // some queries may not have any results
System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows"); System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows");
} }
@Test @Test(dataProvider = "invalidArgumentsProvider")
public void testGetInputs() throws Exception { public void testGetInputsGraphInvalidArguments(final String tableName, String expectedException) throws Exception {
JSONObject results = new JSONObject(hiveLineageService.getInputs("sales_fact_monthly_mv")); testInvalidArguments(expectedException, new Invoker() {
Assert.assertNotNull(results); @Override
System.out.println("inputs = " + results); void run() throws AtlasException {
lineageService.getInputsGraph(tableName);
JSONArray rows = results.getJSONArray("rows"); }
Assert.assertTrue(rows.length() > 0); });
final JSONObject row = rows.getJSONObject(0);
JSONArray paths = row.getJSONArray("path");
Assert.assertTrue(paths.length() > 0);
} }
@Test(expectedExceptions = IllegalArgumentException.class) @Test(dataProvider = "invalidArgumentsProvider")
public void testGetInputsTableNameNull() throws Exception { public void testGetInputsGraphForEntityInvalidArguments(final String tableName, String expectedException)
hiveLineageService.getInputs(null); throws Exception {
Assert.fail(); testInvalidArguments(expectedException, new Invoker() {
} @Override
void run() throws AtlasException {
@Test(expectedExceptions = IllegalArgumentException.class) lineageService.getInputsGraphForEntity(tableName);
public void testGetInputsTableNameEmpty() throws Exception { }
hiveLineageService.getInputs(""); });
Assert.fail();
}
@Test(expectedExceptions = EntityNotFoundException.class)
public void testGetInputsBadTableName() throws Exception {
hiveLineageService.getInputs("blah");
Assert.fail();
} }
@Test @Test
public void testGetInputsGraph() throws Exception { public void testGetInputsGraph() throws Exception {
JSONObject results = new JSONObject(hiveLineageService.getInputsGraph("sales_fact_monthly_mv")); JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv"));
Assert.assertNotNull(results); assertNotNull(results);
System.out.println("inputs graph = " + results); System.out.println("inputs graph = " + results);
JSONObject values = results.getJSONObject("values"); JSONObject values = results.getJSONObject("values");
Assert.assertNotNull(values); assertNotNull(values);
final JSONObject vertices = values.getJSONObject("vertices"); final JSONObject vertices = values.getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 4); Assert.assertEquals(vertices.length(), 4);
...@@ -169,45 +171,72 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest { ...@@ -169,45 +171,72 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest {
} }
@Test @Test
public void testGetOutputs() throws Exception { public void testGetInputsGraphForEntity() throws Exception {
JSONObject results = new JSONObject(hiveLineageService.getOutputs("sales_fact")); ITypedReferenceableInstance entity =
Assert.assertNotNull(results); repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
System.out.println("outputs = " + results);
JSONArray rows = results.getJSONArray("rows"); JSONObject results = new JSONObject(lineageService.getInputsGraphForEntity(entity.getId()._getId()));
Assert.assertTrue(rows.length() > 0); assertNotNull(results);
System.out.println("inputs graph = " + results);
final JSONObject row = rows.getJSONObject(0); JSONObject values = results.getJSONObject("values");
JSONArray paths = row.getJSONArray("path"); assertNotNull(values);
Assert.assertTrue(paths.length() > 0);
} final JSONObject vertices = values.getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 4);
@Test(expectedExceptions = IllegalArgumentException.class) final JSONObject edges = values.getJSONObject("edges");
public void testGetOututsTableNameNull() throws Exception { Assert.assertEquals(edges.length(), 4);
hiveLineageService.getOutputs(null);
Assert.fail();
} }
@Test(expectedExceptions = IllegalArgumentException.class) @Test(dataProvider = "invalidArgumentsProvider")
public void testGetOutputsTableNameEmpty() throws Exception { public void testGetOutputsGraphInvalidArguments(final String tableName, String expectedException) throws Exception {
hiveLineageService.getOutputs(""); testInvalidArguments(expectedException, new Invoker() {
Assert.fail(); @Override
void run() throws AtlasException {
lineageService.getOutputsGraph(tableName);
}
});
} }
@Test(expectedExceptions = EntityNotFoundException.class) @Test(dataProvider = "invalidArgumentsProvider")
public void testGetOutputsBadTableName() throws Exception { public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException)
hiveLineageService.getOutputs("blah"); throws Exception {
Assert.fail(); testInvalidArguments(expectedException, new Invoker() {
@Override
void run() throws AtlasException {
lineageService.getOutputsGraphForEntity(tableName);
}
});
} }
@Test @Test
public void testGetOutputsGraph() throws Exception { public void testGetOutputsGraph() throws Exception {
JSONObject results = new JSONObject(hiveLineageService.getOutputsGraph("sales_fact")); JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact"));
Assert.assertNotNull(results); assertNotNull(results);
System.out.println("outputs graph = " + results); System.out.println("outputs graph = " + results);
JSONObject values = results.getJSONObject("values"); JSONObject values = results.getJSONObject("values");
Assert.assertNotNull(values); assertNotNull(values);
final JSONObject vertices = values.getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 3);
final JSONObject edges = values.getJSONObject("edges");
Assert.assertEquals(edges.length(), 4);
}
@Test
public void testGetOutputsGraphForEntity() throws Exception {
ITypedReferenceableInstance entity =
repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", "sales_fact");
JSONObject results = new JSONObject(lineageService.getOutputsGraphForEntity(entity.getId()._getId()));
assertNotNull(results);
System.out.println("outputs graph = " + results);
JSONObject values = results.getJSONObject("values");
assertNotNull(values);
final JSONObject vertices = values.getJSONObject("vertices"); final JSONObject vertices = values.getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 3); Assert.assertEquals(vertices.length(), 3);
...@@ -224,8 +253,29 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest { ...@@ -224,8 +253,29 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest {
@Test(dataProvider = "tableNamesProvider") @Test(dataProvider = "tableNamesProvider")
public void testGetSchema(String tableName, String expected) throws Exception { public void testGetSchema(String tableName, String expected) throws Exception {
JSONObject results = new JSONObject(hiveLineageService.getSchema(tableName)); JSONObject results = new JSONObject(lineageService.getSchema(tableName));
Assert.assertNotNull(results); assertNotNull(results);
System.out.println("columns = " + results);
JSONArray rows = results.getJSONArray("rows");
Assert.assertEquals(rows.length(), Integer.parseInt(expected));
for (int index = 0; index < rows.length(); index++) {
final JSONObject row = rows.getJSONObject(index);
assertNotNull(row.getString("name"));
assertNotNull(row.getString("comment"));
assertNotNull(row.getString("dataType"));
Assert.assertEquals(row.getString("$typeName$"), "hive_column");
}
}
@Test(dataProvider = "tableNamesProvider")
public void testGetSchemaForEntity(String tableName, String expected) throws Exception {
ITypedReferenceableInstance entity =
repository.getEntityDefinition(HIVE_TABLE_TYPE, "name", tableName);
JSONObject results = new JSONObject(lineageService.getSchemaForEntity(entity.getId()._getId()));
assertNotNull(results);
System.out.println("columns = " + results); System.out.println("columns = " + results);
JSONArray rows = results.getJSONArray("rows"); JSONArray rows = results.getJSONArray("rows");
...@@ -233,28 +283,165 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest { ...@@ -233,28 +283,165 @@ public class HiveLineageServiceTest extends BaseHiveRepositoryTest {
for (int index = 0; index < rows.length(); index++) { for (int index = 0; index < rows.length(); index++) {
final JSONObject row = rows.getJSONObject(index); final JSONObject row = rows.getJSONObject(index);
Assert.assertNotNull(row.getString("name")); assertNotNull(row.getString("name"));
Assert.assertNotNull(row.getString("comment")); assertNotNull(row.getString("comment"));
Assert.assertNotNull(row.getString("dataType")); assertNotNull(row.getString("dataType"));
Assert.assertEquals(row.getString("$typeName$"), "hive_column"); Assert.assertEquals(row.getString("$typeName$"), "hive_column");
} }
} }
@Test(expectedExceptions = IllegalArgumentException.class) @DataProvider(name = "invalidArgumentsProvider")
public void testGetSchemaTableNameNull() throws Exception { private Object[][] arguments() {
hiveLineageService.getSchema(null); return new String[][]{{null, IllegalArgumentException.class.getName()},
Assert.fail(); {"", IllegalArgumentException.class.getName()},
{"blah", EntityNotFoundException.class.getName()}};
}
abstract class Invoker {
abstract void run() throws AtlasException;
}
public void testInvalidArguments(String expectedException, Invoker invoker) throws Exception {
try {
invoker.run();
fail("Expected " + expectedException);
} catch(Exception e) {
assertEquals(e.getClass().getName(), expectedException);
}
}
@Test(dataProvider = "invalidArgumentsProvider")
public void testGetSchemaInvalidArguments(final String tableName, String expectedException) throws Exception {
testInvalidArguments(expectedException, new Invoker() {
@Override
void run() throws AtlasException {
lineageService.getSchema(tableName);
}
});
}
@Test(dataProvider = "invalidArgumentsProvider")
public void testGetSchemaForEntityInvalidArguments(final String entityId, String expectedException) throws Exception {
testInvalidArguments(expectedException, new Invoker() {
@Override
void run() throws AtlasException {
lineageService.getSchemaForEntity(entityId);
}
});
}
@Test
public void testLineageWithDelete() throws Exception {
String tableName = "table" + random();
createTable(tableName, 3, true);
JSONObject results = new JSONObject(lineageService.getSchema(tableName));
assertEquals(results.getJSONArray("rows").length(), 3);
results = new JSONObject(lineageService.getInputsGraph(tableName));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
results = new JSONObject(lineageService.getOutputsGraph(tableName));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
results = new JSONObject(lineageService.getSchemaForEntity(tableId));
assertEquals(results.getJSONArray("rows").length(), 3);
results = new JSONObject(lineageService.getInputsGraphForEntity(tableId));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
//Delete the entity. Lineage for entity returns the same results as before.
//Lineage for table name throws EntityNotFoundException
repository.deleteEntities(Arrays.asList(tableId));
results = new JSONObject(lineageService.getSchemaForEntity(tableId));
assertEquals(results.getJSONArray("rows").length(), 3);
results = new JSONObject(lineageService.getInputsGraphForEntity(tableId));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
try {
lineageService.getSchema(tableName);
fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
}
try {
lineageService.getInputsGraph(tableName);
fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
}
try {
lineageService.getOutputsGraph(tableName);
fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) {
//expected
}
//Create table again should show new lineage
createTable(tableName, 2, false);
results = new JSONObject(lineageService.getSchema(tableName));
assertEquals(results.getJSONArray("rows").length(), 2);
results = new JSONObject(lineageService.getOutputsGraph(tableName));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
results = new JSONObject(lineageService.getInputsGraph(tableName));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
results = new JSONObject(lineageService.getSchemaForEntity(tableId));
assertEquals(results.getJSONArray("rows").length(), 2);
results = new JSONObject(lineageService.getInputsGraphForEntity(tableId));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
results = new JSONObject(lineageService.getOutputsGraphForEntity(tableId));
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
}
private void createTable(String tableName, int numCols, boolean createLineage) throws Exception {
String dbId = getEntityId(DATABASE_TYPE, "name", "Sales");
Id salesDB = new Id(dbId, 0, DATABASE_TYPE);
//Create the entity again and schema should return the new schema
List<Referenceable> columns = new ArrayStack();
for (int i = 0; i < numCols; i++) {
columns.add(column("col" + random(), "int", "column descr"));
}
Referenceable sd =
storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true,
ImmutableList.of(column("time_id", "int", "time id")));
Id table = table(tableName, "test table", salesDB, sd, "fetl", "External", columns);
if (createLineage) {
Id inTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
Id outTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(inTable),
ImmutableList.of(table), "create table as select ", "plan", "id", "graph", "ETL");
loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(table),
ImmutableList.of(outTable), "create table as select ", "plan", "id", "graph", "ETL");
}
} }
@Test(expectedExceptions = IllegalArgumentException.class) private String random() {
public void testGetSchemaTableNameEmpty() throws Exception { return RandomStringUtils.randomAlphanumeric(5);
hiveLineageService.getSchema("");
Assert.fail();
} }
@Test(expectedExceptions = EntityNotFoundException.class) private String getEntityId(String typeName, String attributeName, String attributeValue) throws Exception {
public void testGetSchemaBadTableName() throws Exception { return repository.getEntityDefinition(typeName, attributeName, attributeValue).getId()._getId();
hiveLineageService.getSchema("blah");
Assert.fail();
} }
} }
...@@ -20,7 +20,7 @@ package org.apache.atlas.discovery; ...@@ -20,7 +20,7 @@ package org.apache.atlas.discovery;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.atlas.BaseHiveRepositoryTest; import org.apache.atlas.BaseRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContext;
import org.apache.atlas.TestUtils; import org.apache.atlas.TestUtils;
...@@ -60,7 +60,7 @@ import static org.testng.Assert.assertEquals; ...@@ -60,7 +60,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest { public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest {
@Inject @Inject
private MetadataRepository repositoryService; private MetadataRepository repositoryService;
......
...@@ -107,7 +107,7 @@ class GremlinTest2 extends BaseGremlinTest { ...@@ -107,7 +107,7 @@ class GremlinTest2 extends BaseGremlinTest {
} }
@Test def testHighLevelLineage { @Test def testHighLevelLineage {
val r = HiveLineageQuery("Table", "sales_fact_monthly_mv", val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv",
"LoadProcess", "LoadProcess",
"inputTables", "inputTables",
"outputTable", "outputTable",
...@@ -116,7 +116,7 @@ class GremlinTest2 extends BaseGremlinTest { ...@@ -116,7 +116,7 @@ class GremlinTest2 extends BaseGremlinTest {
} }
@Test def testHighLevelLineageReturnGraph { @Test def testHighLevelLineageReturnGraph {
val r = HiveLineageQuery("Table", "sales_fact_monthly_mv", val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv",
"LoadProcess", "LoadProcess",
"inputTables", "inputTables",
"outputTable", "outputTable",
...@@ -127,7 +127,7 @@ class GremlinTest2 extends BaseGremlinTest { ...@@ -127,7 +127,7 @@ class GremlinTest2 extends BaseGremlinTest {
} }
@Test def testHighLevelWhereUsed { @Test def testHighLevelWhereUsed {
val r = HiveWhereUsedQuery("Table", "sales_fact", val r = OutputLineageClosureQuery("Table", "name", "sales_fact",
"LoadProcess", "LoadProcess",
"inputTables", "inputTables",
"outputTable", "outputTable",
...@@ -136,7 +136,7 @@ class GremlinTest2 extends BaseGremlinTest { ...@@ -136,7 +136,7 @@ class GremlinTest2 extends BaseGremlinTest {
} }
@Test def testHighLevelWhereUsedReturnGraph { @Test def testHighLevelWhereUsedReturnGraph {
val r = HiveWhereUsedQuery("Table", "sales_fact", val r = OutputLineageClosureQuery("Table", "name", "sales_fact",
"LoadProcess", "LoadProcess",
"inputTables", "inputTables",
"outputTable", "outputTable",
......
...@@ -26,42 +26,50 @@ import org.apache.atlas.AtlasException; ...@@ -26,42 +26,50 @@ import org.apache.atlas.AtlasException;
public interface LineageService { public interface LineageService {
/** /**
* Return the lineage outputs for the given tableName. * Return the lineage outputs graph for the given datasetName.
* *
* @param tableName tableName * @param datasetName datasetName
* @return Outputs as JSON * @return Outputs Graph as JSON
*/ */
String getOutputs(String tableName) throws AtlasException; String getOutputsGraph(String datasetName) throws AtlasException;
/** /**
* Return the lineage outputs graph for the given tableName. * Return the lineage inputs graph for the given datasetName.
* *
* @param tableName tableName * @param datasetName datasetName
* @return Outputs Graph as JSON * @return Inputs Graph as JSON
*/ */
String getOutputsGraph(String tableName) throws AtlasException; String getInputsGraph(String datasetName) throws AtlasException;
/** /**
* Return the lineage inputs for the given tableName. * Return the lineage inputs graph for the given entity id.
* *
* @param tableName tableName * @param guid entity id
* @return Inputs as JSON * @return Inputs Graph as JSON
*/ */
String getInputs(String tableName) throws AtlasException; String getInputsGraphForEntity(String guid) throws AtlasException;
/** /**
* Return the lineage inputs graph for the given tableName. * Return the lineage inputs graph for the given entity id.
* *
* @param tableName tableName * @param guid entity id
* @return Inputs Graph as JSON * @return Inputs Graph as JSON
*/ */
String getInputsGraph(String tableName) throws AtlasException; String getOutputsGraphForEntity(String guid) throws AtlasException;
/**
* Return the schema for the given datasetName.
*
* @param datasetName datasetName
* @return Schema as JSON
*/
String getSchema(String datasetName) throws AtlasException;
/** /**
* Return the schema for the given tableName. * Return the schema for the given entity id.
* *
* @param tableName tableName * @param guid tableName
* @return Schema as JSON * @return Schema as JSON
*/ */
String getSchema(String tableName) throws AtlasException; String getSchemaForEntity(String guid) throws AtlasException;
} }
...@@ -49,14 +49,8 @@ atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} ...@@ -49,14 +49,8 @@ atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
######### Hive Lineage Configs ######### ######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
## Schema ## Schema
atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
######### Notification Configs ######### ######### Notification Configs #########
atlas.notification.embedded=true atlas.notification.embedded=true
......
...@@ -19,10 +19,9 @@ ...@@ -19,10 +19,9 @@
package org.apache.atlas.web.resources; package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.discovery.DiscoveryException; import org.apache.atlas.discovery.DiscoveryException;
import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.web.util.Servlets; import org.apache.atlas.web.util.Servlets;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -45,9 +44,9 @@ import javax.ws.rs.core.Response; ...@@ -45,9 +44,9 @@ import javax.ws.rs.core.Response;
*/ */
@Path("lineage/hive") @Path("lineage/hive")
@Singleton @Singleton
public class HiveLineageResource { public class DataSetLineageResource {
private static final Logger LOG = LoggerFactory.getLogger(HiveLineageResource.class); private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
private final LineageService lineageService; private final LineageService lineageService;
...@@ -58,7 +57,7 @@ public class HiveLineageResource { ...@@ -58,7 +57,7 @@ public class HiveLineageResource {
* @param lineageService lineage service handle * @param lineageService lineage service handle
*/ */
@Inject @Inject
public HiveLineageResource(LineageService lineageService) { public DataSetLineageResource(LineageService lineageService) {
this.lineageService = lineageService; this.lineageService = lineageService;
} }
...@@ -75,7 +74,6 @@ public class HiveLineageResource { ...@@ -75,7 +74,6 @@ public class HiveLineageResource {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName); LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
try { try {
ParamChecker.notEmpty(tableName, "table name cannot be null");
final String jsonResult = lineageService.getInputsGraph(tableName); final String jsonResult = lineageService.getInputsGraph(tableName);
JSONObject response = new JSONObject(); JSONObject response = new JSONObject();
...@@ -109,7 +107,6 @@ public class HiveLineageResource { ...@@ -109,7 +107,6 @@ public class HiveLineageResource {
LOG.info("Fetching lineage outputs graph for tableName={}", tableName); LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
try { try {
ParamChecker.notEmpty(tableName, "table name cannot be null");
final String jsonResult = lineageService.getOutputsGraph(tableName); final String jsonResult = lineageService.getOutputsGraph(tableName);
JSONObject response = new JSONObject(); JSONObject response = new JSONObject();
...@@ -143,7 +140,6 @@ public class HiveLineageResource { ...@@ -143,7 +140,6 @@ public class HiveLineageResource {
LOG.info("Fetching schema for tableName={}", tableName); LOG.info("Fetching schema for tableName={}", tableName);
try { try {
ParamChecker.notEmpty(tableName, "table name cannot be null");
final String jsonResult = lineageService.getSchema(tableName); final String jsonResult = lineageService.getSchema(tableName);
JSONObject response = new JSONObject(); JSONObject response = new JSONObject();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.discovery.DiscoveryException;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.web.util.Servlets;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
@Path("lineage")
@Singleton
public class LineageResource {
    // Fix: logger was bound to DataSetLineageResource.class (copy-paste from the
    // renamed name-based resource), misattributing every log line from this class.
    private static final Logger LOG = LoggerFactory.getLogger(LineageResource.class);

    private final LineageService lineageService;

    /**
     * Created by the Guice ServletModule and injected with the
     * configured LineageService.
     *
     * @param lineageService lineage service handle
     */
    @Inject
    public LineageResource(LineageService lineageService) {
        this.lineageService = lineageService;
    }

    /**
     * Returns the input lineage graph for the entity with the given id.
     *
     * @param guid dataset entity id
     * @return JSON response with requestId and the lineage graph under "results";
     *         404 if the entity does not exist, 400 on bad input, 500 otherwise
     */
    @GET
    @Path("{guid}/inputs/graph")
    @Consumes(Servlets.JSON_MEDIA_TYPE)
    @Produces(Servlets.JSON_MEDIA_TYPE)
    public Response inputsGraph(@PathParam("guid") String guid) {
        LOG.info("Fetching lineage inputs graph for guid={}", guid);

        try {
            final String jsonResult = lineageService.getInputsGraphForEntity(guid);

            JSONObject response = new JSONObject();
            response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
            response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));

            return Response.ok(response).build();
        } catch (EntityNotFoundException e) {
            LOG.error("entity not found for guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
        } catch (DiscoveryException | IllegalArgumentException e) {
            LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
        } catch (Throwable e) {
            LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
        }
    }

    /**
     * Returns the output lineage graph for the entity with the given id.
     *
     * @param guid dataset entity id
     * @return JSON response with requestId and the lineage graph under "results";
     *         404 if the entity does not exist, 400 on bad input, 500 otherwise
     */
    @GET
    @Path("{guid}/outputs/graph")
    @Consumes(Servlets.JSON_MEDIA_TYPE)
    @Produces(Servlets.JSON_MEDIA_TYPE)
    public Response outputsGraph(@PathParam("guid") String guid) {
        LOG.info("Fetching lineage outputs graph for entity guid={}", guid);

        try {
            final String jsonResult = lineageService.getOutputsGraphForEntity(guid);

            JSONObject response = new JSONObject();
            response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
            response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));

            return Response.ok(response).build();
        } catch (EntityNotFoundException e) {
            // Fix: message said "table entity not found" — copy-paste from the
            // table-name-based resource; this endpoint is generic entity lineage.
            LOG.error("entity not found for guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
        } catch (DiscoveryException | IllegalArgumentException e) {
            LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
        } catch (Throwable e) {
            LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
        }
    }

    /**
     * Returns the schema for the dataset entity with the given id.
     *
     * @param guid dataset entity id
     * @return JSON response with requestId and the schema rows under "results";
     *         404 if the entity does not exist, 400 on bad input, 500 otherwise
     */
    @GET
    @Path("{guid}/schema")
    @Consumes(Servlets.JSON_MEDIA_TYPE)
    @Produces(Servlets.JSON_MEDIA_TYPE)
    public Response schema(@PathParam("guid") String guid) {
        LOG.info("Fetching schema for entity guid={}", guid);

        try {
            final String jsonResult = lineageService.getSchemaForEntity(guid);

            JSONObject response = new JSONObject();
            response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
            response.put(AtlasClient.RESULTS, new JSONObject(jsonResult));

            return Response.ok(response).build();
        } catch (EntityNotFoundException e) {
            // Fix: message said "table entity not found" — same copy-paste issue.
            LOG.error("entity not found for guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
        } catch (DiscoveryException | IllegalArgumentException e) {
            LOG.error("Unable to get schema for entity guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
        } catch (Throwable e) {
            // Aligned with sibling handlers: include "guid" in the message.
            LOG.error("Unable to get schema for entity guid={}", guid, e);
            throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
        }
    }
}
...@@ -38,7 +38,7 @@ import java.util.List; ...@@ -38,7 +38,7 @@ import java.util.List;
/** /**
* Hive Lineage Integration Tests. * Hive Lineage Integration Tests.
*/ */
public class HiveLineageJerseyResourceIT extends BaseResourceIT { public class DataSetLineageJerseyResourceIT extends BaseResourceIT {
private static final String BASE_URI = "api/atlas/lineage/hive/table/"; private static final String BASE_URI = "api/atlas/lineage/hive/table/";
private String salesFactTable; private String salesFactTable;
...@@ -81,6 +81,22 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT { ...@@ -81,6 +81,22 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
} }
@Test @Test
public void testInputsGraphForEntity() throws Exception {
String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesMonthlyTable).getId()._getId();
JSONObject results = serviceClient.getInputGraphForEntity(tableId);
Assert.assertNotNull(results);
JSONObject values = results.getJSONObject("values");
Assert.assertNotNull(values);
final JSONObject vertices = values.getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 4);
final JSONObject edges = values.getJSONObject("edges");
Assert.assertEquals(edges.length(), 4);
}
@Test
public void testOutputsGraph() throws Exception { public void testOutputsGraph() throws Exception {
WebResource resource = service.path(BASE_URI).path(salesFactTable).path("outputs").path("graph"); WebResource resource = service.path(BASE_URI).path(salesFactTable).path("outputs").path("graph");
...@@ -109,6 +125,22 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT { ...@@ -109,6 +125,22 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
} }
@Test @Test
public void testOutputsGraphForEntity() throws Exception {
String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesFactTable).getId()._getId();
JSONObject results = serviceClient.getOutputGraphForEntity(tableId);
Assert.assertNotNull(results);
JSONObject values = results.getJSONObject("values");
Assert.assertNotNull(values);
final JSONObject vertices = values.getJSONObject("vertices");
Assert.assertEquals(vertices.length(), 3);
final JSONObject edges = values.getJSONObject("edges");
Assert.assertEquals(edges.length(), 4);
}
@Test
public void testSchema() throws Exception { public void testSchema() throws Exception {
WebResource resource = service.path(BASE_URI).path(salesFactTable).path("schema"); WebResource resource = service.path(BASE_URI).path(salesFactTable).path("schema");
...@@ -139,6 +171,24 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT { ...@@ -139,6 +171,24 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
} }
@Test @Test
public void testSchemaForEntity() throws Exception {
String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE, "name", salesFactTable).getId()._getId();
JSONObject results = serviceClient.getSchemaForEntity(tableId);
Assert.assertNotNull(results);
JSONArray rows = results.getJSONArray("rows");
Assert.assertEquals(rows.length(), 4);
for (int index = 0; index < rows.length(); index++) {
final JSONObject row = rows.getJSONObject(index);
Assert.assertNotNull(row.getString("name"));
Assert.assertNotNull(row.getString("comment"));
Assert.assertNotNull(row.getString("dataType"));
Assert.assertEquals(row.getString("$typeName$"), "hive_column");
}
}
@Test
public void testSchemaForEmptyTable() throws Exception { public void testSchemaForEmptyTable() throws Exception {
WebResource resource = service.path(BASE_URI).path("").path("schema"); WebResource resource = service.path(BASE_URI).path("").path("schema");
...@@ -184,8 +234,7 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT { ...@@ -184,8 +234,7 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB, table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
"Joe BI", "MANAGED", salesFactColumns, "Metric"); "Joe BI", "MANAGED", salesFactColumns, "Metric");
String procName = "loadSalesDaily" + randomString(); loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
loadProcess(procName, "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL"); ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
salesMonthlyTable = "sales_fact_monthly_mv" + randomString(); salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
...@@ -238,8 +287,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT { ...@@ -238,8 +287,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText, Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception { String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set(AtlasClient.NAME, name); referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set("qualifiedName", name);
referenceable.set("user", user); referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis()); referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000); referenceable.set("endTime", System.currentTimeMillis() + 10000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment