From d1940ba75df69cada0dea11708fc9052071bd41e Mon Sep 17 00:00:00 2001 From: Shwetha GS <sshivalingamurthy@hortonworks.com> Date: Fri, 5 Aug 2016 11:50:25 +0530 Subject: [PATCH] ATLAS-1092 Add Table.CreateTime to process qualified Name for all hive_process (sumasai via shwethags) --- addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java | 2 +- addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java | 30 +++++++++++++++++++++++------- addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java | 26 +++++++++++++++++++------- release-log.txt | 1 + 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 270ecf4..eb08c37 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -408,7 +408,7 @@ public class HiveMetaStoreBridge { return createOrUpdateTableInstance(dbReference, null, hiveTable); } - private static Date getTableCreatedTime(Table table) { + public static Date getTableCreatedTime(Table table) { return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR); } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 40e8c5f..7905bcf 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -786,9 +786,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr()); } - addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName); + addInputs(dgiBridge, op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName); buffer.append(IO_SEP); - addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName); + addOutputs(dgiBridge, op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName); LOG.info("Setting process qualified name to {}", buffer); return buffer.toString(); } @@ -815,7 +815,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return false; } - private static void addInputs(HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) { + private static void addInputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException { if (refs != null) { if (sortedInputs != null) { Set<String> dataSetsProcessed = new LinkedHashSet<>(); @@ -827,7 +827,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { (Type.DFS_DIR.equals(input.getType()) || Type.LOCAL_DIR.equals(input.getType()))) { LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName()); } else if (refs.containsKey(input)) { - addDataset(buffer, refs.get(input)); + if ( input.getType() == Type.PARTITION || input.getType() == Type.TABLE) { + final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(input.getTable().getDbName(), input.getTable().getTableName())); + addDataset(buffer, refs.get(input), createTime.getTime()); + } else { + addDataset(buffer, refs.get(input)); + } } dataSetsProcessed.add(input.getName().toLowerCase()); } @@ -837,6 +842,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } + private static void addDataset(StringBuilder buffer, Referenceable ref, final long createTime) { + addDataset(buffer, ref); + buffer.append(SEP); + buffer.append(createTime); + } + private static void addDataset(StringBuilder buffer, Referenceable ref) { buffer.append(SEP); String dataSetQlfdName = (String) ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); @@ -844,11 +855,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", "")); } - private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) { + private static void addOutputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException { if (refs != null) { Set<String> dataSetsProcessed = new LinkedHashSet<>(); if (sortedOutputs != null) { - for (Entity output : sortedOutputs) { + for (WriteEntity output : sortedOutputs) { final Entity entity = output; if (!dataSetsProcessed.contains(output.getName().toLowerCase())) { //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations @@ -860,7 +871,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { (Type.DFS_DIR.equals(output.getType()) || Type.LOCAL_DIR.equals(output.getType()))) { LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName()); } else if (refs.containsKey(output)) { - addDataset(buffer, refs.get(output)); + if ( output.getType() == Type.PARTITION || output.getType() == Type.TABLE) { + final Date createTime = HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(output.getTable().getDbName(), output.getTable().getTableName())); + addDataset(buffer, refs.get(output), createTime.getTime()); + } else { + addDataset(buffer, refs.get(output)); + } } dataSetsProcessed.add(output.getName().toLowerCase()); } diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index e61e916..9258b3e 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -544,13 +544,25 @@ public class HiveHookIT extends HiveITBase { Referenceable processRef1 = validateProcess(event, expectedInputs, outputs); //Test sorting of tbl names - SortedSet<String> sortedTblNames = new TreeSet<>(); - sortedTblNames.add(getQualifiedTblName(inputTable1Name)); - sortedTblNames.add(getQualifiedTblName(inputTable2Name)); - - //Verify sorted orer of inputs in qualified name - Assert.assertEquals(Joiner.on(SEP).join("QUERY", sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), getQualifiedTblName(insertTableName)) - , processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)); + SortedSet<String> sortedTblNames = new TreeSet<String>(); + sortedTblNames.add(inputTable1Name.toLowerCase()); + sortedTblNames.add(inputTable2Name.toLowerCase()); + + //Verify sorted order of inputs in qualified name + Assert.assertEquals( + processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), + + Joiner.on(SEP).join("QUERY", + getQualifiedTblName(sortedTblNames.first()), + HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.first())).getTime(), + getQualifiedTblName(sortedTblNames.last()), + HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.last())).getTime()) + + IO_SEP + SEP + + Joiner.on(SEP). + join(WriteEntity.WriteType.INSERT.name(), + getQualifiedTblName(insertTableName), + HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, insertTableName)).getTime()) + ); //Rerun same query. Should result in same process runCommandWithDelay(query, 1000); diff --git a/release-log.txt b/release-log.txt index 5e0d4af..e4aac5e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ALL CHANGES: +ATLAS-1092 Add Table.CreateTime to process qualified Name for all hive_process (sumasai via shwethags) ATLAS-1096 Modify HveMetaStoreBridge.import to use getEntity instead of DSL (sumasai via shwethags) ATLAS-1091 Improvement in DSL search functionality. (kevalbhatt) ATLAS-1080 Regression - UI - hive_storagedesc is shown as "undefined" in UI.(kevalbhatt) -- libgit2 0.27.1