Commit 5c002517 by Mandar Ambawane Committed by nixonrodrigues

ATLAS-3211 :- Update Hive hook with Relationship Attributes.

parent d1262e99
...@@ -129,17 +129,13 @@ public class AlterTableRename extends BaseHiveEvent { ...@@ -129,17 +129,13 @@ public class AlterTableRename extends BaseHiveEvent {
// update qualifiedName for all columns, partitionKeys, storageDesc // update qualifiedName for all columns, partitionKeys, storageDesc
String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret); renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret);
renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret); renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret);
renameStorageDesc(oldTableEntity, renamedTableEntity, ret); renameStorageDesc(oldTableEntity, renamedTableEntity, ret);
// remove columns, partitionKeys and storageDesc - as they have already been updated above
removeAttribute(renamedTableEntity, ATTRIBUTE_COLUMNS);
removeAttribute(renamedTableEntity, ATTRIBUTE_PARTITION_KEYS);
removeAttribute(renamedTableEntity, ATTRIBUTE_STORAGEDESC);
// set previous name as the alias // set previous name as the alias
renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName())); renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName()));
renamedTableEntity.getEntity().setRelationshipAttributes(null);
String oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); String oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName); AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName);
...@@ -179,35 +175,17 @@ public class AlterTableRename extends BaseHiveEvent { ...@@ -179,35 +175,17 @@ public class AlterTableRename extends BaseHiveEvent {
AtlasObjectId oldSdId = new AtlasObjectId(oldSd.getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldSd.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); AtlasObjectId oldSdId = new AtlasObjectId(oldSd.getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldSd.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
newSd.removeAttribute(ATTRIBUTE_TABLE); newSd.removeAttribute(ATTRIBUTE_TABLE);
newSd.setRelationshipAttributes(null);
notifications.add(new EntityPartialUpdateRequestV2(getUserName(), oldSdId, new AtlasEntityWithExtInfo(newSd))); notifications.add(new EntityPartialUpdateRequestV2(getUserName(), oldSdId, new AtlasEntityWithExtInfo(newSd)));
} }
} }
private void removeAttribute(AtlasEntityWithExtInfo entity, String attributeName) {
Object attributeValue = entity.getEntity().getAttribute(attributeName);
entity.getEntity().getAttributes().remove(attributeName);
if (attributeValue instanceof AtlasObjectId) {
AtlasObjectId objectId = (AtlasObjectId) attributeValue;
entity.removeReferredEntity(objectId.getGuid());
} else if (attributeValue instanceof Collection) {
for (Object item : (Collection) attributeValue)
if (item instanceof AtlasObjectId) {
AtlasObjectId objectId = (AtlasObjectId) item;
entity.removeReferredEntity(objectId.getGuid());
}
}
}
private AtlasEntity getStorageDescEntity(AtlasEntityWithExtInfo tableEntity) { private AtlasEntity getStorageDescEntity(AtlasEntityWithExtInfo tableEntity) {
AtlasEntity ret = null; AtlasEntity ret = null;
if (tableEntity != null && tableEntity.getEntity() != null) { if (tableEntity != null && tableEntity.getEntity() != null) {
Object attrSdId = tableEntity.getEntity().getAttribute(ATTRIBUTE_STORAGEDESC); Object attrSdId = tableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_STORAGEDESC);
if (attrSdId instanceof AtlasObjectId) { if (attrSdId instanceof AtlasObjectId) {
ret = tableEntity.getReferredEntity(((AtlasObjectId) attrSdId).getGuid()); ret = tableEntity.getReferredEntity(((AtlasObjectId) attrSdId).getGuid());
......
...@@ -25,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; ...@@ -25,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
...@@ -158,6 +159,20 @@ public abstract class BaseHiveEvent { ...@@ -158,6 +159,20 @@ public abstract class BaseHiveEvent {
public static final String HDFS_PATH_PREFIX = "hdfs://"; public static final String HDFS_PATH_PREFIX = "hdfs://";
public static final String EMPTY_ATTRIBUTE_VALUE = ""; public static final String EMPTY_ATTRIBUTE_VALUE = "";
public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE = "hive_process_column_lineage";
public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys";
public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace";
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>(); public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
...@@ -190,6 +205,28 @@ public abstract class BaseHiveEvent { ...@@ -190,6 +205,28 @@ public abstract class BaseHiveEvent {
return table.getTTable() != null ? (table.getOwner()): ""; return table.getTTable() != null ? (table.getOwner()): "";
} }
public static AtlasRelatedObjectId getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) {
AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(getObjectId(entity), relationShipType);
return atlasRelatedObjectId;
}
public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) {
final List<AtlasRelatedObjectId> ret;
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size());
for (AtlasEntity entity : entities) {
ret.add(getObjectIdWithRelationshipType(entity, relationshipType));
}
} else {
ret = Collections.emptyList();
}
return ret;
}
public static AtlasObjectId getObjectId(AtlasEntity entity) { public static AtlasObjectId getObjectId(AtlasEntity entity) {
String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
...@@ -390,7 +427,9 @@ public abstract class BaseHiveEvent { ...@@ -390,7 +427,9 @@ public abstract class BaseHiveEvent {
long createTime = getTableCreateTime(table); long createTime = getTableCreateTime(table);
long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime; long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
ret.setAttribute(ATTRIBUTE_DB, dbId); AtlasRelatedObjectId dbRelatedObject = new AtlasRelatedObjectId(dbId, RELATIONSHIP_HIVE_TABLE_DB);
ret.setRelationshipAttribute(ATTRIBUTE_DB, dbRelatedObject );
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase()); ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner()); ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
...@@ -417,8 +456,10 @@ public abstract class BaseHiveEvent { ...@@ -417,8 +456,10 @@ public abstract class BaseHiveEvent {
} else { } else {
AtlasObjectId tableId = getObjectId(ret); AtlasObjectId tableId = getObjectId(ret);
AtlasEntity sd = getStorageDescEntity(tableId, table); AtlasEntity sd = getStorageDescEntity(tableId, table);
List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys()); List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys(), RELATIONSHIP_HIVE_TABLE_PART_KEYS);
List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols()); List<AtlasEntity> columns = getColumnEntities(tableId, table, table.getCols(), RELATIONSHIP_HIVE_TABLE_COLUMNS);
if (entityExtInfo != null) { if (entityExtInfo != null) {
entityExtInfo.addReferredEntity(sd); entityExtInfo.addReferredEntity(sd);
...@@ -436,9 +477,10 @@ public abstract class BaseHiveEvent { ...@@ -436,9 +477,10 @@ public abstract class BaseHiveEvent {
} }
} }
ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys)); ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns)); ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
} }
context.putEntity(tblQualifiedName, ret); context.putEntity(tblQualifiedName, ret);
...@@ -466,7 +508,9 @@ public abstract class BaseHiveEvent { ...@@ -466,7 +508,9 @@ public abstract class BaseHiveEvent {
StorageDescriptor sd = table.getSd(); StorageDescriptor sd = table.getSd();
ret.setAttribute(ATTRIBUTE_TABLE, tableId); AtlasRelatedObjectId tableRelatedObject = new AtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC);
ret.setRelationshipAttribute(ATTRIBUTE_TABLE, tableRelatedObject);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters()); ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation())); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation()));
...@@ -512,7 +556,7 @@ public abstract class BaseHiveEvent { ...@@ -512,7 +556,7 @@ public abstract class BaseHiveEvent {
return ret; return ret;
} }
protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas) { protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas, String relationshipType) {
List<AtlasEntity> ret = new ArrayList<>(); List<AtlasEntity> ret = new ArrayList<>();
boolean isKnownTable = tableId.getGuid() == null; boolean isKnownTable = tableId.getGuid() == null;
int columnPosition = 0; int columnPosition = 0;
...@@ -531,8 +575,8 @@ public abstract class BaseHiveEvent { ...@@ -531,8 +575,8 @@ public abstract class BaseHiveEvent {
if (isKnownTable) { if (isKnownTable) {
column.setGuid(null); column.setGuid(null);
} }
AtlasRelatedObjectId relatedObjectId = new AtlasRelatedObjectId(tableId, relationshipType);
column.setAttribute(ATTRIBUTE_TABLE, tableId); column.setRelationshipAttribute(ATTRIBUTE_TABLE, (relatedObjectId));
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName); column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName()); column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName());
column.setAttribute(ATTRIBUTE_OWNER, table.getOwner()); column.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
...@@ -580,7 +624,7 @@ public abstract class BaseHiveEvent { ...@@ -580,7 +624,7 @@ public abstract class BaseHiveEvent {
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity)); ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getObjectIdWithRelationshipType(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
...@@ -628,8 +672,8 @@ public abstract class BaseHiveEvent { ...@@ -628,8 +672,8 @@ public abstract class BaseHiveEvent {
} }
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs)); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs)); ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs)); ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, getObjectIdsWithRelationshipType(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
ret.setAttribute(ATTRIBUTE_NAME, queryStr); ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName()); ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
...@@ -665,8 +709,9 @@ public abstract class BaseHiveEvent { ...@@ -665,8 +709,9 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr); ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId()); ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported"); ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); //
ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess)); AtlasRelatedObjectId hiveProcessRelationObjectId = AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess, RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE);
ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, hiveProcessRelationObjectId);
return ret; return ret;
} }
...@@ -681,11 +726,16 @@ public abstract class BaseHiveEvent { ...@@ -681,11 +726,16 @@ public abstract class BaseHiveEvent {
if (excludeEntityGuid) { if (excludeEntityGuid) {
objId.setGuid(null); objId.setGuid(null);
} }
AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(objId);
if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) { if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) {
hiveDDL = new AtlasEntity(HIVE_DB_DDL, ATTRIBUTE_DB, objId); hiveDDL = new AtlasEntity(HIVE_DB_DDL);
objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_DB_DDL_QUERIES);
hiveDDL.setRelationshipAttribute(ATTRIBUTE_DB, objIdRelatedObject);
} else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) { } else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) {
hiveDDL = new AtlasEntity(HIVE_TABLE_DDL, ATTRIBUTE_TABLE, objId); hiveDDL = new AtlasEntity(HIVE_TABLE_DDL);
objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_TABLE_DDL_QUERIES);
hiveDDL.setRelationshipAttribute( ATTRIBUTE_TABLE, objIdRelatedObject);
} }
if (hiveDDL != null) { if (hiveDDL != null) {
...@@ -945,7 +995,10 @@ public abstract class BaseHiveEvent { ...@@ -945,7 +995,10 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName); ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
ret.setAttribute(ATTRIBUTE_URI, hbaseTableName); ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
ret.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nsEntity));
AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName)); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName));
entities.addReferredEntity(nsEntity); entities.addReferredEntity(nsEntity);
......
...@@ -207,9 +207,9 @@ public class CreateHiveProcess extends BaseHiveEvent { ...@@ -207,9 +207,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns)); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn))); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(hiveProcess)); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getObjectIdWithRelationshipType(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType());
columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());
......
...@@ -83,6 +83,12 @@ public class AtlasRelatedObjectId extends AtlasObjectId implements Serializable ...@@ -83,6 +83,12 @@ public class AtlasRelatedObjectId extends AtlasObjectId implements Serializable
super(other); super(other);
} }
public AtlasRelatedObjectId(AtlasObjectId objId, String relationshipType) {
this(objId);
setRelationshipType(relationshipType);
}
public AtlasRelatedObjectId(Map objIdMap) { public AtlasRelatedObjectId(Map objIdMap) {
super(objIdMap); super(objIdMap);
......
...@@ -414,6 +414,10 @@ public class AtlasTypeUtil { ...@@ -414,6 +414,10 @@ public class AtlasTypeUtil {
return new AtlasRelatedObjectId(getAtlasObjectId(entity)); return new AtlasRelatedObjectId(getAtlasObjectId(entity));
} }
public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, String relationshipType){
return new AtlasRelatedObjectId(getAtlasObjectId(entity), relationshipType);
}
public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) { public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry)); return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment