Commit ea6c3cb5 by Sarath Subramanian Committed by Madhan Neethiraj

ATLAS-1234: Lineage REST API - v2

parent 2119666f
......@@ -86,6 +86,9 @@ public final class Constants {
public static final String FULLTEXT_INDEX = "fulltext_index";
public static final String QUALIFIED_NAME = "Referenceable.qualifiedName";
public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName";
private Constants() {
}
......
......@@ -46,6 +46,9 @@ public enum AtlasErrorCode {
TYPE_NAME_NOT_FOUND(404, "ATLAS4041E", "Given typename {0} was invalid"),
TYPE_GUID_NOT_FOUND(404, "ATLAS4042E", "Given type guid {0} was invalid"),
EMPTY_RESULTS(404, "ATLAS4044E", "No result found for {0}"),
INSTANCE_GUID_NOT_FOUND(404, "ATLAS4045E", "Given instance guid {0} is invalid"),
INSTANCE_LINEAGE_INVALID_PARAMS(404, "ATLAS4046E", "Invalid lineage query parameters passed {0}: {1}"),
INSTANCE_LINEAGE_QUERY_FAILED(404, "ATLAS4047E", "Instance lineage query failed {0}"),
TYPE_ALREADY_EXISTS(409, "ATLAS4091E", "Given type {0} already exists"),
TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"),
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.lineage;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE;
import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY;
/**
 * Lineage of a single entity: the base entity guid, the requested direction and depth,
 * the entities that take part in the lineage (keyed by guid) and the relations between them.
 */
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class AtlasLineageInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    private String                         baseEntityGuid;
    private LineageDirection               lineageDirection;
    private int                            lineageDepth;
    private Map<String, AtlasEntityHeader> guidEntityMap;
    private Set<LineageRelation>           relations;

    /** Directions in which lineage can be requested. */
    public enum LineageDirection { INPUT, OUTPUT, BOTH }

    public AtlasLineageInfo() {}

    /**
     * Captures lineage information for an entity instance like hive_table.
     *
     * @param baseEntityGuid   guid of the lineage entity
     * @param guidEntityMap    map of entity guid to AtlasEntityHeader (minimal entity info)
     * @param relations        set of lineage relations for the entity (fromEntityId -> toEntityId)
     * @param lineageDirection direction of lineage, can be INPUT, OUTPUT or BOTH
     * @param lineageDepth     lineage depth to be fetched
     */
    public AtlasLineageInfo(String baseEntityGuid, Map<String, AtlasEntityHeader> guidEntityMap,
                            Set<LineageRelation> relations, LineageDirection lineageDirection, int lineageDepth) {
        this.baseEntityGuid   = baseEntityGuid;
        this.lineageDirection = lineageDirection;
        this.lineageDepth     = lineageDepth;
        this.guidEntityMap    = guidEntityMap;
        this.relations        = relations;
    }

    public String getBaseEntityGuid() {
        return baseEntityGuid;
    }

    public void setBaseEntityGuid(String baseEntityGuid) {
        this.baseEntityGuid = baseEntityGuid;
    }

    public Map<String, AtlasEntityHeader> getGuidEntityMap() {
        return guidEntityMap;
    }

    public void setGuidEntityMap(Map<String, AtlasEntityHeader> guidEntityMap) {
        this.guidEntityMap = guidEntityMap;
    }

    public Set<LineageRelation> getRelations() {
        return relations;
    }

    public void setRelations(Set<LineageRelation> relations) {
        this.relations = relations;
    }

    public LineageDirection getLineageDirection() {
        return lineageDirection;
    }

    public void setLineageDirection(LineageDirection lineageDirection) {
        this.lineageDirection = lineageDirection;
    }

    public int getLineageDepth() {
        return lineageDepth;
    }

    public void setLineageDepth(int lineageDepth) {
        this.lineageDepth = lineageDepth;
    }

    /** Two instances are equal when all five properties are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        AtlasLineageInfo that = (AtlasLineageInfo) o;

        return lineageDepth == that.lineageDepth
                && lineageDirection == that.lineageDirection
                && isEqual(baseEntityGuid, that.baseEntityGuid)
                && isEqual(guidEntityMap, that.guidEntityMap)
                && isEqual(relations, that.relations);
    }

    @Override
    public int hashCode() {
        int result = guidEntityMap != null ? guidEntityMap.hashCode() : 0;
        result = 31 * result + (relations != null ? relations.hashCode() : 0);
        result = 31 * result + (lineageDirection != null ? lineageDirection.hashCode() : 0);
        result = 31 * result + lineageDepth;
        result = 31 * result + (baseEntityGuid != null ? baseEntityGuid.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "AtlasLineageInfo{" +
                "baseEntityGuid=" + baseEntityGuid +
                ", guidEntityMap=" + guidEntityMap +
                ", relations=" + relations +
                ", lineageDirection=" + lineageDirection +
                ", lineageDepth=" + lineageDepth +
                '}';
    }

    /** Null-safe equality check shared by both equals() implementations in this file. */
    private static boolean isEqual(Object left, Object right) {
        return left == null ? right == null : left.equals(right);
    }

    /**
     * A directed lineage edge between two entities, identified by their ids.
     * Serializable because instances are held in the {@code relations} field of the
     * enclosing Serializable class.
     */
    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
    @JsonIgnoreProperties(ignoreUnknown = true)
    @XmlRootElement
    @XmlAccessorType(XmlAccessType.PROPERTY)
    public static class LineageRelation implements Serializable {
        private static final long serialVersionUID = 1L;

        private String fromEntityId;
        private String toEntityId;

        public LineageRelation() { }

        /**
         * @param fromEntityId id of the entity the relation starts at
         * @param toEntityId   id of the entity the relation points to
         */
        public LineageRelation(String fromEntityId, String toEntityId) {
            this.fromEntityId = fromEntityId;
            this.toEntityId   = toEntityId;
        }

        public String getFromEntityId() {
            return fromEntityId;
        }

        public void setFromEntityId(String fromEntityId) {
            this.fromEntityId = fromEntityId;
        }

        public String getToEntityId() {
            return toEntityId;
        }

        public void setToEntityId(String toEntityId) {
            this.toEntityId = toEntityId;
        }

        /** Two relations are equal when both endpoints are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            LineageRelation that = (LineageRelation) o;

            return isEqual(fromEntityId, that.fromEntityId) && isEqual(toEntityId, that.toEntityId);
        }

        @Override
        public int hashCode() {
            int result = fromEntityId != null ? fromEntityId.hashCode() : 0;
            result = 31 * result + (toEntityId != null ? toEntityId.hashCode() : 0);
            return result;
        }

        @Override
        public String toString() {
            return "LineageRelation{" +
                    "fromEntityId='" + fromEntityId + '\'' +
                    ", toEntityId='" + toEntityId + '\'' +
                    '}';
        }
    }
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.lineage;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
/**
* Service contract for looking up the lineage of an entity.
*/
public interface AtlasLineageService {
/**
* Fetches lineage information for the entity identified by the given guid.
*
* @param entityGuid unique ID of the entity
* @param direction direction of lineage - INPUT, OUTPUT or BOTH
* @param depth number of hops in lineage
* @return AtlasLineageInfo
* @throws AtlasBaseException declared by the signature; the failure conditions are defined by the implementation
*/
AtlasLineageInfo getAtlasLineageInfo(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException;
}
......@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES:
ATLAS-1234 Lineage REST API - v2 (sarath.kum4r@gmail.com via mneethiraj)
ATLAS-1276 fix for webapp test failures (ayubkhan via mneethiraj)
ATLAS-1278 Added API to get typedef header info (apoorvnaik via mneethiraj)
ATLAS-1192 Atlas IE support (kevalbhatt)
......
......@@ -26,11 +26,13 @@ import com.google.inject.multibindings.Multibinder;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.atlas.discovery.DataSetLineageService;
import org.apache.atlas.discovery.DiscoveryService;
import org.apache.atlas.discovery.EntityLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.model.lineage.AtlasLineageService;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.audit.EntityAuditListener;
import org.apache.atlas.repository.audit.EntityAuditRepository;
......@@ -94,6 +96,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton();
Configuration configuration = getConfiguration();
bindAuditRepository(binder(), configuration);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageService;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.commons.collections.CollectionUtils;
import javax.inject.Inject;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * {@link AtlasLineageService} implementation that computes lineage by running Gremlin
 * traversals against the Atlas graph.
 */
public class EntityLineageService implements AtlasLineageService {
    private static final String INPUT_PROCESS_EDGE  = "__Process.inputs";
    private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs";
    private static final String ACTIVE_STATE        = "ACTIVE";

    /**
     * Gremlin query to retrieve input/output lineage for specified depth on a DataSet entity.
     * return list of Atlas vertices paths.
     */
    private static final String PARTIAL_LINEAGE_QUERY = "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
            "loop('src', {it.loops <= %s}, {((it.object.'__superTypeNames') ? " +
            "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
            "path().toList()";

    /**
     * Gremlin query to retrieve all (no fixed depth) input/output lineage for a DataSet entity.
     * return list of Atlas vertices paths.
     */
    private static final String FULL_LINEAGE_QUERY = "g.V('__guid', '%s').as('src').in('%s').out('%s')." +
            "loop('src', {((it.path.contains(it.object)) ? false : true)}, " +
            "{((it.object.'__superTypeNames') ? " +
            "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
            "path().toList()";

    private final AtlasGraph graph;

    @Inject
    EntityLineageService() throws DiscoveryException {
        this.graph = AtlasGraphProvider.getGraphInstance();
    }

    /**
     * Returns the lineage of the entity with the given guid.
     *
     * @param guid      guid of the entity; must match a vertex found by {@link #entityExists(String)}
     * @param direction INPUT, OUTPUT or BOTH; must not be null
     * @param depth     number of hops; a value below 1 selects the unbounded (full) lineage query
     * @return the lineage for the requested direction
     * @throws AtlasBaseException with INSTANCE_GUID_NOT_FOUND when no matching entity exists,
     *                            INSTANCE_LINEAGE_INVALID_PARAMS when the direction is null or unsupported,
     *                            INSTANCE_LINEAGE_QUERY_FAILED when the Gremlin script fails
     */
    @Override
    public AtlasLineageInfo getAtlasLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
        if (!entityExists(guid)) {
            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
        }

        if (direction == null) {
            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", null);
        }

        switch (direction) {
            case INPUT:
            case OUTPUT:
                return getLineageInfo(guid, direction, depth);

            case BOTH:
                return getBothLineageInfo(guid, depth);

            default:
                // Only reachable if a new LineageDirection constant is added without support here.
                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS, "direction", direction.toString());
        }
    }

    /**
     * Runs the lineage query for a single direction (INPUT or OUTPUT) and collects the
     * entities and relations found on every returned path.
     */
    private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
        Map<String, AtlasEntityHeader> entities     = new HashMap<String, AtlasEntityHeader>();
        Set<LineageRelation>           relations    = new HashSet<LineageRelation>();
        String                         lineageQuery = getLineageQuery(guid, direction, depth);

        try {
            List<?> paths = (List<?>) graph.executeGremlinScript(lineageQuery, true);

            if (CollectionUtils.isNotEmpty(paths)) {
                for (Object path : paths) {
                    if (path instanceof List) {
                        collectPath((List<?>) path, direction, entities, relations);
                    }
                }
            }
        } catch (ScriptException e) {
            AtlasBaseException failure = new AtlasBaseException(AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED, lineageQuery);

            // Keep the script failure as the cause so the underlying error is not lost.
            failure.initCause(e);

            throw failure;
        }

        return new AtlasLineageInfo(guid, entities, relations, direction, depth);
    }

    /**
     * Adds every vertex of one path to {@code entities} and links each consecutive pair of
     * vertices in {@code relations}, oriented according to {@code direction}.
     */
    private void collectPath(List<?> vertices, LineageDirection direction,
                             Map<String, AtlasEntityHeader> entities, Set<LineageRelation> relations) {
        AtlasEntityHeader previous = null;

        for (Object vertex : vertices) {
            AtlasEntityHeader current = toAtlasEntityHeader(vertex);

            if (!entities.containsKey(current.getGuid())) {
                entities.put(current.getGuid(), current);
            }

            if (previous != null) {
                if (direction == LineageDirection.INPUT) {
                    // Input paths are walked from the base entity towards its sources.
                    relations.add(new LineageRelation(current.getGuid(), previous.getGuid()));
                } else if (direction == LineageDirection.OUTPUT) {
                    relations.add(new LineageRelation(previous.getGuid(), current.getGuid()));
                }
            }

            previous = current;
        }
    }

    /**
     * Computes input and output lineage separately and merges the output result into the
     * input result, which is returned with its direction set to BOTH.
     */
    private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws AtlasBaseException {
        AtlasLineageInfo merged        = getLineageInfo(guid, LineageDirection.INPUT, depth);
        AtlasLineageInfo outputLineage = getLineageInfo(guid, LineageDirection.OUTPUT, depth);

        merged.getRelations().addAll(outputLineage.getRelations());
        merged.getGuidEntityMap().putAll(outputLineage.getGuidEntityMap());
        merged.setLineageDirection(LineageDirection.BOTH);

        return merged;
    }

    /**
     * Builds the Gremlin script for the given direction and depth.
     *
     * @return the script, or null when the direction is neither INPUT nor OUTPUT
     */
    private String getLineageQuery(String entityGuid, LineageDirection direction, int depth) throws AtlasBaseException {
        final String incomingEdge;
        final String outgoingEdge;

        switch (direction) {
            case INPUT:
                incomingEdge = OUTPUT_PROCESS_EDGE;
                outgoingEdge = INPUT_PROCESS_EDGE;
                break;

            case OUTPUT:
                incomingEdge = INPUT_PROCESS_EDGE;
                outgoingEdge = OUTPUT_PROCESS_EDGE;
                break;

            default:
                return null;
        }

        if (depth < 1) {
            return String.format(FULL_LINEAGE_QUERY, entityGuid, incomingEdge, outgoingEdge);
        }

        return String.format(PARTIAL_LINEAGE_QUERY, entityGuid, incomingEdge, outgoingEdge, depth);
    }

    /**
     * Converts a path element into an entity header. Elements that are not AtlasVertex
     * instances produce an empty header.
     */
    private AtlasEntityHeader toAtlasEntityHeader(Object vertexObj) {
        AtlasEntityHeader ret = new AtlasEntityHeader();

        if (vertexObj instanceof AtlasVertex) {
            AtlasVertex vertex = (AtlasVertex) vertexObj;

            ret.setTypeName(vertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class));
            ret.setGuid(vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class));
            ret.setDisplayText(vertex.getProperty(Constants.QUALIFIED_NAME, String.class));

            String state = vertex.getProperty(Constants.STATE_PROPERTY_KEY, String.class);

            // A vertex without a state property leaves the status unset instead of failing the
            // whole lineage request with a NullPointerException.
            // NOTE(review): confirm whether a missing state should instead default to ACTIVE.
            if (state != null) {
                ret.setStatus(ACTIVE_STATE.equalsIgnoreCase(state) ? Status.STATUS_ACTIVE : Status.STATUS_DELETED);
            }
        }

        return ret;
    }

    /**
     * Checks whether the graph holds a vertex with the given guid whose super types include DataSet.
     */
    private boolean entityExists(String guid) {
        Iterator<AtlasVertex> results = graph.query()
                .has(Constants.GUID_PROPERTY_KEY, guid)
                .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE)
                .vertices().iterator();

        return results.hasNext();
    }
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.lineage;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.BaseRepositoryTest;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.discovery.EntityLineageService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.collections.ArrayStack;
import org.apache.commons.lang.RandomStringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.testng.Assert.*;
/**
* Unit tests for the new v2 Instance LineageService.
*/
@Guice(modules = RepositoryMetadataModule.class)
public class EntityLineageServiceTest extends BaseRepositoryTest {
@Inject
private EntityLineageService lineageService;
@BeforeClass
public void setUp() throws Exception {
super.setUp();
}
@AfterClass
public void tearDown() throws Exception {
super.tearDown();
}
/**
* Circular Lineage Test.
*/
@Test
public void testCircularLineage() throws Exception{
String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "table2");
AtlasLineageInfo circularLineage = getInputLineageInfo(entityGuid, 5);
assertNotNull(circularLineage);
System.out.println("circular lineage = " + circularLineage);
Map<String, AtlasEntityHeader> entities = circularLineage.getGuidEntityMap();
assertNotNull(entities);
Set<LineageRelation> relations = circularLineage.getRelations();
assertNotNull(relations);
Assert.assertEquals(entities.size(), 4);
Assert.assertEquals(relations.size(), 4);
Assert.assertEquals(circularLineage.getLineageDepth(), 5);
Assert.assertEquals(circularLineage.getLineageDirection(), LineageDirection.INPUT);
assertTrue(entities.containsKey(circularLineage.getBaseEntityGuid()));
}
/**
* Input Lineage Tests.
*/
@Test(dataProvider = "invalidQueryParamsProvider")
public void testGetInputLineageInfoInvalidParams(final String guid, final AtlasLineageInfo.LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception {
testInvalidQueryParams(errorCode, new Invoker() {
@Override
void run() throws AtlasBaseException {
lineageService.getAtlasLineageInfo(guid, direction, depth);
}
});
}
@Test
public void testGetInputLineageInfo() throws Exception {
String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
AtlasLineageInfo inputLineage = getInputLineageInfo(entityGuid, 4);
assertNotNull(inputLineage);
System.out.println("input lineage = " + inputLineage);
Map<String, AtlasEntityHeader> entities = inputLineage.getGuidEntityMap();
assertNotNull(entities);
Set<LineageRelation> relations = inputLineage.getRelations();
assertNotNull(relations);
Assert.assertEquals(entities.size(), 6);
Assert.assertEquals(relations.size(), 5);
Assert.assertEquals(inputLineage.getLineageDepth(), 4);
Assert.assertEquals(inputLineage.getLineageDirection(), LineageDirection.INPUT);
assertTrue(entities.containsKey(inputLineage.getBaseEntityGuid()));
}
/**
* Output Lineage Tests.
*/
@Test(dataProvider = "invalidQueryParamsProvider")
public void testGetOutputLineageInvalidParams(final String guid, final LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception {
testInvalidQueryParams(errorCode, new Invoker() {
@Override
void run() throws AtlasBaseException {
lineageService.getAtlasLineageInfo(guid, direction, depth);
}
});
}
@Test
public void testGetOutputLineageInfo() throws Exception {
String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact");
AtlasLineageInfo outputLineage = getOutputLineageInfo(entityGuid, 4);
assertNotNull(outputLineage);
System.out.println("output lineage = " + outputLineage);
Map<String, AtlasEntityHeader> entities = outputLineage.getGuidEntityMap();
assertNotNull(entities);
Set<LineageRelation> relations = outputLineage.getRelations();
assertNotNull(relations);
Assert.assertEquals(entities.size(), 5);
Assert.assertEquals(relations.size(), 4);
Assert.assertEquals(outputLineage.getLineageDepth(), 4);
Assert.assertEquals(outputLineage.getLineageDirection(), LineageDirection.OUTPUT);
assertTrue(entities.containsKey(outputLineage.getBaseEntityGuid()));
}
/**
* Both Lineage Tests.
*/
@Test(dataProvider = "invalidQueryParamsProvider")
public void testGetLineageInfoInvalidParams(final String guid, final LineageDirection direction, final int depth, AtlasErrorCode errorCode) throws Exception {
testInvalidQueryParams(errorCode, new Invoker() {
@Override
void run() throws AtlasBaseException {
lineageService.getAtlasLineageInfo(guid, direction, depth);
}
});
}
@Test
public void testGetLineageInfo() throws Exception {
String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
AtlasLineageInfo bothLineage = getBothLineageInfo(entityGuid, 5);
assertNotNull(bothLineage);
System.out.println("both lineage = " + bothLineage);
Map<String, AtlasEntityHeader> entities = bothLineage.getGuidEntityMap();
assertNotNull(entities);
Set<LineageRelation> relations = bothLineage.getRelations();
assertNotNull(relations);
Assert.assertEquals(entities.size(), 6);
Assert.assertEquals(relations.size(), 5);
Assert.assertEquals(bothLineage.getLineageDepth(), 5);
Assert.assertEquals(bothLineage.getLineageDirection(), AtlasLineageInfo.LineageDirection.BOTH);
assertTrue(entities.containsKey(bothLineage.getBaseEntityGuid()));
}
@DataProvider(name = "invalidQueryParamsProvider")
private Object[][] params() throws Exception {
String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", "sales_fact_monthly_mv");
// String guid, LineageDirection direction, int depth, AtlasErrorCode errorCode
return new Object[][]{
{"", null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
{" ", null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
{null, null, 0, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
{"invalidGuid", LineageDirection.OUTPUT, 6, AtlasErrorCode.INSTANCE_GUID_NOT_FOUND},
{entityGuid, null, -10, AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS},
{entityGuid, null, 5, AtlasErrorCode.INSTANCE_LINEAGE_INVALID_PARAMS}
};
}
abstract class Invoker {
abstract void run() throws AtlasBaseException;
}
public void testInvalidQueryParams(AtlasErrorCode expectedErrorCode, Invoker Invoker) throws Exception {
try {
Invoker.run();
fail("Expected " + expectedErrorCode.toString());
} catch(AtlasBaseException e) {
assertEquals(e.getAtlasErrorCode(), expectedErrorCode);
}
}
private AtlasLineageInfo getInputLineageInfo(String guid, int depth) throws Exception {
return lineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, depth);
}
private AtlasLineageInfo getOutputLineageInfo(String guid, int depth) throws Exception {
return lineageService.getAtlasLineageInfo(guid, AtlasLineageInfo.LineageDirection.OUTPUT, depth);
}
private AtlasLineageInfo getBothLineageInfo(String guid, int depth) throws Exception {
return lineageService.getAtlasLineageInfo(guid, AtlasLineageInfo.LineageDirection.BOTH, depth);
}
@Test
public void testNewLineageWithDelete() throws Exception {
String tableName = "table" + random();
createTable(tableName, 3, true);
String entityGuid = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
AtlasLineageInfo inputLineage = getInputLineageInfo(entityGuid, 5);
assertNotNull(inputLineage);
System.out.println("input lineage = " + inputLineage);
Map<String, AtlasEntityHeader> entitiesInput = inputLineage.getGuidEntityMap();
assertNotNull(entitiesInput);
assertEquals(entitiesInput.size(), 3);
Set<LineageRelation> relationsInput = inputLineage.getRelations();
assertNotNull(relationsInput);
assertEquals(relationsInput.size(), 2);
AtlasEntityHeader tableEntityInput = entitiesInput.get(entityGuid);
assertEquals(tableEntityInput.getStatus(), Status.STATUS_ACTIVE);
AtlasLineageInfo outputLineage = getOutputLineageInfo(entityGuid, 5);
assertNotNull(outputLineage);
System.out.println("output lineage = " + outputLineage);
Map<String, AtlasEntityHeader> entitiesOutput = outputLineage.getGuidEntityMap();
assertNotNull(entitiesOutput);
assertEquals(entitiesOutput.size(), 3);
Set<LineageRelation> relationsOutput = outputLineage.getRelations();
assertNotNull(relationsOutput);
assertEquals(relationsOutput.size(), 2);
AtlasEntityHeader tableEntityOutput = entitiesOutput.get(entityGuid);
assertEquals(tableEntityOutput.getStatus(), Status.STATUS_ACTIVE);
AtlasLineageInfo bothLineage = getBothLineageInfo(entityGuid, 5);
assertNotNull(bothLineage);
System.out.println("both lineage = " + bothLineage);
Map<String, AtlasEntityHeader> entitiesBoth = bothLineage.getGuidEntityMap();
assertNotNull(entitiesBoth);
assertEquals(entitiesBoth.size(), 5);
Set<LineageRelation> relationsBoth = bothLineage.getRelations();
assertNotNull(relationsBoth);
assertEquals(relationsBoth.size(), 4);
AtlasEntityHeader tableEntityBoth = entitiesBoth.get(entityGuid);
assertEquals(tableEntityBoth.getStatus(), Status.STATUS_ACTIVE);
//Delete the table entity. Lineage for entity returns the same results as before.
//Lineage for table name throws EntityNotFoundException
AtlasClient.EntityResult deleteResult = repository.deleteEntities(Arrays.asList(entityGuid));
assertTrue(deleteResult.getDeletedEntities().contains(entityGuid));
inputLineage = getInputLineageInfo(entityGuid, 5);
tableEntityInput = inputLineage.getGuidEntityMap().get(entityGuid);
assertEquals(tableEntityInput.getStatus(), Status.STATUS_DELETED);
assertEquals(inputLineage.getGuidEntityMap().size(), 3);
outputLineage = getOutputLineageInfo(entityGuid, 5);
tableEntityOutput = outputLineage.getGuidEntityMap().get(entityGuid);
assertEquals(tableEntityOutput.getStatus(), Status.STATUS_DELETED);
assertEquals(outputLineage.getGuidEntityMap().size(), 3);
bothLineage = getBothLineageInfo(entityGuid, 5);
tableEntityBoth = bothLineage.getGuidEntityMap().get(entityGuid);
assertEquals(tableEntityBoth.getStatus(), Status.STATUS_DELETED);
assertEquals(bothLineage.getGuidEntityMap().size(), 5);
}
private void createTable(String tableName, int numCols, boolean createLineage) throws Exception {
String dbId = getEntityId(DATABASE_TYPE, "name", "Sales");
Id salesDB = new Id(dbId, 0, DATABASE_TYPE);
//Create the entity again and schema should return the new schema
List<Referenceable> columns = new ArrayStack();
for (int i = 0; i < numCols; i++) {
columns.add(column("col" + random(), "int", "column descr"));
}
Referenceable sd =
storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true,
ImmutableList.of(column("time_id", "int", "time id")));
Id table = table(tableName, "test table", salesDB, sd, "fetl", "External", columns);
if (createLineage) {
Id inTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
Id outTable = table("table" + random(), "test table", salesDB, sd, "fetl", "External", columns);
loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(inTable),
ImmutableList.of(table), "create table as select ", "plan", "id", "graph", "ETL");
loadProcess("process" + random(), "hive query for monthly summary", "Tim ETL", ImmutableList.of(table),
ImmutableList.of(outTable), "create table as select ", "plan", "id", "graph", "ETL");
}
}
private String random() {
return RandomStringUtils.randomAlphanumeric(5);
}
private String getEntityId(String typeName, String attributeName, String attributeValue) throws Exception {
return repository.getEntityDefinition(typeName, attributeName, attributeValue).getId()._getId();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.rest;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageService;
import org.apache.atlas.web.util.Servlets;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
/**
 * JAX-RS resource serving lineage queries under the {@code v2/lineage} path.
 * Every request is delegated to the injected {@link AtlasLineageService}.
 */
@Path("v2/lineage")
@Singleton
public class LineageREST {
    /** Value used for the {@code direction} query parameter when the caller omits it. */
    private static final String DEFAULT_DIRECTION = "BOTH";

    /** Value used for the {@code depth} query parameter when the caller omits it. */
    private static final String DEFAULT_DEPTH = "3";

    private final AtlasLineageService atlasLineageService;

    @Context
    private HttpServletRequest httpServletRequest;

    /**
     * Creates the resource around the lineage service it delegates to.
     *
     * @param atlasLineageService service that answers lineage queries
     */
    @Inject
    public LineageREST(AtlasLineageService atlasLineageService) {
        this.atlasLineageService = atlasLineageService;
    }

    /**
     * Returns lineage info about an entity.
     *
     * @param guid      unique entity id, taken from the request path
     * @param direction input, output or both; {@code BOTH} when not supplied
     * @param depth     number of hops for lineage; {@code 3} when not supplied
     * @return the {@link AtlasLineageInfo} produced by the lineage service
     * @throws AtlasBaseException propagated from the lineage service
     */
    @GET
    @Path("/{guid}")
    @Consumes(Servlets.JSON_MEDIA_TYPE)
    @Produces(Servlets.JSON_MEDIA_TYPE)
    public AtlasLineageInfo getLineageGraph(
            @PathParam("guid") String guid,
            @QueryParam("direction") @DefaultValue(DEFAULT_DIRECTION) LineageDirection direction,
            @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) throws AtlasBaseException {
        return atlasLineageService.getAtlasLineageInfo(guid, direction, depth);
    }
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.web.util.Servlets;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Entity Lineage v2 Integration Tests.
*/
public class EntityLineageJerseyResourceIT extends BaseResourceIT {
private static final String BASE_URI = "api/atlas/v2/lineage/";
private static final String INPUT_DIRECTION = "INPUT";
private static final String OUTPUT_DIRECTION = "OUTPUT";
private static final String BOTH_DIRECTION = "BOTH";
private static final String DIRECTION_PARAM = "direction";
private static final String DEPTH_PARAM = "depth";
private String salesFactTable;
private String salesMonthlyTable;
private String salesDBName;
Gson gson = new Gson();
@BeforeClass
public void setUp() throws Exception {
super.setUp();
createTypeDefinitions();
setupInstances();
}
@Test
public void testInputLineageInfo() throws Exception {
String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE,
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId();
WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, INPUT_DIRECTION).
queryParam(DEPTH_PARAM, "5");
ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("input lineage info = " + responseAsString);
AtlasLineageInfo inputLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class);
Map<String, AtlasEntityHeader> entities = inputLineageInfo.getGuidEntityMap();
Assert.assertNotNull(entities);
Set<AtlasLineageInfo.LineageRelation> relations = inputLineageInfo.getRelations();
Assert.assertNotNull(relations);
Assert.assertEquals(entities.size(), 6);
Assert.assertEquals(relations.size(), 5);
Assert.assertEquals(inputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.INPUT);
Assert.assertEquals(inputLineageInfo.getLineageDepth(), 5);
Assert.assertEquals(inputLineageInfo.getBaseEntityGuid(), tableId);
}
@Test
public void testOutputLineageInfo() throws Exception {
String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE,
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesFactTable).getId()._getId();
WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, OUTPUT_DIRECTION).
queryParam(DEPTH_PARAM, "5");
ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("output lineage info = " + responseAsString);
AtlasLineageInfo outputLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class);
Map<String, AtlasEntityHeader> entities = outputLineageInfo.getGuidEntityMap();
Assert.assertNotNull(entities);
Set<AtlasLineageInfo.LineageRelation> relations = outputLineageInfo.getRelations();
Assert.assertNotNull(relations);
Assert.assertEquals(entities.size(), 5);
Assert.assertEquals(relations.size(), 4);
Assert.assertEquals(outputLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.OUTPUT);
Assert.assertEquals(outputLineageInfo.getLineageDepth(), 5);
Assert.assertEquals(outputLineageInfo.getBaseEntityGuid(), tableId);
}
@Test
public void testLineageInfo() throws Exception {
String tableId = serviceClient.getEntity(HIVE_TABLE_TYPE,
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, salesMonthlyTable).getId()._getId();
WebResource resource = service.path(BASE_URI).path(tableId).queryParam(DIRECTION_PARAM, BOTH_DIRECTION).
queryParam(DEPTH_PARAM, "5");
ClientResponse clientResponse = resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
.method(HttpMethod.GET, ClientResponse.class);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
String responseAsString = clientResponse.getEntity(String.class);
Assert.assertNotNull(responseAsString);
System.out.println("both lineage info = " + responseAsString);
AtlasLineageInfo bothLineageInfo = gson.fromJson(responseAsString, AtlasLineageInfo.class);
Map<String, AtlasEntityHeader> entities = bothLineageInfo.getGuidEntityMap();
Assert.assertNotNull(entities);
Set<AtlasLineageInfo.LineageRelation> relations = bothLineageInfo.getRelations();
Assert.assertNotNull(relations);
Assert.assertEquals(entities.size(), 6);
Assert.assertEquals(relations.size(), 5);
Assert.assertEquals(bothLineageInfo.getLineageDirection(), AtlasLineageInfo.LineageDirection.BOTH);
Assert.assertEquals(bothLineageInfo.getLineageDepth(), 5);
Assert.assertEquals(bothLineageInfo.getBaseEntityGuid(), tableId);
}
private void setupInstances() throws Exception {
salesDBName = "Sales" + randomString();
Id salesDB = database(salesDBName, "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
List<Referenceable> salesFactColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("product_id", "int", "product id"),
column("customer_id", "int", "customer id"),
column("sales", "double", "product id"));
salesFactTable = "sales_fact" + randomString();
Id salesFact = table(salesFactTable, "sales fact table", salesDB, "Joe", "MANAGED", salesFactColumns);
List<Referenceable> timeDimColumns = ImmutableList
.of(column("time_id", "int", "time id"), column("dayOfYear", "int", "day Of Year"),
column("weekDay", "int", "week Day"));
Id timeDim =
table("time_dim" + randomString(), "time dimension table", salesDB, "John Doe", "EXTERNAL",
timeDimColumns);
Id reportingDB =
database("Reporting" + randomString(), "reporting database", "Jane BI",
"hdfs://host:8000/apps/warehouse/reporting");
Id salesFactDaily =
table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
"Joe BI", "MANAGED", salesFactColumns);
loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph");
salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
Id salesFactMonthly =
table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI",
"MANAGED", salesFactColumns);
loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily),
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph");
}
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());
return createInstance(referenceable);
}
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);
return referenceable;
}
Id table(String name, String description, Id dbId, String owner, String tableType, List<Referenceable> columns,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
referenceable.set("columns", columns);
return createInstance(referenceable);
}
Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
referenceable.set("queryId", queryId);
referenceable.set("queryGraph", queryGraph);
return createInstance(referenceable);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment