Commit d1940ba7 by Shwetha GS

ATLAS-1092 Add Table.CreateTime to process qualified Name for all hive_process (sumasai via shwethags)

parent 3f51160f
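
With this change, the qualified name of a hive_process entity includes each table's create time, so a table that is dropped and recreated under the same name yields a new lineage process instead of being collapsed into the old one. A minimal sketch of the resulting name shape follows; the SEP/IO_SEP values and helper names are assumptions for illustration, not the exact HiveHook constants:

// Sketch of the qualified-name shape after ATLAS-1092. SEP/IO_SEP values and
// helper names are assumed, not the exact HiveHook constants.
import java.util.Date;
import java.util.List;

public class ProcessQualifiedNameSketch {
    private static final String SEP = ":";     // assumed attribute separator
    private static final String IO_SEP = "->"; // assumed inputs/outputs separator

    // e.g. "QUERY:default.t1@cluster:1472688000000->:INSERT:default.t2@cluster:1472688500000"
    static String build(String op, List<String> inputNames, List<Date> inputCreateTimes,
                        String writeType, String outputName, Date outputCreateTime) {
        StringBuilder buffer = new StringBuilder(op);
        for (int i = 0; i < inputNames.size(); i++) {
            buffer.append(SEP).append(inputNames.get(i).toLowerCase());
            // New in this commit: the table create time is appended so a
            // dropped-and-recreated table produces a different process name.
            buffer.append(SEP).append(inputCreateTimes.get(i).getTime());
        }
        buffer.append(IO_SEP).append(SEP);
        buffer.append(writeType).append(SEP).append(outputName.toLowerCase());
        buffer.append(SEP).append(outputCreateTime.getTime());
        return buffer.toString();
    }
}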
@@ -408,7 +408,7 @@ public class HiveMetaStoreBridge {
         return createOrUpdateTableInstance(dbReference, null, hiveTable);
     }
 
-    private static Date getTableCreatedTime(Table table) {
+    public static Date getTableCreatedTime(Table table) {
         return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
     }
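
getTableCreatedTime becomes public here because the hook (and its tests) now call it when building process qualified names. A sketch of the conversion it performs: the Thrift table object stores createTime in epoch seconds, which is scaled to milliseconds for java.util.Date. MILLIS_CONVERT_FACTOR is assumed below to be 1000L:

// Hedged sketch of the seconds-to-milliseconds conversion; the constant's
// exact value is an assumption based on the expression in the diff above.
import java.util.Date;

public class CreateTimeConversionSketch {
    private static final long MILLIS_CONVERT_FACTOR = 1000L; // assumed value

    static Date toCreatedDate(int thriftCreateTimeSeconds) {
        // long multiplication avoids int overflow for dates past Jan 2038
        return new Date(thriftCreateTimeSeconds * MILLIS_CONVERT_FACTOR);
    }

    public static void main(String[] args) {
        System.out.println(toCreatedDate(1472688000)); // 2016-09-01 00:00 UTC
    }
}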
@@ -786,9 +786,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr());
         }
 
-        addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName);
+        addInputs(dgiBridge, op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName);
         buffer.append(IO_SEP);
-        addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName);
+        addOutputs(dgiBridge, op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName);
         LOG.info("Setting process qualified name to {}", buffer);
         return buffer.toString();
     }
@@ -815,7 +815,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return false;
     }
 
-    private static void addInputs(HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
+    private static void addInputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException {
         if (refs != null) {
             if (sortedInputs != null) {
                 Set<String> dataSetsProcessed = new LinkedHashSet<>();
@@ -827,8 +827,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                             (Type.DFS_DIR.equals(input.getType()) || Type.LOCAL_DIR.equals(input.getType()))) {
                         LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
                     } else if (refs.containsKey(input)) {
+                        if (input.getType() == Type.PARTITION || input.getType() == Type.TABLE) {
+                            final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(input.getTable().getDbName(), input.getTable().getTableName()));
+                            addDataset(buffer, refs.get(input), createTime.getTime());
+                        } else {
                             addDataset(buffer, refs.get(input));
+                        }
                     }
                     dataSetsProcessed.add(input.getName().toLowerCase());
                 }
             }
@@ -837,6 +842,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
+    private static void addDataset(StringBuilder buffer, Referenceable ref, final long createTime) {
+        addDataset(buffer, ref);
+        buffer.append(SEP);
+        buffer.append(createTime);
+    }
+
     private static void addDataset(StringBuilder buffer, Referenceable ref) {
         buffer.append(SEP);
         String dataSetQlfdName = (String) ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
@@ -844,11 +855,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
     }
 
-    private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
+    private static void addOutputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException {
         if (refs != null) {
             Set<String> dataSetsProcessed = new LinkedHashSet<>();
             if (sortedOutputs != null) {
-                for (Entity output : sortedOutputs) {
+                for (WriteEntity output : sortedOutputs) {
                     final Entity entity = output;
                     if (!dataSetsProcessed.contains(output.getName().toLowerCase())) {
                         //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
@@ -860,8 +871,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                             (Type.DFS_DIR.equals(output.getType()) || Type.LOCAL_DIR.equals(output.getType()))) {
                         LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
                     } else if (refs.containsKey(output)) {
+                        if (output.getType() == Type.PARTITION || output.getType() == Type.TABLE) {
+                            final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(output.getTable().getDbName(), output.getTable().getTableName()));
+                            addDataset(buffer, refs.get(output), createTime.getTime());
+                        } else {
                             addDataset(buffer, refs.get(output));
+                        }
                     }
                     dataSetsProcessed.add(output.getName().toLowerCase());
                 }
             }
@@ -544,13 +544,25 @@ public class HiveHookIT extends HiveITBase {
         Referenceable processRef1 = validateProcess(event, expectedInputs, outputs);
 
         //Test sorting of tbl names
-        SortedSet<String> sortedTblNames = new TreeSet<>();
-        sortedTblNames.add(getQualifiedTblName(inputTable1Name));
-        sortedTblNames.add(getQualifiedTblName(inputTable2Name));
+        SortedSet<String> sortedTblNames = new TreeSet<String>();
+        sortedTblNames.add(inputTable1Name.toLowerCase());
+        sortedTblNames.add(inputTable2Name.toLowerCase());
 
-        //Verify sorted orer of inputs in qualified name
-        Assert.assertEquals(Joiner.on(SEP).join("QUERY", sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), getQualifiedTblName(insertTableName))
-            , processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
+        //Verify sorted order of inputs in qualified name
+        Assert.assertEquals(
+            processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
+            Joiner.on(SEP).join("QUERY",
+                getQualifiedTblName(sortedTblNames.first()),
+                HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.first())).getTime(),
+                getQualifiedTblName(sortedTblNames.last()),
+                HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.last())).getTime())
+            + IO_SEP + SEP
+            + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(),
+                getQualifiedTblName(insertTableName),
+                HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, insertTableName)).getTime())
+        );
 
         //Rerun same query. Should result in same process
         runCommandWithDelay(query, 1000);
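
For concreteness, the expected value asserted above has roughly the following shape; the cluster name, table names, and timestamps below are invented for illustration, and SEP/IO_SEP are assumed to be ":" and "->":

// Invented values for illustration only; note the "->:" sequence produced
// by IO_SEP immediately followed by SEP.
public class ExpectedNameExample {
    public static void main(String[] args) {
        String expected = "QUERY"
            + ":" + "default.tablea@primary" + ":" + 1472688000000L
            + ":" + "default.tableb@primary" + ":" + 1472688001000L
            + "->" + ":"
            + "INSERT"
            + ":" + "default.tablec@primary" + ":" + 1472688002000L;
        System.out.println(expected);
    }
}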
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+ATLAS-1092 Add Table.CreateTime to process qualified Name for all hive_process (sumasai via shwethags)
 ATLAS-1096 Modify HveMetaStoreBridge.import to use getEntity instead of DSL (sumasai via shwethags)
 ATLAS-1091 Improvement in DSL search functionality. (kevalbhatt)
 ATLAS-1080 Regression - UI - hive_storagedesc is shown as "undefined" in UI.(kevalbhatt)