Commit f54cff2f by Sarath Subramanian

ATLAS-3168: Fix intermittent UT failure: HiveHookIT and ImpalaLineageToolIT

parent 6d71f14a
...@@ -224,7 +224,7 @@ public class HiveITBase { ...@@ -224,7 +224,7 @@ public class HiveITBase {
protected String assertEntityIsRegistered(final String typeName, final String property, final String value, protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final HiveHookIT.AssertPredicate assertPredicate) throws Exception { final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new HiveHookIT.Predicate() { waitFor(100000, new HiveHookIT.Predicate() {
@Override @Override
public void evaluate() throws Exception { public void evaluate() throws Exception {
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value)); AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value));
...@@ -242,7 +242,7 @@ public class HiveITBase { ...@@ -242,7 +242,7 @@ public class HiveITBase {
protected String assertEntityIsRegisteredViaGuid(String guid, protected String assertEntityIsRegisteredViaGuid(String guid,
final HiveHookIT.AssertPredicate assertPredicate) throws Exception { final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new HiveHookIT.Predicate() { waitFor(100000, new HiveHookIT.Predicate() {
@Override @Override
public void evaluate() throws Exception { public void evaluate() throws Exception {
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByGuid(guid); AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByGuid(guid);
...@@ -373,7 +373,7 @@ public class HiveITBase { ...@@ -373,7 +373,7 @@ public class HiveITBase {
protected void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception { protected void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
// wait for sufficient time before checking if entity is not available. // wait for sufficient time before checking if entity is not available.
long waitTime = 2000; long waitTime = 10000;
LOG.debug("Waiting for {} msecs, before asserting entity is not registered.", waitTime); LOG.debug("Waiting for {} msecs, before asserting entity is not registered.", waitTime);
Thread.sleep(waitTime); Thread.sleep(waitTime);
......
...@@ -76,7 +76,7 @@ public class HiveMetastoreBridgeIT extends HiveITBase { ...@@ -76,7 +76,7 @@ public class HiveMetastoreBridgeIT extends HiveITBase {
String tableName = tableName(); String tableName = tableName();
String pFile = createTestDFSPath("parentPath"); String pFile = createTestDFSPath("parentPath");
runCommand(driverWithoutContext, String.format("create EXTERNAL table %s(id string) location '%s'", tableName, pFile)); runCommandWithDelay(driverWithoutContext, String.format("create EXTERNAL table %s(id string) location '%s'", tableName, pFile), 3000);
String dbId = assertDatabaseIsRegistered(DEFAULT_DB); String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
......
...@@ -105,10 +105,10 @@ public class HiveHookIT extends HiveITBase { ...@@ -105,10 +105,10 @@ public class HiveHookIT extends HiveITBase {
Assert.assertEquals(params.get("p1"), "v1"); Assert.assertEquals(params.get("p1"), "v1");
//There should be just one entity per dbname //There should be just one entity per dbname
runCommand("drop database " + dbName); runCommandWithDelay("drop database " + dbName, 3000);
assertDatabaseIsNotRegistered(dbName); assertDatabaseIsNotRegistered(dbName);
runCommand("create database " + dbName); runCommandWithDelay("create database " + dbName, 3000);
dbId = assertDatabaseIsRegistered(dbName); dbId = assertDatabaseIsRegistered(dbName);
//assert on qualified name //assert on qualified name
...@@ -361,7 +361,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -361,7 +361,7 @@ public class HiveHookIT extends HiveITBase {
String tableName = tableName(); String tableName = tableName();
String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''"; String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''";
runCommand(command); runCommandWithDelay(command, 3000);
assertTableIsRegistered(DEFAULT_DB, tableName); assertTableIsRegistered(DEFAULT_DB, tableName);
} }
...@@ -689,7 +689,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -689,7 +689,7 @@ public class HiveHookIT extends HiveITBase {
); );
//Rerun same query. Should result in same process //Rerun same query. Should result in same process
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs); AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs);
Assert.assertEquals(numberOfProcessExecutions(processEntity2), 2); Assert.assertEquals(numberOfProcessExecutions(processEntity2), 2);
...@@ -752,7 +752,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -752,7 +752,7 @@ public class HiveHookIT extends HiveITBase {
); );
//Rerun same query. Should result in same process //Rerun same query. Should result in same process
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs); AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs);
AtlasEntity processExecutionEntity2 = validateProcessExecution(processEntity2, event); AtlasEntity processExecutionEntity2 = validateProcessExecution(processEntity2, event);
...@@ -826,7 +826,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -826,7 +826,7 @@ public class HiveHookIT extends HiveITBase {
validateInputTables(processEntity, inputs); validateInputTables(processEntity, inputs);
//Rerun same query with same HDFS path //Rerun same query with same HDFS path
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
assertTableIsRegistered(DEFAULT_DB, tableName); assertTableIsRegistered(DEFAULT_DB, tableName);
...@@ -846,7 +846,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -846,7 +846,7 @@ public class HiveHookIT extends HiveITBase {
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName; query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
String tblId = assertTableIsRegistered(DEFAULT_DB, tableName); String tblId = assertTableIsRegistered(DEFAULT_DB, tableName);
...@@ -1216,7 +1216,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1216,7 +1216,7 @@ public class HiveHookIT extends HiveITBase {
//Import should update same process //Import should update same process
query = "import table " + importTableName + " from '" + filename + "'"; query = "import table " + importTableName + " from '" + filename + "'";
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR); Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR);
Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{ Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{
...@@ -1300,7 +1300,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1300,7 +1300,7 @@ public class HiveHookIT extends HiveITBase {
String newTableName = tableName(); String newTableName = tableName();
String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName); String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME)); String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME));
...@@ -1429,7 +1429,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1429,7 +1429,7 @@ public class HiveHookIT extends HiveITBase {
String tableName = createTable(); String tableName = createTable();
String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName); String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName)); assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName));
...@@ -1454,7 +1454,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1454,7 +1454,7 @@ public class HiveHookIT extends HiveITBase {
query = String.format("alter table %s change column %s %s %s", tableName, oldColName, newColName, newColType); query = String.format("alter table %s change column %s %s %s", tableName, oldColName, newColName, newColType);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
columns = getColumns(DEFAULT_DB, tableName); columns = getColumns(DEFAULT_DB, tableName);
...@@ -1485,7 +1485,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1485,7 +1485,7 @@ public class HiveHookIT extends HiveITBase {
query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment); query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
columns = getColumns(DEFAULT_DB, tableName); columns = getColumns(DEFAULT_DB, tableName);
...@@ -1507,7 +1507,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1507,7 +1507,7 @@ public class HiveHookIT extends HiveITBase {
newColName = "name4"; newColName = "name4";
query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType); query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
columns = getColumns(DEFAULT_DB, tableName); columns = getColumns(DEFAULT_DB, tableName);
...@@ -1542,7 +1542,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1542,7 +1542,7 @@ public class HiveHookIT extends HiveITBase {
newColName = "name5"; newColName = "name5";
query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType); query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
columns = getColumns(DEFAULT_DB, tableName); columns = getColumns(DEFAULT_DB, tableName);
...@@ -1788,7 +1788,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1788,7 +1788,7 @@ public class HiveHookIT extends HiveITBase {
String testPath = createTestDFSPath("testBaseDir"); String testPath = createTestDFSPath("testBaseDir");
String query = "alter table " + tableName + " set location '" + testPath + "'"; String query = "alter table " + tableName + " set location '" + testPath + "'";
runCommandWithDelay(query, 5000); runCommandWithDelay(query, 8000);
String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { String tblId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override @Override
...@@ -1931,7 +1931,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -1931,7 +1931,7 @@ public class HiveHookIT extends HiveITBase {
String query = String.format("drop table %s ", tableName); String query = String.format("drop table %s ", tableName);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id")); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME));
...@@ -2005,7 +2005,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2005,7 +2005,7 @@ public class HiveHookIT extends HiveITBase {
String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName); String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
Thread.sleep(5000); Thread.sleep(10000);
try { try {
atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName)); atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName));
...@@ -2055,7 +2055,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2055,7 +2055,7 @@ public class HiveHookIT extends HiveITBase {
String viewName = tableName(); String viewName = tableName();
String query = "create view " + viewName + " as select * from " + tableName; String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query); runCommandWithDelay(query, 3000);
assertTableIsRegistered(DEFAULT_DB, viewName); assertTableIsRegistered(DEFAULT_DB, viewName);
assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id")); assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id"));
...@@ -2063,7 +2063,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2063,7 +2063,7 @@ public class HiveHookIT extends HiveITBase {
query = String.format("drop view %s ", viewName); query = String.format("drop view %s ", viewName);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id")); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME)); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME));
assertTableIsNotRegistered(DEFAULT_DB, viewName); assertTableIsNotRegistered(DEFAULT_DB, viewName);
...@@ -2105,7 +2105,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2105,7 +2105,7 @@ public class HiveHookIT extends HiveITBase {
String fmtQuery = "alter database %s set OWNER %s %s"; String fmtQuery = "alter database %s set OWNER %s %s";
String query = String.format(fmtQuery, dbName, "USER", owner); String query = String.format(fmtQuery, dbName, "USER", owner);
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
assertDatabaseIsRegistered(dbName, new AssertPredicate() { assertDatabaseIsRegistered(dbName, new AssertPredicate() {
@Override @Override
...@@ -2141,7 +2141,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2141,7 +2141,7 @@ public class HiveHookIT extends HiveITBase {
String query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps)); String query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps));
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
verifyEntityProperties(entityType, entityName, expectedProps, false); verifyEntityProperties(entityType, entityName, expectedProps, false);
...@@ -2150,7 +2150,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2150,7 +2150,7 @@ public class HiveHookIT extends HiveITBase {
query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps)); query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps));
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
verifyEntityProperties(entityType, entityName, expectedProps, false); verifyEntityProperties(entityType, entityName, expectedProps, false);
...@@ -2161,7 +2161,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2161,7 +2161,7 @@ public class HiveHookIT extends HiveITBase {
query = String.format(fmtQuery, entityName, UNSET_OP, Joiner.on("','").skipNulls().appendTo(sb, expectedProps.keySet()).append('\'')); query = String.format(fmtQuery, entityName, UNSET_OP, Joiner.on("','").skipNulls().appendTo(sb, expectedProps.keySet()).append('\''));
runCommandWithDelay(query, 1000); runCommandWithDelay(query, 3000);
verifyEntityProperties(entityType, entityName, expectedProps, true); verifyEntityProperties(entityType, entityName, expectedProps, true);
} }
...@@ -2450,7 +2450,7 @@ public class HiveHookIT extends HiveITBase { ...@@ -2450,7 +2450,7 @@ public class HiveHookIT extends HiveITBase {
location = " location '" + createTestDFSPath("someTestPath") + "'"; location = " location '" + createTestDFSPath("someTestPath") + "'";
} }
runCommand("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : "") + location); runCommandWithDelay("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : "") + location, 3000);
return tableName; return tableName;
} }
......
...@@ -120,22 +120,22 @@ public class HiveMetastoreHookIT extends HiveITBase { ...@@ -120,22 +120,22 @@ public class HiveMetastoreHookIT extends HiveITBase {
String dbName = dbName(); String dbName = dbName();
String query = "CREATE DATABASE " + dbName; String query = "CREATE DATABASE " + dbName;
runCommand(query); runCommandWithDelay(query);
String dbId = assertDatabaseIsRegistered(dbName); String dbId = assertDatabaseIsRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE); assertEquals(getAtlasEntity(dbId).getStatus(), ACTIVE);
String table1 = tableName(); String table1 = tableName();
runCommand("CREATE TABLE " + dbName + "." + table1 + " (name string, age int, dob date)"); runCommandWithDelay("CREATE TABLE " + dbName + "." + table1 + " (name string, age int, dob date)");
String table1Id = assertTableIsRegistered(dbName, table1); String table1Id = assertTableIsRegistered(dbName, table1);
assertEquals(getAtlasEntity(table1Id).getStatus(), ACTIVE); assertEquals(getAtlasEntity(table1Id).getStatus(), ACTIVE);
String table2 = tableName(); String table2 = tableName();
runCommand("CREATE TABLE " + dbName + "." + table2 + " (name string, age int, dob date)"); runCommandWithDelay("CREATE TABLE " + dbName + "." + table2 + " (name string, age int, dob date)");
String table2Id = assertTableIsRegistered(dbName, table2); String table2Id = assertTableIsRegistered(dbName, table2);
assertEquals(getAtlasEntity(table2Id).getStatus(), ACTIVE); assertEquals(getAtlasEntity(table2Id).getStatus(), ACTIVE);
query = "DROP DATABASE " + dbName + " CASCADE"; query = "DROP DATABASE " + dbName + " CASCADE";
runCommand(query); runCommandWithDelay(query);
assertDatabaseIsNotRegistered(dbName); assertDatabaseIsNotRegistered(dbName);
assertEquals(getAtlasEntity(dbId).getStatus(), DELETED); assertEquals(getAtlasEntity(dbId).getStatus(), DELETED);
...@@ -378,7 +378,7 @@ public class HiveMetastoreHookIT extends HiveITBase { ...@@ -378,7 +378,7 @@ public class HiveMetastoreHookIT extends HiveITBase {
} }
protected void runCommandWithDelay(String cmd) throws Exception { protected void runCommandWithDelay(String cmd) throws Exception {
int delayTimeInMs = 5000; int delayTimeInMs = 10000;
runCommandWithDelay(driverWithoutContext, cmd, delayTimeInMs); runCommandWithDelay(driverWithoutContext, cmd, delayTimeInMs);
} }
} }
\ No newline at end of file
...@@ -107,7 +107,7 @@ public class ImpalaLineageITBase { ...@@ -107,7 +107,7 @@ public class ImpalaLineageITBase {
// return guid of the entity // return guid of the entity
protected String assertEntityIsRegistered(final String typeName, final String property, final String value, protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final AssertPredicate assertPredicate) throws Exception { final AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new Predicate() { waitFor(100000, new Predicate() {
@Override @Override
public void evaluate() throws Exception { public void evaluate() throws Exception {
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections
...@@ -169,6 +169,8 @@ public class ImpalaLineageITBase { ...@@ -169,6 +169,8 @@ public class ImpalaLineageITBase {
protected String assertProcessIsRegistered(List<String> processQFNames, String queryString) throws Exception { protected String assertProcessIsRegistered(List<String> processQFNames, String queryString) throws Exception {
try { try {
Thread.sleep(5000);
LOG.debug("Searching for process with query {}", queryString); LOG.debug("Searching for process with query {}", queryString);
return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), processQFNames, new AssertPredicates() { return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), processQFNames, new AssertPredicates() {
...@@ -194,6 +196,8 @@ public class ImpalaLineageITBase { ...@@ -194,6 +196,8 @@ public class ImpalaLineageITBase {
protected String assertProcessIsRegistered(String processQFName, String queryString) throws Exception { protected String assertProcessIsRegistered(String processQFName, String queryString) throws Exception {
try { try {
Thread.sleep(5000);
LOG.debug("Searching for process with qualified name {} and query {}", processQFName, queryString); LOG.debug("Searching for process with qualified name {} and query {}", processQFName, queryString);
return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() { return assertEntityIsRegistered(ImpalaDataType.IMPALA_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, processQFName, new AssertPredicate() {
...@@ -212,6 +216,8 @@ public class ImpalaLineageITBase { ...@@ -212,6 +216,8 @@ public class ImpalaLineageITBase {
private String assertProcessExecutionIsRegistered(AtlasEntity impalaProcess, final String queryString) throws Exception { private String assertProcessExecutionIsRegistered(AtlasEntity impalaProcess, final String queryString) throws Exception {
try { try {
Thread.sleep(5000);
String guid = ""; String guid = "";
List<AtlasObjectId> processExecutions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute( List<AtlasObjectId> processExecutions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS)); BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
...@@ -334,7 +340,7 @@ public class ImpalaLineageITBase { ...@@ -334,7 +340,7 @@ public class ImpalaLineageITBase {
} }
protected String createDatabase(String dbName) throws Exception { protected String createDatabase(String dbName) throws Exception {
runCommand("CREATE DATABASE IF NOT EXISTS " + dbName); runCommandWithDelay("CREATE DATABASE IF NOT EXISTS " + dbName, 3000);
return dbName; return dbName;
} }
...@@ -349,7 +355,7 @@ public class ImpalaLineageITBase { ...@@ -349,7 +355,7 @@ public class ImpalaLineageITBase {
} }
protected String createTable(String dbName, String tableName, String columnsString, boolean isPartitioned) throws Exception { protected String createTable(String dbName, String tableName, String columnsString, boolean isPartitioned) throws Exception {
runCommand("CREATE TABLE IF NOT EXISTS " + dbName + "." + tableName + " " + columnsString + " comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : "")); runCommandWithDelay("CREATE TABLE IF NOT EXISTS " + dbName + "." + tableName + " " + columnsString + " comment 'table comment' " + (isPartitioned ? " partitioned by(dt string)" : ""), 3000);
return dbName + "." + tableName; return dbName + "." + tableName;
} }
......
...@@ -376,10 +376,10 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -376,10 +376,10 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL); toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// re-run the same lineage record, should have the same process entity and another process execution entity // re-run the same lineage record, should have the same process entity and another process execution entity
Thread.sleep(500); Thread.sleep(5000);
IMPALA = dir + "impalaMultipleInsertIntoAsSelect2.json"; IMPALA = dir + "impalaMultipleInsertIntoAsSelect2.json";
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL); toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
Thread.sleep(300); Thread.sleep(5000);
// verify the process is saved in Atlas // verify the process is saved in Atlas
// the value is from info in IMPALA_4. // the value is from info in IMPALA_4.
......
...@@ -123,7 +123,7 @@ public class NotificationHookConsumerKafkaTest { ...@@ -123,7 +123,7 @@ public class NotificationHookConsumerKafkaTest {
reset(atlasEntityStore); reset(atlasEntityStore);
} }
@Test @Test (enabled = false)
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment