Commit eec1201c by Sarath Subramanian Committed by Vimal Sharma

LineageResource API needs to map to the new LineageREST API

parent 7a1b8c15
...@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ...@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al
ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
ALL CHANGES: ALL CHANGES:
ATLAS-1300 LineageResource API needs to map to the new LineageREST API (sarath.kum4r@gmail.com via svimal2106)
ATLAS-1321 fixed HiveHookIT failures (ayubpathan via mneethiraj) ATLAS-1321 fixed HiveHookIT failures (ayubpathan via mneethiraj)
ATLAS-1336 fixed StormHookIT (ayubpathan via mneethiraj) ATLAS-1336 fixed StormHookIT (ayubpathan via mneethiraj)
ATLAS-1335 multi-value attribute handling in AtlasStructType to be consistent with TypeSystem for backward compatibility (mneethiraj) ATLAS-1335 multi-value attribute handling in AtlasStructType to be consistent with TypeSystem for backward compatibility (mneethiraj)
......
...@@ -22,10 +22,17 @@ import org.apache.atlas.AtlasClient; ...@@ -22,10 +22,17 @@ import org.apache.atlas.AtlasClient;
import org.apache.atlas.aspect.Monitored; import org.apache.atlas.aspect.Monitored;
import org.apache.atlas.discovery.DiscoveryException; import org.apache.atlas.discovery.DiscoveryException;
import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageService;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.SchemaNotFoundException; import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.LineageUtils;
import org.apache.atlas.web.util.Servlets; import org.apache.atlas.web.util.Servlets;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -44,9 +51,10 @@ import javax.ws.rs.core.Response; ...@@ -44,9 +51,10 @@ import javax.ws.rs.core.Response;
@Singleton @Singleton
public class LineageResource { public class LineageResource {
private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class); private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageResource");
private final LineageService lineageService; private final AtlasLineageService atlasLineageService;
private final LineageService lineageService;
private final AtlasTypeRegistry typeRegistry;
/** /**
* Created by the Guice ServletModule and injected with the * Created by the Guice ServletModule and injected with the
...@@ -55,8 +63,10 @@ public class LineageResource { ...@@ -55,8 +63,10 @@ public class LineageResource {
* @param lineageService lineage service handle * @param lineageService lineage service handle
*/ */
@Inject @Inject
public LineageResource(LineageService lineageService) { public LineageResource(LineageService lineageService, AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) {
this.lineageService = lineageService; this.lineageService = lineageService;
this.atlasLineageService = atlasLineageService;
this.typeRegistry = typeRegistry;
} }
/** /**
...@@ -73,20 +83,15 @@ public class LineageResource { ...@@ -73,20 +83,15 @@ public class LineageResource {
LOG.info("Fetching lineage inputs graph for guid={}", guid); LOG.info("Fetching lineage inputs graph for guid={}", guid);
try { try {
final String jsonResult = lineageService.getInputsGraphForEntity(guid); AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1);
final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
JSONObject response = new JSONObject(); JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); response.put(AtlasClient.RESULTS, new JSONObject(result));
return Response.ok(response).build(); return Response.ok(response).build();
} catch (EntityNotFoundException e) { } catch (AtlasBaseException | JSONException e) {
LOG.error("entity not found for guid={}", guid);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (DiscoveryException | IllegalArgumentException e) {
LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
} }
...@@ -106,20 +111,15 @@ public class LineageResource { ...@@ -106,20 +111,15 @@ public class LineageResource {
LOG.info("Fetching lineage outputs graph for entity guid={}", guid); LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
try { try {
final String jsonResult = lineageService.getOutputsGraphForEntity(guid); AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1);
final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry);
JSONObject response = new JSONObject(); JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); response.put(AtlasClient.RESULTS, new JSONObject(result));
return Response.ok(response).build(); return Response.ok(response).build();
} catch (EntityNotFoundException e) { } catch (AtlasBaseException | JSONException e) {
LOG.error("table entity not found for {}", guid);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND));
} catch (DiscoveryException | IllegalArgumentException e) {
LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST));
} catch (Throwable e) {
LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e);
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
} }
...@@ -160,4 +160,4 @@ public class LineageResource { ...@@ -160,4 +160,4 @@ public class LineageResource {
throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR));
} }
} }
} }
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.util;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.types.TypeSystem;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX;
public final class LineageUtils {
private LineageUtils() {}
private static final String VERTICES_ATTR_NAME = "vertices";
private static final String EDGES_ATTR_NAME = "edges";
private static final String VERTEX_ID_ATTR_NAME = "vertexId";
private static final String TEMP_STRUCT_ID_RESULT = "__IdType";
private static final AtomicInteger COUNTER = new AtomicInteger();
public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException {
String ret = null;
if (lineageInfo != null) {
Map<String, AtlasEntityHeader> entities = lineageInfo.getGuidEntityMap();
Set<AtlasLineageInfo.LineageRelation> relations = lineageInfo.getRelations();
AtlasLineageInfo.LineageDirection direction = lineageInfo.getLineageDirection();
Map<String, Struct> verticesMap = new HashMap<>();
// Lineage Entities mapping -> verticesMap (vertices)
for (String guid : entities.keySet()) {
AtlasEntityHeader entityHeader = entities.get(guid);
if (isDataSet(entityHeader.getTypeName(), registry)) {
Map<String, Object> vertexIdMap = new HashMap<>();
TypeSystem.IdType idType = TypeSystem.getInstance().getIdType();
vertexIdMap.put(idType.idAttrName(), guid);
vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.STATUS_ACTIVE) ? "ACTIVE" : "DELETED");
vertexIdMap.put(idType.typeNameAttrName(), entityHeader.getTypeName());
Map<String, Object> values = new HashMap<>();
values.put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, entityHeader.getDisplayText());
values.put(VERTEX_ID_ATTR_NAME, constructResultStruct(vertexIdMap, true));
values.put(AtlasClient.NAME, entityHeader.getDisplayText());
verticesMap.put(guid, constructResultStruct(values, false));
}
}
// Lineage Relations mapping -> edgesMap (edges)
Map<String, List<String>> edgesMap = new HashMap<>();
for (AtlasLineageInfo.LineageRelation relation : relations) {
String fromEntityId = relation.getFromEntityId();
String toEntityId = relation.getToEntityId();
if (direction == AtlasLineageInfo.LineageDirection.INPUT) {
if (!edgesMap.containsKey(toEntityId)) {
edgesMap.put(toEntityId, new ArrayList<String>());
}
edgesMap.get(toEntityId).add(fromEntityId);
} else if (direction == AtlasLineageInfo.LineageDirection.OUTPUT) {
if (!edgesMap.containsKey(fromEntityId)) {
edgesMap.put(fromEntityId, new ArrayList<String>());
}
edgesMap.get(fromEntityId).add(toEntityId);
}
}
Map<String, Object> map = new HashMap<>();
map.put(VERTICES_ATTR_NAME, verticesMap);
map.put(EDGES_ATTR_NAME, edgesMap);
ret = InstanceSerialization.toJson(constructResultStruct(map, false), false);
}
return ret;
}
private static Struct constructResultStruct(Map<String, Object> values, boolean idType) {
if (idType) {
return new Struct(TEMP_STRUCT_ID_RESULT, values);
}
return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values);
}
private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException {
boolean ret = false;
AtlasType type = registry.getType(typeName);
if (type instanceof AtlasEntityType) {
AtlasEntityType entityType = (AtlasEntityType) type;
ret = entityType.getAllSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_DATASET);
}
return ret;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment