Commit 4353aa91 by Sarath Subramanian

ATLAS-2345: v1 schema API response includes partition keys in addition to columns

parent 3f330194
......@@ -25,6 +25,7 @@ import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageRelation;
......@@ -46,6 +47,7 @@ import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -58,6 +60,7 @@ import java.util.stream.Collectors;
public class EntityLineageService implements AtlasLineageService {
private static final String INPUT_PROCESS_EDGE = "__Process.inputs";
private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs";
private static final String COLUMNS = "columns";
private final AtlasGraph graph;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
......@@ -127,22 +130,39 @@ public class EntityLineageService implements AtlasLineageService {
ret.setDataType(AtlasTypeUtil.toClassTypeDefinition(hive_column));
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityRetriever.toAtlasEntityWithExtInfo(guid);
AtlasEntity entity = entityWithExtInfo.getEntity();
Map<String, AtlasEntity> referredEntities = entityWithExtInfo.getReferredEntities();
List<String> columnIds = getColumnIds(entity);
if (MapUtils.isNotEmpty(referredEntities)) {
List<Map<String, Object>> rows = referredEntities.entrySet()
.stream()
.filter(EntityLineageService::isHiveColumn)
.map(e -> AtlasTypeUtil.toMap(e.getValue()))
.collect(Collectors.toList());
.stream()
.filter(e -> isColumn(columnIds, e))
.map(e -> AtlasTypeUtil.toMap(e.getValue()))
.collect(Collectors.toList());
ret.setRows(rows);
}
return ret;
}
private static boolean isHiveColumn(Map.Entry<String, AtlasEntity> e) {
return StringUtils.equals("hive_column", e.getValue().getTypeName());
private List<String> getColumnIds(AtlasEntity entity) {
List<String> ret = new ArrayList<>();
Object columnObjs = entity.getAttribute(COLUMNS);
if (columnObjs != null && columnObjs instanceof List) {
for (Object pkObj : (List) columnObjs) {
if (pkObj instanceof AtlasObjectId) {
ret.add(((AtlasObjectId) pkObj).getGuid());
}
}
}
return ret;
}
private boolean isColumn(List<String> columnIds, Map.Entry<String, AtlasEntity> e) {
return columnIds.contains(e.getValue().getGuid());
}
private AtlasLineageInfo getLineageInfo(String guid, LineageDirection direction, int depth) throws AtlasBaseException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment