Commit a96424a1 by Shwetha GS

ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)

parent 08f56903
...@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ...@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES: ALL CHANGES:
ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags)
ATLAS-1232 added preCreate(), preDelete() in typedef persistence, to enable edge creation for references in a later stage (mneethiraj) ATLAS-1232 added preCreate(), preDelete() in typedef persistence, to enable edge creation for references in a later stage (mneethiraj)
ATLAS-1183 UI: help link should point to atlas website (kevalbhatt via shwethags) ATLAS-1183 UI: help link should point to atlas website (kevalbhatt via shwethags)
ATLAS-1182 Hive Column level lineage docs (svimal2106 via shwethags) ATLAS-1182 Hive Column level lineage docs (svimal2106 via shwethags)
......
...@@ -18,9 +18,6 @@ ...@@ -18,9 +18,6 @@
package org.apache.atlas.discovery; package org.apache.atlas.discovery;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
...@@ -28,25 +25,31 @@ import org.apache.atlas.AtlasProperties; ...@@ -28,25 +25,31 @@ import org.apache.atlas.AtlasProperties;
import org.apache.atlas.GraphTransaction; import org.apache.atlas.GraphTransaction;
import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.query.GremlinQueryResult;
import org.apache.atlas.query.InputLineageClosureQuery; import org.apache.atlas.query.InputLineageClosureQuery;
import org.apache.atlas.query.OutputLineageClosureQuery; import org.apache.atlas.query.OutputLineageClosureQuery;
import org.apache.atlas.query.QueryParams; import org.apache.atlas.query.QueryParams;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.SchemaNotFoundException; import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
import org.apache.atlas.typesystem.persistence.ReferenceableInstance; import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.Option; import scala.Option;
import scala.Some; import scala.Some;
import scala.collection.immutable.List; import scala.collection.immutable.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Iterator;
/** /**
* Hive implementation of Lineage service interface. * Hive implementation of Lineage service interface.
*/ */
...@@ -66,10 +69,6 @@ public class DataSetLineageService implements LineageService { ...@@ -66,10 +69,6 @@ public class DataSetLineageService implements LineageService {
private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs"; private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs"; private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
private static final String DATASET_NAME_EXISTS_QUERY =
AtlasClient.DATA_SET_SUPER_TYPE + " where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + "='%s' and __state = 'ACTIVE'";
private static final Configuration propertiesConf; private static final Configuration propertiesConf;
static { static {
...@@ -104,8 +103,8 @@ public class DataSetLineageService implements LineageService { ...@@ -104,8 +103,8 @@ public class DataSetLineageService implements LineageService {
public String getOutputsGraph(String datasetName) throws AtlasException { public String getOutputsGraph(String datasetName) throws AtlasException {
LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName); LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
datasetName = ParamChecker.notEmpty(datasetName, "dataset name"); datasetName = ParamChecker.notEmpty(datasetName, "dataset name");
ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
return getOutputsGraphForId(datasetInstance.getId()._getId()); return getOutputsGraphForId(typeIdPair.right);
} }
/** /**
...@@ -119,8 +118,8 @@ public class DataSetLineageService implements LineageService { ...@@ -119,8 +118,8 @@ public class DataSetLineageService implements LineageService {
public String getInputsGraph(String tableName) throws AtlasException { public String getInputsGraph(String tableName) throws AtlasException {
LOG.info("Fetching lineage inputs graph for tableName={}", tableName); LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
tableName = ParamChecker.notEmpty(tableName, "table name"); tableName = ParamChecker.notEmpty(tableName, "table name");
ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName); TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName);
return getInputsGraphForId(datasetInstance.getId()._getId()); return getInputsGraphForId(typeIdPair.right);
} }
@Override @Override
...@@ -169,9 +168,9 @@ public class DataSetLineageService implements LineageService { ...@@ -169,9 +168,9 @@ public class DataSetLineageService implements LineageService {
public String getSchema(String datasetName) throws AtlasException { public String getSchema(String datasetName) throws AtlasException {
datasetName = ParamChecker.notEmpty(datasetName, "table name"); datasetName = ParamChecker.notEmpty(datasetName, "table name");
LOG.info("Fetching schema for tableName={}", datasetName); LOG.info("Fetching schema for tableName={}", datasetName);
ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId()); return getSchemaForId(typeIdPair.left, typeIdPair.right);
} }
private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException { private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException {
...@@ -199,14 +198,16 @@ public class DataSetLineageService implements LineageService { ...@@ -199,14 +198,16 @@ public class DataSetLineageService implements LineageService {
* *
* @param datasetName table name * @param datasetName table name
*/ */
private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException { private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException {
final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName); Iterator<AtlasVertex> results = graph.query().has("Referenceable.qualifiedName", datasetName)
GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0)); .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name())
if (!(queryResult.rows().length() > 0)) { .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
throw new EntityNotFoundException(datasetName + " does not exist"); .vertices().iterator();
while (results.hasNext()) {
AtlasVertex vertex = results.next();
return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getIdFromVertex(vertex));
} }
throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist");
return (ReferenceableInstance)queryResult.rows().apply(0);
} }
/** /**
...@@ -215,13 +216,13 @@ public class DataSetLineageService implements LineageService { ...@@ -215,13 +216,13 @@ public class DataSetLineageService implements LineageService {
* @param guid entity id * @param guid entity id
*/ */
private String validateDatasetExists(String guid) throws AtlasException { private String validateDatasetExists(String guid) throws AtlasException {
final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid); Iterator<AtlasVertex> results = graph.query().has(Constants.GUID_PROPERTY_KEY, guid)
GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0)); .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
if (!(queryResult.rows().length() > 0)) { .vertices().iterator();
throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist"); while (results.hasNext()) {
AtlasVertex vertex = results.next();
return GraphHelper.getTypeName(vertex);
} }
throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
return referenceable.getTypeName();
} }
} }
...@@ -17,12 +17,9 @@ ...@@ -17,12 +17,9 @@
*/ */
package org.apache.atlas; package org.apache.atlas;
import java.util.ArrayList; import com.google.common.base.Preconditions;
import java.util.Date; import com.google.common.collect.ImmutableList;
import java.util.List; import com.google.common.collect.ImmutableSet;
import javax.inject.Inject;
import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
...@@ -45,9 +42,10 @@ import org.apache.atlas.typesystem.types.TypeSystem; ...@@ -45,9 +42,10 @@ import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
import com.google.common.base.Preconditions; import javax.inject.Inject;
import com.google.common.collect.ImmutableList; import java.util.ArrayList;
import com.google.common.collect.ImmutableSet; import java.util.Date;
import java.util.List;
/** /**
* Base Class to set up hive types and instances for tests * Base Class to set up hive types and instances for tests
...@@ -319,7 +317,7 @@ public class BaseRepositoryTest { ...@@ -319,7 +317,7 @@ public class BaseRepositoryTest {
List<Referenceable> columns, String... traitNames) throws Exception { List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name); referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name);
referenceable.set("description", description); referenceable.set("description", description);
referenceable.set("owner", owner); referenceable.set("owner", owner);
referenceable.set("tableType", tableType); referenceable.set("tableType", tableType);
......
...@@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id; ...@@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.collections.ArrayStack; import org.apache.commons.collections.ArrayStack;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
...@@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
testInvalidArguments(expectedException, new Invoker() { testInvalidArguments(expectedException, new Invoker() {
@Override @Override
void run() throws AtlasException { void run() throws AtlasException {
lineageService.getInputsGraphForEntity(tableName); lineageService.getInputsGraph(tableName);
} }
}); });
} }
@Test @Test
public void testGetInputsGraph() throws Exception { public void testGetInputsGraph() throws Exception {
JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv")); JSONObject results = getInputsGraph("sales_fact_monthly_mv");
assertNotNull(results); assertNotNull(results);
System.out.println("inputs graph = " + results); System.out.println("inputs graph = " + results);
...@@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
@Test @Test
public void testCircularLineage() throws Exception{ public void testCircularLineage() throws Exception{
JSONObject results = new JSONObject(lineageService.getInputsGraph("table2")); JSONObject results = getInputsGraph("table2");
assertNotNull(results); assertNotNull(results);
System.out.println("inputs graph = " + results); System.out.println("inputs graph = " + results);
...@@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
} }
@Test(dataProvider = "invalidArgumentsProvider") @Test(dataProvider = "invalidArgumentsProvider")
public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException) public void testGetOutputsGraphForEntityInvalidArguments(final String tableId, String expectedException)
throws Exception { throws Exception {
testInvalidArguments(expectedException, new Invoker() { testInvalidArguments(expectedException, new Invoker() {
@Override @Override
void run() throws AtlasException { void run() throws AtlasException {
lineageService.getOutputsGraphForEntity(tableName); lineageService.getOutputsGraphForEntity(tableId);
} }
}); });
} }
@Test @Test
public void testGetOutputsGraph() throws Exception { public void testGetOutputsGraph() throws Exception {
JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact")); JSONObject results = getOutputsGraph("sales_fact");
assertNotNull(results); assertNotNull(results);
System.out.println("outputs graph = " + results); System.out.println("outputs graph = " + results);
...@@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
@Test(dataProvider = "tableNamesProvider") @Test(dataProvider = "tableNamesProvider")
public void testGetSchema(String tableName, String expected) throws Exception { public void testGetSchema(String tableName, String expected) throws Exception {
JSONObject results = new JSONObject(lineageService.getSchema(tableName)); JSONObject results = getSchema(tableName);
assertNotNull(results); assertNotNull(results);
System.out.println("columns = " + results); System.out.println("columns = " + results);
...@@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
Assert.assertEquals(rows.length(), Integer.parseInt(expected)); Assert.assertEquals(rows.length(), Integer.parseInt(expected));
for (int index = 0; index < rows.length(); index++) { for (int index = 0; index < rows.length(); index++) {
final JSONObject row = rows.getJSONObject(index); assertColumn(rows.getJSONObject(index));
assertNotNull(row.getString("name"));
assertNotNull(row.getString("comment"));
assertNotNull(row.getString("dataType"));
Assert.assertEquals(row.getString("$typeName$"), "hive_column");
} }
} }
...@@ -305,12 +302,15 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -305,12 +302,15 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
Assert.assertEquals(rows.length(), Integer.parseInt(expected)); Assert.assertEquals(rows.length(), Integer.parseInt(expected));
for (int index = 0; index < rows.length(); index++) { for (int index = 0; index < rows.length(); index++) {
final JSONObject row = rows.getJSONObject(index); assertColumn(rows.getJSONObject(index));
assertNotNull(row.getString("name")); }
assertNotNull(row.getString("comment"));
assertNotNull(row.getString("dataType"));
Assert.assertEquals(row.getString("$typeName$"), "hive_column");
} }
private void assertColumn(JSONObject jsonObject) throws JSONException {
assertNotNull(jsonObject.getString("name"));
assertNotNull(jsonObject.getString("comment"));
assertNotNull(jsonObject.getString("dataType"));
Assert.assertEquals(jsonObject.getString("$typeName$"), "hive_column");
} }
@Test(expectedExceptions = SchemaNotFoundException.class) @Test(expectedExceptions = SchemaNotFoundException.class)
...@@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
}); });
} }
private JSONObject getSchema(String tableName) throws Exception {
return new JSONObject(lineageService.getSchema("qualified:" + tableName));
}
private JSONObject getInputsGraph(String tableName) throws Exception {
return new JSONObject(lineageService.getInputsGraph("qualified:" + tableName));
}
private JSONObject getOutputsGraph(String tableName) throws Exception {
return new JSONObject(lineageService.getOutputsGraph("qualified:" + tableName));
}
@Test @Test
public void testLineageWithDelete() throws Exception { public void testLineageWithDelete() throws Exception {
String tableName = "table" + random(); String tableName = "table" + random();
createTable(tableName, 3, true); createTable(tableName, 3, true);
String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
JSONObject results = new JSONObject(lineageService.getSchema(tableName)); JSONObject results = getSchema(tableName);
assertEquals(results.getJSONArray("rows").length(), 3); assertEquals(results.getJSONArray("rows").length(), 3);
results = new JSONObject(lineageService.getInputsGraph(tableName)); results = getInputsGraph(tableName);
Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true); Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true);
Map<String, Struct> vertices = (Map) resultInstance.get("vertices"); Map<String, Struct> vertices = (Map) resultInstance.get("vertices");
assertEquals(vertices.size(), 2); assertEquals(vertices.size(), 2);
Struct vertex = vertices.get(tableId); Struct vertex = vertices.get(tableId);
assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name()); assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name());
results = new JSONObject(lineageService.getOutputsGraph(tableName)); results = getOutputsGraph(tableName);
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
results = new JSONObject(lineageService.getSchemaForEntity(tableId)); results = new JSONObject(lineageService.getSchemaForEntity(tableId));
...@@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
try { try {
lineageService.getSchema(tableName); getSchema(tableName);
fail("Expected EntityNotFoundException"); fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) { } catch (EntityNotFoundException e) {
//expected //expected
} }
try { try {
lineageService.getInputsGraph(tableName); getInputsGraph(tableName);
fail("Expected EntityNotFoundException"); fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) { } catch (EntityNotFoundException e) {
//expected //expected
} }
try { try {
lineageService.getOutputsGraph(tableName); getOutputsGraph(tableName);
fail("Expected EntityNotFoundException"); fail("Expected EntityNotFoundException");
} catch (EntityNotFoundException e) { } catch (EntityNotFoundException e) {
//expected //expected
...@@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { ...@@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
//Create table again should show new lineage //Create table again should show new lineage
createTable(tableName, 2, false); createTable(tableName, 2, false);
results = new JSONObject(lineageService.getSchema(tableName)); results = getSchema(tableName);
assertEquals(results.getJSONArray("rows").length(), 2); assertEquals(results.getJSONArray("rows").length(), 2);
results = new JSONObject(lineageService.getOutputsGraph(tableName)); results = getOutputsGraph(tableName);
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
results = new JSONObject(lineageService.getInputsGraph(tableName)); results = getInputsGraph(tableName);
assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment