Commit be9df1c3 authored by Le Ma, committed by Sarath Subramanian

ATLAS-3229 DDL should not capture DML queries

parent e21f569f
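
Summary of the change: before this patch, CreateHiveProcess attached a DDL entity to every output entity whenever the hook ran outside the metastore, so DML statements such as LOAD, INSERT, EXPORT and IMPORT were also recorded in a table's ddlQueries. The patch gates DDL capture on the Hive operation type instead. The new check, condensed from the diff below:

    private boolean isDdlOperation(AtlasEntity entity) {
        // record DDL only for CTAS / CREATE VIEW / ALTER VIEW AS, and never from the metastore hook
        return entity != null && !context.isMetastoreHook()
                && (context.getHiveOperation().equals(HiveOperation.CREATETABLE_AS_SELECT)
                 || context.getHiveOperation().equals(HiveOperation.CREATEVIEW)
                 || context.getHiveOperation().equals(HiveOperation.ALTERVIEW_AS));
    }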
@@ -109,7 +109,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 outputs.add(entity);
             }

-            if (entity != null && !context.isMetastoreHook()) {
+            if (isDdlOperation(entity)) {
                 AtlasEntity ddlEntity = createHiveDDLEntity(entity);

                 if (ddlEntity != null) {
@@ -277,4 +278,11 @@ public class CreateHiveProcess extends BaseHiveEvent {

         return ret;
     }
+
+    private boolean isDdlOperation(AtlasEntity entity) {
+        return entity != null && !context.isMetastoreHook()
+                && (context.getHiveOperation().equals(HiveOperation.CREATETABLE_AS_SELECT)
+                 || context.getHiveOperation().equals(HiveOperation.CREATEVIEW)
+                 || context.getHiveOperation().equals(HiveOperation.ALTERVIEW_AS));
+    }
 }
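
The HiveHookIT changes below all follow the same verification pattern: capture the table's GUID with assertTableIsRegistered before running the statement, re-fetch the entity afterwards, and assert that the ddlQueries relationship attribute still holds only the expected number of entries (one for the plain CREATE TABLE cases, two where the test also runs a genuine DDL statement against the same table or view). A minimal sketch of that check, using the same client calls as the tests (tblId is assumed to come from assertTableIsRegistered, and the expected count depends on the test):

    AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
    List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);

    Assert.assertNotNull(ddlQueries);
    Assert.assertEquals(ddlQueries.size(), 1); // a DML-only statement must not add a DDL entry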
@@ -534,8 +534,16 @@ public class HiveHookIT extends HiveITBase {
         String loadFile = file("load");
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;

+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
         runCommand(query);

+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
     }
@@ -545,8 +553,16 @@ public class HiveHookIT extends HiveITBase {
         String loadFile = file("load");
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";

+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
         runCommand(query);

+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
     }
@@ -643,7 +659,12 @@ public class HiveHookIT extends HiveITBase {
             addAll(inputs);
         }};

-        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        String tblId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);

         AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs);
@@ -961,7 +982,13 @@ public class HiveHookIT extends HiveITBase {
         Assert.assertEquals(process.getGuid(), hiveProcess.getGuid());
         Assert.assertEquals(numberOfProcessExecutions(hiveProcess), 1);
         assertTableIsRegistered(DEFAULT_DB, tableName);
-        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        String tblId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);

         //TODO -Add update test case
     }
@@ -970,13 +997,19 @@ public class HiveHookIT extends HiveITBase {
     public void testExportImportUnPartitionedTable() throws Exception {
         String tableName = createTable(false);

-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);

         String filename = "pfile://" + mkdir("exportUnPartitioned");
         String query = "export table " + tableName + " to \"" + filename + "\"";

         runCommand(query);

+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
         Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
@@ -993,12 +1026,18 @@ public class HiveHookIT extends HiveITBase {
         //Import
         String importTableName = createTable(false);

-        assertTableIsRegistered(DEFAULT_DB, importTableName);
+        String importTblId = assertTableIsRegistered(DEFAULT_DB, importTableName);

         query = "import table " + importTableName + " from '" + filename + "'";

         runCommand(query);

+        AtlasEntity importTblEntity = atlasClientV2.getEntityByGuid(importTblId).getEntity();
+        List importTblddlQueries = (List) importTblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(importTblddlQueries);
+        Assert.assertEquals(importTblddlQueries.size(), 1);
+
         outputs = getOutputs(importTableName, Entity.Type.TABLE);

         HiveEventContext event2 = constructEvent(query, HiveOperation.IMPORT,
@@ -1018,6 +1057,12 @@ public class HiveHookIT extends HiveITBase {
         runCommand(query);

+        AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries2);
+        Assert.assertEquals(ddlQueries2.size(), 1);
+
         inputs = getInputs(tableName, Entity.Type.TABLE);
         outputs = getOutputs(filename, Entity.Type.DFS_DIR);
@@ -1039,6 +1084,12 @@ public class HiveHookIT extends HiveITBase {
         runCommand(query);

+        AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(importTblId).getEntity();
+        List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries3);
+        Assert.assertEquals(ddlQueries3.size(), 1);
+
         outputs = getOutputs(importTableName, Entity.Type.TABLE);

         HiveEventContext event4 = constructEvent(query, HiveOperation.IMPORT, getInputs(filename,
@@ -1062,7 +1113,7 @@ public class HiveHookIT extends HiveITBase {
         boolean isPartitionedTable = true;
         String tableName = createTable(isPartitionedTable);

-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);

         //Add a partition
         String partFile = "pfile://" + mkdir("partition");
@@ -1070,12 +1121,24 @@ public class HiveHookIT extends HiveITBase {
         runCommand(query);

+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 1);
+
         String filename = "pfile://" + mkdir("export");

         query = "export table " + tableName + " to \"" + filename + "\"";

         runCommand(query);

+        AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries2 = (List) tblEntity2.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries2);
+        Assert.assertEquals(ddlQueries2.size(), 1);
+
         Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
         Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
         Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); //Note that export has only partition as input in this case
@@ -1095,12 +1158,18 @@ public class HiveHookIT extends HiveITBase {
         //Import
         String importTableName = createTable(true);

-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tblId2 = assertTableIsRegistered(DEFAULT_DB, tableName);

         query = "import table " + importTableName + " from '" + filename + "'";

         runCommand(query);

+        AtlasEntity tblEntity3 = atlasClientV2.getEntityByGuid(tblId2).getEntity();
+        List ddlQueries3 = (List) tblEntity3.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries3);
+        Assert.assertEquals(ddlQueries3.size(), 1);
+
         Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
         Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE);
         Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
@@ -1715,8 +1784,14 @@ public class HiveHookIT extends HiveITBase {
         runCommand(query);

-        assertTableIsRegistered(DEFAULT_DB, newName);
         assertTableIsNotRegistered(DEFAULT_DB, viewName);
+
+        String viewId = assertTableIsRegistered(DEFAULT_DB, newName);
+
+        AtlasEntity viewEntity = atlasClientV2.getEntityByGuid(viewId).getEntity();
+        List ddlQueries = (List) viewEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 2);
     }

     @Test
@@ -1728,7 +1803,7 @@ public class HiveHookIT extends HiveITBase {
         runCommandWithDelay(query, 5000);

-        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+        String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
             @Override
             public void assertOnEntity(AtlasEntity tableRef) throws Exception {
                 AtlasObjectId sd = toAtlasObjectId(tableRef.getAttribute(ATTRIBUTE_STORAGEDESC));
@@ -1737,6 +1812,12 @@ public class HiveHookIT extends HiveITBase {
             }
         });

+        AtlasEntity tblEntity = atlasClientV2.getEntityByGuid(tblId).getEntity();
+        List ddlQueries = (List) tblEntity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
+
+        Assert.assertNotNull(ddlQueries);
+        Assert.assertEquals(ddlQueries.size(), 2);
+
         String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
         String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQualifiedName, null);
         AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity();
...