Commit baccd1d8 by Madhan Neethiraj

ATLAS-2157: HiveHook fix to handle getTable() error for temporary tables

parent ae576650
@@ -526,12 +526,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         Database db = null;
         Table table = null;
         Partition partition = null;
-        LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
-        List<Referenceable> entities = new ArrayList<>();

         switch (entity.getType()) {
             case DATABASE:
                 db = entity.getDatabase();
+
+                if (db != null) {
+                    db = dgiBridge.hiveClient.getDatabase(db.getName());
+                }
                 break;

             case TABLE:
@@ -549,40 +551,47 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                 LOG.info("{}: entity-type not handled by Atlas hook. Ignored", entity.getType());
         }

-        if (db != null) {
-            db = dgiBridge.hiveClient.getDatabase(db.getName());
-        }
-
-        if (db != null) {
-            Referenceable dbEntity = dgiBridge.createDBInstance(db);
-
-            entities.add(dbEntity);
-            result.put(Type.DATABASE, dbEntity);
-
-            Referenceable tableEntity = null;
-            if (table != null) {
-                if (existTable != null) {
-                    table = existTable;
-                } else {
-                    table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
-                }
-                //If its an external table, even though the temp table skip flag is on,
-                // we create the table since we need the HDFS path to temp table lineage.
-                if (skipTempTables &&
-                        table.isTemporary() &&
-                        !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-                    LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
-                } else {
-                    tableEntity = dgiBridge.createTableInstance(dbEntity, table);
-                    entities.add(tableEntity);
-                    result.put(Type.TABLE, tableEntity);
-                }
-            }
-        }
-
-        event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
+        Referenceable dbEntity = null;
+        Referenceable tableEntity = null;
+
+        if (db != null) {
+            dbEntity = dgiBridge.createDBInstance(db);
+        }
+
+        if (db != null && table != null) {
+            if (existTable != null) {
+                table = existTable;
+            } else {
+                table = refreshTable(dgiBridge, table.getDbName(), table.getTableName());
+            }
+
+            if (table != null) {
+                // If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage.
+                if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+                    LOG.warn("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
+                } else {
+                    tableEntity = dgiBridge.createTableInstance(dbEntity, table);
+                }
+            }
+        }
+
+        LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
+        List<Referenceable> entities = new ArrayList<>();
+
+        if (dbEntity != null) {
+            result.put(Type.DATABASE, dbEntity);
+            entities.add(dbEntity);
+        }
+
+        if (tableEntity != null) {
+            result.put(Type.TABLE, tableEntity);
+            entities.add(tableEntity);
+        }
+
+        if (!entities.isEmpty()) {
+            event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
+        }

         return result;
     }
     catch(Exception e) {
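Note on the hunk above: the restructured method now creates dbEntity and tableEntity first, and either may legitimately stay null when the metastore lookup fails. Only the non-null references are then copied into result and entities, and the update message is sent only when there is at least one entity to report. Below is a minimal, self-contained sketch of that collect-then-guard flow; Ref, notify() and collect() are illustrative stand-ins, not Atlas types.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;

    public class NullGuardedCollect {
        enum Type { DATABASE, TABLE }

        // Stand-in for Atlas' Referenceable.
        record Ref(Type type, String name) {}

        // Stand-in for event.addMessage(new EntityUpdateRequest(...)).
        static void notify(List<Ref> entities) {
            System.out.println("sending update for " + entities);
        }

        static LinkedHashMap<Type, Ref> collect(Ref dbEntity, Ref tableEntity) {
            LinkedHashMap<Type, Ref> result = new LinkedHashMap<>();
            List<Ref> entities = new ArrayList<>();

            if (dbEntity != null) {
                result.put(Type.DATABASE, dbEntity);
                entities.add(dbEntity);
            }

            if (tableEntity != null) { // null when the table lookup failed, e.g. a temp table
                result.put(Type.TABLE, tableEntity);
                entities.add(tableEntity);
            }

            if (!entities.isEmpty()) { // never emit an empty update message
                notify(entities);
            }

            return result;
        }

        public static void main(String[] args) {
            // Table lookup failed: only the DB entity is sent, result has no TABLE entry.
            collect(new Ref(Type.DATABASE, "default"), null);
        }
    }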
@@ -709,7 +718,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                 final String tblQFName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());

                 if (!dataSetsProcessed.contains(tblQFName)) {
                     LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
-                    dataSets.put(entity, result.get(Type.TABLE));
+
+                    if (result.get(Type.TABLE) != null) {
+                        dataSets.put(entity, result.get(Type.TABLE));
+                    }
+
                     dataSetsProcessed.add(tblQFName);
                     entities.addAll(result.values());
                 }
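Since createOrUpdateEntities() can now return a map with no TABLE entry, the caller guards the put; a map would otherwise happily store the null and pass it into later lineage processing. A tiny runnable illustration of that hazard (the string keys stand in for the hook's Type keys and Referenceable values):

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class GuardedPut {
        public static void main(String[] args) {
            Map<String, String> result = new LinkedHashMap<>(); // TABLE entry absent: lookup failed
            Map<String, String> dataSets = new HashMap<>();

            dataSets.put("entity", result.get("TABLE")); // stores a null value silently
            System.out.println(dataSets);                // {entity=null}

            dataSets.clear();
            if (result.get("TABLE") != null) {           // the guarded version keeps the map clean
                dataSets.put("entity", result.get("TABLE"));
            }
            System.out.println(dataSets);                // {}
        }
    }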
@@ -760,7 +773,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         //Refresh to get the correct location
         if(hiveTable != null) {
-            hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
+            hiveTable = refreshTable(dgiBridge, hiveTable.getDbName(), hiveTable.getTableName());
         }

         if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
@@ -951,12 +964,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
                 } else if (refs.containsKey(input)) {
                     if ( input.getType() == Type.PARTITION || input.getType() == Type.TABLE) {
-                        final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(input.getTable().getDbName(), input.getTable().getTableName()));
-                        addDataset(buffer, refs.get(input), createTime.getTime());
+                        Table inputTable = refreshTable(hiveBridge, input.getTable().getDbName(), input.getTable().getTableName());
+
+                        if (inputTable != null) {
+                            final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(inputTable);
+
+                            addDataset(buffer, refs.get(input), createTime.getTime());
+                        }
                     } else {
                         addDataset(buffer, refs.get(input));
                     }
                 }

                 dataSetsProcessed.add(input.getName().toLowerCase());
             }
         }
@@ -995,12 +1013,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
                 } else if (refs.containsKey(output)) {
                     if ( output.getType() == Type.PARTITION || output.getType() == Type.TABLE) {
-                        final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(output.getTable().getDbName(), output.getTable().getTableName()));
-                        addDataset(buffer, refs.get(output), createTime.getTime());
+                        Table outputTable = refreshTable(hiveBridge, output.getTable().getDbName(), output.getTable().getTableName());
+
+                        if (outputTable != null) {
+                            final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(outputTable);
+
+                            addDataset(buffer, refs.get(output), createTime.getTime());
+                        }
                     } else {
                         addDataset(buffer, refs.get(output));
                     }
                 }

                 dataSetsProcessed.add(output.getName().toLowerCase());
             }
         }
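The input and output loops above now follow the same pattern: re-read the table through refreshTable() (added in the next hunk) and compute createTime only when the re-read succeeds, so a vanished temporary table no longer aborts lineage processing with an unhandled exception. A self-contained sketch of the pattern, with a plain map standing in for the metastore; TableMeta, refresh() and the table names are illustrative, not Hive or Atlas API.

    import java.util.Date;
    import java.util.List;
    import java.util.Map;

    public class RefreshThenUse {
        record TableMeta(String name, Date createTime) {}

        // Stand-in for refreshTable(): returns null when the table is gone.
        static TableMeta refresh(Map<String, TableMeta> metastore, String name) {
            return metastore.get(name);
        }

        public static void main(String[] args) {
            Map<String, TableMeta> metastore = Map.of("sales", new TableMeta("sales", new Date(0)));

            for (String name : List.of("sales", "tmp_scratch")) { // tmp_scratch no longer exists
                TableMeta t = refresh(metastore, name);

                if (t != null) { // guard before touching createTime
                    System.out.println(name + " created at " + t.createTime().getTime());
                } else {
                    System.out.println(name + " skipped: not found in metastore");
                }
            }
        }
    }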
@@ -1008,6 +1031,16 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }
         }

+    private static Table refreshTable(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) {
+        try {
+            return dgiBridge.hiveClient.getTable(dbName, tableName);
+        } catch (HiveException excp) { // this might be the case for temp tables
+            LOG.warn("failed to get details for table {}.{}. Ignoring. {}: {}", dbName, tableName, excp.getClass().getCanonicalName(), excp.getMessage());
+        }
+
+        return null;
+    }
+
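This new helper is the heart of the fix: Hive's getTable() throws HiveException for tables that no longer exist, which is exactly the situation for temporary tables by the time the post-execution hook runs, so the helper converts that failure into a null and every caller degrades to a simple null check. The same swap also covers the external-table location refresh earlier in this diff. A minimal runnable model of that contract; DemoClient and DemoHiveException are hypothetical stand-ins for the bridge's Hive client and org.apache.hadoop.hive.ql.metadata.HiveException.

    public class RefreshTableSketch {
        static class DemoHiveException extends Exception {
            DemoHiveException(String msg) { super(msg); }
        }

        static class DemoClient {
            String getTable(String db, String tbl) throws DemoHiveException {
                if (tbl.startsWith("tmp_")) { // temp tables are gone by the time the hook runs
                    throw new DemoHiveException("Table not found " + db + "." + tbl);
                }
                return db + "." + tbl;
            }
        }

        // Same shape as the helper above: log the failure, return null, let callers null-check.
        static String refreshTable(DemoClient client, String db, String tbl) {
            try {
                return client.getTable(db, tbl);
            } catch (DemoHiveException e) { // expected for temp tables; log and continue
                System.err.printf("failed to get details for table %s.%s. Ignoring. %s%n", db, tbl, e.getMessage());
                return null;
            }
        }

        public static void main(String[] args) {
            DemoClient client = new DemoClient();
            System.out.println(refreshTable(client, "default", "orders"));      // default.orders
            System.out.println(refreshTable(client, "default", "tmp_scratch")); // null, after a warning
        }
    }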
     private static boolean addQueryType(HiveOperation op, WriteEntity entity) {
         if (entity.getWriteType() != null && HiveOperation.QUERY.equals(op)) {
             switch (entity.getWriteType()) {
...