Commit d5437734 by Le Ma, committed by Madhan Neethiraj

ATLAS-3197: capture DDL statements for hive_table and hive_db operations

parent 55ef909c
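
The commit introduces two new entity types, hive_db_ddl and hive_table_ddl (both subtypes of a new hive_ddl type), and attaches one such entity to the affected database or table for every captured DDL statement through a new ddlQueries relationship. As a rough illustration of the payload, the sketch below builds a hive_table_ddl entity by hand; the table reference, query text, and user are hypothetical values, and the attribute population only mirrors the createHiveDDLEntity helper added further down.

    import java.util.Collections;

    import org.apache.atlas.model.instance.AtlasEntity;
    import org.apache.atlas.model.instance.AtlasObjectId;

    public class HiveTableDdlSketch {
        public static void main(String[] args) {
            // Hypothetical reference to the table the DDL statement ran against.
            AtlasObjectId table = new AtlasObjectId("hive_table",
                    Collections.singletonMap("qualifiedName", "test_db.test_table@primary"));

            long   queryStartTime = System.currentTimeMillis();
            String queryText      = "CREATE TABLE test_db.test_table (id int)";

            // Shape of the DDL entity the hook emits: "table" is the reference back
            // to the owning hive_table, the rest are plain attributes.
            AtlasEntity tableDdl = new AtlasEntity("hive_table_ddl", "table", table);

            tableDdl.setAttribute("execTime",      queryStartTime);
            tableDdl.setAttribute("queryText",     queryText);
            tableDdl.setAttribute("userName",      "hive");                           // hypothetical user
            tableDdl.setAttribute("name",          queryText + ":" + queryStartTime); // query text + start time
            tableDdl.setAttribute("qualifiedName", tableDdl.getAttribute("name"));

            System.out.println(tableDdl);
        }
    }
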
......@@ -147,6 +147,13 @@ public class AlterTableRename extends BaseHiveEvent {
// update qualifiedName and other attributes (like params - which include lastModifiedTime, lastModifiedBy) of the table
ret.add(new EntityPartialUpdateRequestV2(getUserName(), oldTableId, renamedTableEntity));
// partial update relationship attribute ddl
AtlasEntity ddlEntity = createHiveDDLEntity(renamedTableEntity.getEntity(), true);
if (ddlEntity != null) {
ret.add(new HookNotification.EntityCreateRequestV2(getUserName(), new AtlasEntitiesWithExtInfo(ddlEntity)));
}
context.removeFromKnownTable(oldTableQualifiedName);
}
......
......@@ -77,6 +77,8 @@ public abstract class BaseHiveEvent {
public static final String HIVE_TYPE_SERDE = "hive_serde";
public static final String HIVE_TYPE_ORDER = "hive_order";
public static final String HIVE_TYPE_PROCESS_EXECUTION = "hive_process_execution";
public static final String HIVE_DB_DDL = "hive_db_ddl";
public static final String HIVE_TABLE_DDL = "hive_table_ddl";
public static final String HDFS_TYPE_PATH = "hdfs_path";
public static final String HBASE_TYPE_TABLE = "hbase_table";
public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace";
......@@ -144,6 +146,8 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix";
public static final String ATTRIBUTE_BUCKET = "bucket";
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String ATTRIBUTE_EXEC_TIME = "execTime";
public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries";
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
......@@ -663,6 +667,35 @@ public abstract class BaseHiveEvent {
return ret;
}
protected AtlasEntity createHiveDDLEntity(AtlasEntity dbOrTable) {
return createHiveDDLEntity(dbOrTable, false);
}
protected AtlasEntity createHiveDDLEntity(AtlasEntity dbOrTable, boolean excludeEntityGuid) {
AtlasObjectId objId = BaseHiveEvent.getObjectId(dbOrTable);
AtlasEntity hiveDDL = null;
if (excludeEntityGuid) {
objId.setGuid(null);
}
if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) {
hiveDDL = new AtlasEntity(HIVE_DB_DDL, ATTRIBUTE_DB, objId);
} else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) {
hiveDDL = new AtlasEntity(HIVE_TABLE_DDL, ATTRIBUTE_TABLE, objId);
}
if (hiveDDL != null) {
hiveDDL.setAttribute(ATTRIBUTE_EXEC_TIME, getQueryStartTime());
hiveDDL.setAttribute(ATTRIBUTE_QUERY_TEXT, getQueryString());
hiveDDL.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
hiveDDL.setAttribute(ATTRIBUTE_NAME, getQueryString() + QNAME_SEP_PROCESS + getQueryStartTime().toString());
hiveDDL.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveDDL.getAttribute(ATTRIBUTE_NAME));
}
return hiveDDL;
}
protected String getClusterName() {
return context.getClusterName();
}
......
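
One detail worth calling out in the helper above: AlterTableRename passes excludeEntityGuid=true because the renamed-table entity it works with carries a locally assigned GUID, so clearing the GUID presumably makes the DDL entity's table reference resolve by unique attributes (the new qualifiedName) instead. A minimal sketch of the two reference styles, with hypothetical values, is below:

    import java.util.Collections;

    import org.apache.atlas.model.instance.AtlasObjectId;

    public class ObjectIdSketch {
        public static void main(String[] args) {
            // Reference by GUID: fine when the entity is created in the same request.
            AtlasObjectId byGuid = new AtlasObjectId("-100", "hive_table");

            // Reference by unique attribute: what a cleared-GUID object id falls back
            // to, so the DDL entity points at the renamed table's qualifiedName.
            AtlasObjectId byQualifiedName = new AtlasObjectId("hive_table",
                    Collections.singletonMap("qualifiedName", "test_db.renamed_table@primary"));

            System.out.println(byGuid);
            System.out.println(byQualifiedName);
        }
    }
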
......@@ -84,9 +84,14 @@ public class CreateDatabase extends BaseHiveEvent {
}
if (db != null) {
AtlasEntity dbEntity = toDbEntity(db);
AtlasEntity dbDDLEntity = createHiveDDLEntity(dbEntity);
ret.addEntity(dbEntity);
if (dbDDLEntity != null) {
ret.addEntity(dbDDLEntity);
}
} else {
LOG.error("CreateDatabase.getEntities(): failed to retrieve db");
}
......
......@@ -149,6 +149,12 @@ public class CreateTable extends BaseHiveEvent {
}
}
}
AtlasEntity tableDDLEntity = createHiveDDLEntity(tblEntity);
if (tableDDLEntity != null) {
ret.addEntity(tableDDLEntity);
}
}
}
......
......@@ -92,9 +92,13 @@ public class HiveHookIT extends HiveITBase {
runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')");
String dbId = assertDatabaseIsRegistered(dbName);
AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity();
Map params = (Map) dbEntity.getAttribute(ATTRIBUTE_PARAMETERS);
List ddlQueries = (List) dbEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries);
Assert.assertEquals(ddlQueries.size(), 1);
Assert.assertNotNull(params);
Assert.assertEquals(params.size(), 2);
......@@ -124,10 +128,14 @@ public class HiveHookIT extends HiveITBase {
String tableId = assertTableIsRegistered(dbName, tableName);
String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); //there is only one instance of column registered
AtlasEntity colEntity = atlasClientV2.getEntityByGuid(colId).getEntity();
AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tableId).getEntity();
Assert.assertEquals(colEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), String.format("%s.%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME));
Assert.assertNotNull(colEntity.getAttribute(ATTRIBUTE_TABLE));
Assert.assertNotNull(tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES));
Assert.assertEquals(((List)tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES)).size(), 1);
AtlasObjectId tblObjId = toAtlasObjectId(colEntity.getAttribute(ATTRIBUTE_TABLE));
Assert.assertEquals(tblObjId.getGuid(), tableId);
......@@ -1204,7 +1212,7 @@ public class HiveHookIT extends HiveITBase {
assertTrait(partColumnGuid, partColTraitDetails);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
String renamedTableId = assertTableIsRegistered(newDBName, newTableName, new AssertPredicate() {
@Override
public void assertOnEntity(final AtlasEntity entity) throws Exception {
AtlasObjectId sd = toAtlasObjectId(entity.getAttribute(ATTRIBUTE_STORAGEDESC));
......@@ -1212,6 +1220,13 @@ public class HiveHookIT extends HiveITBase {
assertNotNull(sd);
}
});
AtlasEntity renamedTableEntity = atlasClientV2.getEntityByGuid(renamedTableId).getEntity();
List ddlQueries = (List) renamedTableEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries);
Assert.assertEquals(ddlQueries.size(), 2);
}
private List<AtlasEntity> getColumns(String dbName, String tableName) throws Exception {
......@@ -1266,6 +1281,14 @@ public class HiveHookIT extends HiveITBase {
List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName);
Assert.assertEquals(columns.size(), 3);
String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries);
Assert.assertEquals(ddlQueries.size(), 2);
}
//ATLAS-1321: Disable problematic tests. Need to revisit and fix them later
......@@ -1284,6 +1307,13 @@ public class HiveHookIT extends HiveITBase {
assertEquals(columns.size(), 1);
assertEquals(columns.get(0).getAttribute(NAME), "name");
String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries);
Assert.assertEquals(ddlQueries.size(), 2);
}
@Test
......@@ -1304,6 +1334,13 @@ public class HiveHookIT extends HiveITBase {
Assert.assertEquals(columns.size(), 2);
String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries);
Assert.assertEquals(ddlQueries.size(), 2);
//Change column type
oldColName = "name1";
newColName = "name2";
......@@ -1329,6 +1366,12 @@ public class HiveHookIT extends HiveITBase {
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries2);
Assert.assertEquals(ddlQueries2.size(), 3);
//Change name and add comment
oldColName = "name2";
newColName = "name3";
......@@ -1373,7 +1416,7 @@ public class HiveHookIT extends HiveITBase {
String finalNewColName = newColName;
String tblId3 = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS));
......@@ -1383,6 +1426,12 @@ public class HiveHookIT extends HiveITBase {
}
);
AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(tblId3).getEntity();
List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries3);
Assert.assertEquals(ddlQueries3.size(), 4);
//Change col position again
oldColName = "name4";
newColName = "name5";
......@@ -1403,7 +1452,7 @@ public class HiveHookIT extends HiveITBase {
//Check col position
String finalNewColName2 = newColName;
String tblId4 = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(AtlasEntity entity) throws Exception {
List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS));
......@@ -1412,6 +1461,12 @@ public class HiveHookIT extends HiveITBase {
}
}
);
AtlasEntity tblEntity4 = atlasClientV2.getEntityByGuid(tblId4).getEntity();
List ddlQueries4 = (List) tblEntity4.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
Assert.assertNotNull(ddlQueries4);
Assert.assertEquals(ddlQueries4.size(), 5);
}
/**
......
......@@ -531,6 +531,58 @@
"isUnique": false
}
]
},
{
"name": "hive_ddl",
"superTypes": [
"Referenceable"
],
"serviceType": "hive",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "queryText",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "execTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
},
{
"name": "hive_db_ddl",
"superTypes": [
"hive_ddl"
],
"serviceType": "hive",
"typeVersion": "1.0",
"attributeDefs": []
},
{
"name": "hive_table_ddl",
"superTypes": [
"hive_ddl"
],
"serviceType": "hive",
"typeVersion": "1.0",
"attributeDefs": []
}
],
"relationshipDefs": [
......@@ -659,6 +711,44 @@
"cardinality": "SINGLE"
},
"propagateTags": "NONE"
},
{
"name": "hive_table_ddl_queries",
"serviceType": "hive",
"typeVersion": "1.0",
"relationshipCategory": "COMPOSITION",
"endDef1": {
"type": "hive_table",
"name": "ddlQueries",
"isContainer": true,
"cardinality": "SET"
},
"endDef2": {
"type": "hive_table_ddl",
"name": "table",
"isContainer": false,
"cardinality": "SINGLE"
},
"propagateTags": "NONE"
},
{
"name": "hive_db_ddl_queries",
"serviceType": "hive",
"typeVersion": "1.0",
"relationshipCategory": "COMPOSITION",
"endDef1": {
"type": "hive_db",
"name": "ddlQueries",
"isContainer": true,
"cardinality": "SET"
},
"endDef2": {
"type": "hive_db_ddl",
"name": "db",
"isContainer": false,
"cardinality": "SINGLE"
},
"propagateTags": "NONE"
}
]
}
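
With the relationship definitions above in place, every captured DDL statement surfaces on its database or table as the ddlQueries relationship attribute, and each hive_db_ddl / hive_table_ddl entity points back through its db / table end. A minimal consumer-side sketch, assuming an Atlas server at a hypothetical URL and the hypothetical table test_db.test_table@primary; it mirrors the read pattern used by the tests above rather than adding anything new:

    import java.util.Collections;
    import java.util.List;

    import org.apache.atlas.AtlasClientV2;
    import org.apache.atlas.model.instance.AtlasEntity;

    public class DdlQueriesReadSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical connection details; replace with the target cluster's.
            AtlasClientV2 atlasClientV2 = new AtlasClientV2(new String[] {"http://localhost:21000"},
                                                            new String[] {"admin", "admin"});

            AtlasEntity table = atlasClientV2.getEntityByAttribute("hive_table",
                    Collections.singletonMap("qualifiedName", "test_db.test_table@primary")).getEntity();

            // One hive_table_ddl entry per captured DDL statement on this table.
            List ddlQueries = (List) table.getRelationshipAttribute("ddlQueries");

            System.out.println("DDL statements captured: " + (ddlQueries == null ? 0 : ddlQueries.size()));
        }
    }
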