Commit 549dee6b by Mandar Ambawane Committed by Madhan Neethiraj

ATLAS-3325: update hive-bridge to use relationship attributes

parent 49aa29d5
...@@ -363,8 +363,8 @@ public class HiveMetaStoreBridge { ...@@ -363,8 +363,8 @@ public class HiveMetaStoreBridge {
processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName); processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
processInst.setAttribute(ATTRIBUTE_NAME, query); processInst.setAttribute(ATTRIBUTE_NAME, query);
processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst))); processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS)));
processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst))); processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner()); processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner());
processInst.setAttribute(ATTRIBUTE_START_TIME, now); processInst.setAttribute(ATTRIBUTE_START_TIME, now);
processInst.setAttribute(ATTRIBUTE_END_TIME, now); processInst.setAttribute(ATTRIBUTE_END_TIME, now);
...@@ -590,7 +590,7 @@ public class HiveMetaStoreBridge { ...@@ -590,7 +590,7 @@ public class HiveMetaStoreBridge {
long createTime = BaseHiveEvent.getTableCreateTime(hiveTable); long createTime = BaseHiveEvent.getTableCreateTime(hiveTable);
long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime; long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
tableEntity.setAttribute(ATTRIBUTE_DB, BaseHiveEvent.getObjectId(database)); tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase()); tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner()); tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
...@@ -611,13 +611,13 @@ public class HiveMetaStoreBridge { ...@@ -611,13 +611,13 @@ public class HiveMetaStoreBridge {
tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText()); tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
} }
AtlasEntity sdEntity = toStroageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity)); AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity));
List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity); List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity, RELATIONSHIP_HIVE_TABLE_PART_KEYS);
List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity); List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity, RELATIONSHIP_HIVE_TABLE_COLUMNS);
tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, BaseHiveEvent.getObjectId(sdEntity)); tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS, BaseHiveEvent.getObjectIds(partKeys)); tableEntity.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
tableEntity.setAttribute(ATTRIBUTE_COLUMNS, BaseHiveEvent.getObjectIds(columns)); tableEntity.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
table.addReferredEntity(database); table.addReferredEntity(database);
table.addReferredEntity(sdEntity); table.addReferredEntity(sdEntity);
...@@ -639,10 +639,10 @@ public class HiveMetaStoreBridge { ...@@ -639,10 +639,10 @@ public class HiveMetaStoreBridge {
return table; return table;
} }
private AtlasEntity toStroageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException { private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException {
AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
ret.setAttribute(ATTRIBUTE_TABLE, tableId); ret.setRelationshipAttribute(ATTRIBUTE_TABLE, getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters()); ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation())); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
...@@ -689,7 +689,7 @@ public class HiveMetaStoreBridge { ...@@ -689,7 +689,7 @@ public class HiveMetaStoreBridge {
return ret; return ret;
} }
private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table) throws AtlasHookException { private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) throws AtlasHookException {
List<AtlasEntity> ret = new ArrayList<>(); List<AtlasEntity> ret = new ArrayList<>();
int columnPosition = 0; int columnPosition = 0;
...@@ -698,7 +698,7 @@ public class HiveMetaStoreBridge { ...@@ -698,7 +698,7 @@ public class HiveMetaStoreBridge {
AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName()); AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
column.setAttribute(ATTRIBUTE_TABLE, BaseHiveEvent.getObjectId(table)); column.setRelationshipAttribute(ATTRIBUTE_TABLE, getAtlasRelatedObjectId(table, relationshipType));
column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName())); column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName()));
column.setAttribute(ATTRIBUTE_NAME, fs.getName()); column.setAttribute(ATTRIBUTE_NAME, fs.getName());
column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER)); column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
......
...@@ -206,19 +206,21 @@ public abstract class BaseHiveEvent { ...@@ -206,19 +206,21 @@ public abstract class BaseHiveEvent {
} }
public static AtlasRelatedObjectId getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) { public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasEntity entity, String relationshipType) {
AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(getObjectId(entity), relationShipType); return getAtlasRelatedObjectId(getObjectId(entity), relationshipType);
return atlasRelatedObjectId;
} }
public static AtlasRelatedObjectId getAtlasRelatedObjectId(AtlasObjectId objectId, String relationShipType) {
AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(objectId, relationShipType);
return atlasRelatedObjectId;
}
public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) { public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) {
final List<AtlasRelatedObjectId> ret; final List<AtlasRelatedObjectId> ret;
if (CollectionUtils.isNotEmpty(entities)) { if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size()); ret = new ArrayList<>(entities.size());
for (AtlasEntity entity : entities) { for (AtlasEntity entity : entities) {
ret.add(getObjectIdWithRelationshipType(entity, relationshipType)); ret.add(getAtlasRelatedObjectId(entity, relationshipType));
} }
} else { } else {
ret = Collections.emptyList(); ret = Collections.emptyList();
...@@ -478,7 +480,7 @@ public abstract class BaseHiveEvent { ...@@ -478,7 +480,7 @@ public abstract class BaseHiveEvent {
} }
ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC)); ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getAtlasRelatedObjectId(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS)); ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS)); ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
} }
...@@ -625,7 +627,7 @@ public abstract class BaseHiveEvent { ...@@ -625,7 +627,7 @@ public abstract class BaseHiveEvent {
ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getObjectIdWithRelationshipType(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
......
...@@ -208,8 +208,8 @@ public class CreateHiveProcess extends BaseHiveEvent { ...@@ -208,8 +208,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS)); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS))); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getAtlasRelatedObjectId(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getObjectIdWithRelationshipType(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE)); columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getAtlasRelatedObjectId(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType());
columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment