From 5b4cf4402b895464fb90b477514b8359b6f51c30 Mon Sep 17 00:00:00 2001
From: Hemanth Yamijala <hyamijala@hortonworks.com>
Date: Tue, 21 Jun 2016 22:06:01 +0530
Subject: [PATCH] ATLAS-904 Hive hook fails due to session state not being set (sumasai via yhemanth)

---
 addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java   | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------
 addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java | 299 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------------------------------------------------------------------------------------
 release-log.txt                                                             |   1 +
 webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java     |   2 +-
 4 files changed, 303 insertions(+), 196 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 46af653..a1a00b3 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -26,7 +26,6 @@ import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.hive.rewrite.HiveASTRewriter;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
@@ -57,13 +56,17 @@ import org.slf4j.LoggerFactory;
 
 import java.net.MalformedURLException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -75,7 +78,6 @@ import java.util.concurrent.TimeUnit;
 public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
 
-
     public static final String CONF_PREFIX = "atlas.hook.hive.";
     private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
     private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
@@ -84,6 +86,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
 
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+    private static final String SEP = ":".intern();
+    private static final String IO_SEP = "->".intern();
 
     private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
 
@@ -182,12 +186,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     }
                 });
             }
-        } catch(Throwable t) {
+        } catch (Throwable t) {
             LOG.error("Submitting to thread pool failed due to error ", t);
         }
     }
 
     private void fireAndForget(HiveEventContext event) throws Exception {
+
         assert event.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
 
         LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation());
@@ -285,7 +290,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
     private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
         if (event.getOutputs().size() > 1) {
-            LOG.info("Starting deletion of tables and databases with cascade {} " , event.getQueryStr());
+            LOG.info("Starting deletion of tables and databases with cascade {} ", event.getQueryStr());
         }
 
         for (WriteEntity output : event.getOutputs()) {
@@ -302,10 +307,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private Pair<String, String> findChangedColNames(List<FieldSchema> oldColList, List<FieldSchema> newColList){
+    private Pair<String, String> findChangedColNames(List<FieldSchema> oldColList, List<FieldSchema> newColList) {
         HashMap<FieldSchema, Integer> oldColHashMap = new HashMap<>();
         HashMap<FieldSchema, Integer> newColHashMap = new HashMap<>();
-        for (int i = 0; i < oldColList.size(); i++){
+        for (int i = 0; i < oldColList.size(); i++) {
             oldColHashMap.put(oldColList.get(i), i);
             newColHashMap.put(newColList.get(i), i);
         }
@@ -313,15 +318,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         String changedColStringOldName = oldColList.get(0).getName();
         String changedColStringNewName = changedColStringOldName;
 
-        for(int i = 0; i < oldColList.size(); i++){
-            if (!newColHashMap.containsKey(oldColList.get(i))){
+        for (int i = 0; i < oldColList.size(); i++) {
+            if (!newColHashMap.containsKey(oldColList.get(i))) {
                 changedColStringOldName = oldColList.get(i).getName();
                 break;
             }
         }
 
-        for(int i = 0; i < newColList.size(); i++){
-            if (!oldColHashMap.containsKey(newColList.get(i))){
+        for (int i = 0; i < newColList.size(); i++) {
+            if (!oldColHashMap.containsKey(newColList.get(i))) {
                 changedColStringNewName = newColList.get(i).getName();
                 break;
             }
@@ -330,7 +335,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return Pair.of(changedColStringOldName, changedColStringNewName);
     }
 
-    private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws  Exception{
+    private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
         assert event.getInputs() != null && event.getInputs().size() == 1;
         assert event.getOutputs() != null && event.getOutputs().size() > 0;
 
@@ -344,20 +349,20 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         Pair<String, String> changedColNamePair = findChangedColNames(oldColList, newColList);
         String oldColName = changedColNamePair.getLeft();
         String newColName = changedColNamePair.getRight();
-        for(WriteEntity writeEntity : event.getOutputs()){
-            if (writeEntity.getType() == Type.TABLE){
+        for (WriteEntity writeEntity : event.getOutputs()) {
+            if (writeEntity.getType() == Type.TABLE) {
                 Table newTable = writeEntity.getTable();
                 createOrUpdateEntities(dgiBridge, event, writeEntity, true, oldTable);
                 final String newQualifiedTableName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
-                        newTable);
+                    newTable);
                 String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, oldColName);
                 String newColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, newColName);
                 Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
                 newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
 
                 event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
-                        HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        oldColumnQFName, newColEntity));
+                    HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                    oldColumnQFName, newColEntity));
             }
         }
         handleEventOutputs(dgiBridge, event, Type.TABLE);
@@ -502,7 +507,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             if (skipTempTables &&
                 table.isTemporary() &&
                 !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-               LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
+                LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
 
             } else {
                 tableEntity = dgiBridge.createTableInstance(dbEntity, table);
@@ -511,7 +516,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }
         }
 
-
         event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
         return result;
     }
@@ -538,7 +542,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return null;
     }
 
-
     public static String lower(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;
@@ -547,18 +550,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     }
 
     public static String normalize(String queryStr) {
-        String result = null;
-        if (queryStr != null) {
-            try {
-                HiveASTRewriter rewriter = new HiveASTRewriter(hiveConf);
-                result = rewriter.rewrite(queryStr);
-            } catch (Exception e) {
-                LOG.warn("Could not rewrite query due to error. Proceeding with original query {}", queryStr, e);
-            }
-        }
-
-        result = lower(result);
-        return result;
+        return lower(queryStr);
     }
 
     private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
@@ -575,8 +567,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.info("Query id/plan is missing for {}", event.getQueryStr());
         }
 
-        final Map<String, Referenceable> source = new LinkedHashMap<>();
-        final Map<String, Referenceable> target = new LinkedHashMap<>();
+        final SortedMap<Entity, Referenceable> source = new TreeMap<>(entityComparator);
+        final SortedMap<Entity, Referenceable> target = new TreeMap<>(entityComparator);
+
+        final Set<String> dataSets = new HashSet<>();
         final Set<Referenceable> entities = new LinkedHashSet<>();
 
         boolean isSelectQuery = isSelectQuery(event);
@@ -584,22 +578,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         // filter out select queries which do not modify data
         if (!isSelectQuery) {
             for (ReadEntity readEntity : event.getInputs()) {
-                processHiveEntity(dgiBridge, event, readEntity, source, entities);
+                processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
             }
 
             for (WriteEntity writeEntity : event.getOutputs()) {
-                processHiveEntity(dgiBridge, event, writeEntity, target, entities);
+                processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
             }
 
             if (source.size() > 0 || target.size() > 0) {
-                Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
-                    new ArrayList<Referenceable>() {{
-                        addAll(source.values());
-                    }},
-                    new ArrayList<Referenceable>() {{
-                        addAll(target.values());
-                    }});
-
+                Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, source, target);
                 entities.add(processReferenceable);
                 event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
             } else {
@@ -610,20 +597,25 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
+    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Set<String> dataSetsProcessed,
+        SortedMap<Entity, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
         if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
             final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
-            if (!dataSets.containsKey(tblQFName)) {
+            if (!dataSetsProcessed.contains(tblQFName)) {
                 LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
-                dataSets.put(tblQFName, result.get(Type.TABLE));
+                dataSets.put(entity, result.get(Type.TABLE));
+                dataSetsProcessed.add(tblQFName);
                 entities.addAll(result.values());
             }
         } else if (entity.getType() == Type.DFS_DIR) {
             final String pathUri = lower(new Path(entity.getLocation()).toString());
             LOG.info("Registering DFS Path {} ", pathUri);
-            Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
-            dataSets.put(pathUri, hdfsPath);
-            entities.add(hdfsPath);
+            if (!dataSetsProcessed.contains(pathUri)) {
+                Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
+                dataSets.put(entity, hdfsPath);
+                dataSetsProcessed.add(pathUri);
+                entities.add(hdfsPath);
+            }
         }
     }
 
@@ -661,24 +653,30 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
     private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
         List<Referenceable> entities = new ArrayList<>();
-        Table hiveTable = getEntityByType(event.getOutputs(), Type.TABLE).getTable();
+        final Entity hiveEntity = getEntityByType(event.getOutputs(), Type.TABLE);
+        Table hiveTable = hiveEntity.getTable();
         //Refresh to get the correct location
         hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
 
         final String location = lower(hiveTable.getDataLocation().toString());
         if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
             LOG.info("Registering external table process {} ", event.getQueryStr());
-            List<Referenceable> inputs = new ArrayList<Referenceable>() {{
-                add(dgiBridge.fillHDFSDataSet(location));
+            final ReadEntity dfsEntity = new ReadEntity();
+            dfsEntity.setTyp(Type.DFS_DIR);
+            dfsEntity.setName(location);
+
+            SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+                put(dfsEntity, dgiBridge.fillHDFSDataSet(location));
             }};
 
-            List<Referenceable> outputs = new ArrayList<Referenceable>() {{
-                add(tables.get(Type.TABLE));
+            SortedMap<Entity, Referenceable> outputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+                put(hiveEntity, tables.get(Type.TABLE));
             }};
 
             Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
             String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
-            if(isCreateOp(event)){
+
+            if (isCreateOp(event)) {
                 processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
             }
             entities.addAll(tables.values());
@@ -697,25 +695,22 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return false;
     }
 
-    private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
+    private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
+        SortedMap<Entity, Referenceable> source, SortedMap<Entity, Referenceable> target) {
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
 
-        String queryStr = hiveEvent.getQueryStr();
-        if (!isCreateOp(hiveEvent)) {
-            queryStr = normalize(queryStr);
-            processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(queryStr, sourceList, targetList));
-        } else {
-            queryStr = lower(queryStr);
-            processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, queryStr);
-        }
+        String queryStr = lower(hiveEvent.getQueryStr());
+        processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent.getOperation(), source, target));
 
         LOG.debug("Registering query: {}", queryStr);
+        List<Referenceable> sourceList = new ArrayList<>(source.values());
+        List<Referenceable> targetList = new ArrayList<>(target.values());
 
         //The serialization code expected a list
-        if (sourceList != null || !sourceList.isEmpty()) {
+        if (sourceList != null && !sourceList.isEmpty()) {
             processReferenceable.set("inputs", sourceList);
         }
-        if (targetList != null || !targetList.isEmpty()) {
+        if (targetList != null && !targetList.isEmpty()) {
             processReferenceable.set("outputs", targetList);
         }
         processReferenceable.set(AtlasClient.NAME, queryStr);
@@ -729,32 +724,65 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         processReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, dgiBridge.getClusterName());
 
         List<String> recentQueries = new ArrayList<>(1);
-        recentQueries.add(hiveEvent.getQueryStr());
+        recentQueries.add(queryStr);
         processReferenceable.set("recentQueries", recentQueries);
+
         processReferenceable.set("endTime", new Date(System.currentTimeMillis()));
         //TODO set queryGraph
         return processReferenceable;
     }
 
     @VisibleForTesting
-    static String getProcessQualifiedName(String normalizedQuery, List<Referenceable> inputs, List<Referenceable> outputs) {
-        StringBuilder buffer = new StringBuilder(normalizedQuery);
-        addDatasets(buffer, inputs);
-        addDatasets(buffer, outputs);
+    static String getProcessQualifiedName(HiveOperation op, SortedMap<Entity, Referenceable> inputs, SortedMap<Entity, Referenceable> outputs) {
+        StringBuilder buffer = new StringBuilder(op.getOperationName());
+        addDatasets(op, buffer, inputs);
+        buffer.append(IO_SEP);
+        addDatasets(op, buffer, outputs);
+        LOG.info("Setting process qualified name to {}", buffer);
         return buffer.toString();
     }
 
-    private static void addDatasets(StringBuilder buffer, List<Referenceable> refs) {
+    private static void addDatasets(HiveOperation op, StringBuilder buffer, final Map<Entity, Referenceable> refs) {
         if (refs != null) {
-            for (Referenceable input : refs) {
-                //TODO - Change to qualifiedName later
-                buffer.append(":");
-                String dataSetQlfdName = (String) input.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
-                buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+            for (Entity input : refs.keySet()) {
+                final Entity entity = input;
+
+                //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+                if (addQueryType(op, entity)) {
+                    buffer.append(SEP);
+                    buffer.append(((WriteEntity) entity).getWriteType().name());
+                }
+                if (Type.DFS_DIR.equals(entity.getType()) ||
+                    Type.LOCAL_DIR.equals(entity.getType())) {
+                    LOG.debug("Skipping dfs dir addition into process qualified name {} ", refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
+                } else {
+                    buffer.append(SEP);
+                    String dataSetQlfdName = (String) refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+                    // '/' breaks query parsing on ATLAS
+                    buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+                }
             }
         }
     }
 
+    private static boolean addQueryType(HiveOperation op, Entity entity) {
+        if (WriteEntity.class.isAssignableFrom(entity.getClass())) {
+            if (((WriteEntity) entity).getWriteType() != null &&
+                op.equals(HiveOperation.QUERY)) {
+                switch (((WriteEntity) entity).getWriteType()) {
+                case INSERT:
+                case INSERT_OVERWRITE:
+                case UPDATE:
+                case DELETE:
+                case PATH_WRITE:
+                    return true;
+                default:
+                }
+            }
+        }
+        return false;
+    }
+
     public static class HiveEventContext {
         private Set<ReadEntity> inputs;
         private Set<WriteEntity> outputs;
@@ -768,9 +796,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         private String queryStr;
         private Long queryStartTime;
 
-        private String queryType;
+        private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
 
-        List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+        private String queryType;
 
         public void setInputs(Set<ReadEntity> inputs) {
             this.inputs = inputs;
@@ -868,4 +896,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             return messages;
         }
     }
+
+    @VisibleForTesting
+    static final class EntityComparator implements Comparator<Entity> {
+        @Override
+        public int compare(Entity o1, Entity o2) {
+            return o1.getName().toLowerCase().compareTo(o2.getName().toLowerCase());
+        }
+    }
+
+    @VisibleForTesting
+    static final Comparator<Entity> entityComparator = new EntityComparator();
 }
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 4afdb27..c6a7965 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -44,7 +44,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -60,10 +63,15 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 import static org.apache.atlas.AtlasClient.NAME;
+import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
+import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
 import static org.apache.atlas.hive.hook.HiveHook.lower;
 import static org.apache.atlas.hive.hook.HiveHook.normalize;
 import static org.testng.Assert.assertEquals;
@@ -95,6 +103,7 @@ public class HiveHookIT {
         driver = new Driver(conf);
         ss = new SessionState(conf);
         ss = SessionState.start(ss);
+
         SessionState.setCurrentSessionState(ss);
 
         Configuration configuration = ApplicationProperties.get();
@@ -256,19 +265,50 @@ public class HiveHookIT {
         validateHDFSPaths(processReference, INPUTS, pFile);
     }
 
-    private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
-       validateTables(processReference, OUTPUTS, expectedTableNames);
+    private List<Entity> getInputs(String inputName, Entity.Type entityType) {
+        final ReadEntity entity = new ReadEntity();
+
+        if (Entity.Type.DFS_DIR.equals(entityType)) {
+            entity.setName(lower(new Path(inputName).toString()));
+            entity.setTyp(Entity.Type.DFS_DIR);
+        } else {
+            entity.setName(getQualifiedTblName(inputName));
+            entity.setTyp(Entity.Type.TABLE);
+        }
+
+        return new ArrayList<Entity>() {{ add(entity); }};
+    }
+
+
+    private List<Entity> getOutputs(String inputName, Entity.Type entityType) {
+        final WriteEntity entity = new WriteEntity();
+
+        if (Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
+            entity.setName(lower(new Path(inputName).toString()));
+            entity.setTyp(entityType);
+        } else {
+            entity.setName(getQualifiedTblName(inputName));
+            entity.setTyp(Entity.Type.TABLE);
+        }
+
+        return new ArrayList<Entity>() {{ add(entity); }};
+    }
+
+
+    private void validateOutputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
+        validateTables(processReference, OUTPUTS, expectedTables);
     }
 
-    private void validateInputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
-        validateTables(processReference, INPUTS, expectedTableNames);
+    private void validateInputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
+        validateTables(processReference, INPUTS, expectedTables);
     }
 
-    private void validateTables(Referenceable processReference, String attrName, String... expectedTableNames) throws Exception {
+    private void validateTables(Referenceable processReference, String attrName, List<Entity> expectedTables) throws Exception {
         List<Id> tableRef = (List<Id>) processReference.get(attrName);
-        for(int i = 0; i < expectedTableNames.length; i++) {
+        for (int i = 0; i < expectedTables.size(); i++) {
             Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
-            Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTableNames[i]);
+            LOG.debug("Validating output {} {} ", i, entity);
+            Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTables.get(i).getName());
         }
     }
 
@@ -301,7 +341,15 @@ public class HiveHookIT {
         String query = "create table " + ctasTableName + " as select * from " + tableName;
         runCommand(query);
 
-        assertProcessIsRegistered(query);
+        final ReadEntity entity = new ReadEntity();
+        entity.setName(getQualifiedTblName(tableName));
+        entity.setTyp(Entity.Type.TABLE);
+
+        final WriteEntity writeEntity = new WriteEntity();
+        writeEntity.setTyp(Entity.Type.TABLE);
+        writeEntity.setName(getQualifiedTblName(ctasTableName));
+
+        assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, new ArrayList<Entity>() {{ add(entity); }}, new ArrayList<Entity>() {{ add(writeEntity); }});
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
     }
 
@@ -313,7 +361,11 @@ public class HiveHookIT {
         runCommand(query);
 
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
-        String processId = assertProcessIsRegistered(query);
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        List<Entity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
+
+        String processId = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
 
         final String drpquery = String.format("drop table %s ", ctasTableName);
         runCommand(drpquery);
@@ -322,16 +374,15 @@ public class HiveHookIT {
         //Fix after ATLAS-876
         runCommand(query);
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
-        String process2Id = assertProcessIsRegistered(query);
+        String process2Id = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
 
         Assert.assertEquals(process2Id, processId);
 
         Referenceable processRef = atlasClient.getEntity(processId);
-        String tblQlfdname = getQualifiedTblName(tableName);
-        String ctasQlfdname = getQualifiedTblName(ctasTableName);
 
-        validateInputTables(processRef, tblQlfdname);
-        validateOutputTables(processRef, ctasQlfdname, ctasQlfdname);
+        validateInputTables(processRef, inputs);
+        outputs.add(outputs.get(0));
+        validateOutputTables(processRef, outputs);
     }
 
     @Test
@@ -341,7 +392,7 @@ public class HiveHookIT {
         String query = "create view " + viewName + " as select * from " + tableName;
         runCommand(query);
 
-        assertProcessIsRegistered(query);
+        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
         assertTableIsRegistered(DEFAULT_DB, viewName);
     }
 
@@ -355,7 +406,7 @@ public class HiveHookIT {
         runCommand(query);
 
         String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);
-        assertProcessIsRegistered(query);
+        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
         String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);
 
         //Check lineage which includes table1
@@ -371,7 +422,7 @@ public class HiveHookIT {
         runCommand(query);
 
         //Check if alter view process is reqistered
-        assertProcessIsRegistered(query);
+        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
         String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);
         Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);
 
@@ -408,7 +459,9 @@ public class HiveHookIT {
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
         runCommand(query);
 
-        assertProcessIsRegistered(query, null, getQualifiedTblName(tableName));
+        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+
+        assertProcessIsRegistered(query, HiveOperation.LOAD, null, outputs);
     }
 
     @Test
@@ -419,7 +472,7 @@ public class HiveHookIT {
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName +  " partition(dt = '2015-01-01')";
         runCommand(query);
 
-        validateProcess(query, null, getQualifiedTblName(tableName));
+        validateProcess(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE));
     }
 
     @Test
@@ -429,16 +482,15 @@ public class HiveHookIT {
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
         String loadFile = createTestDFSFile("loadDFSFile");
-        final String testPathNormed = lower(new Path(loadFile).toString());
         String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
         runCommand(query);
 
-        final String tblQlfdName = getQualifiedTblName(tableName);
-        Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
+        final List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+        Referenceable processReference = validateProcess(query, HiveOperation.LOAD, getInputs(loadFile, Entity.Type.DFS_DIR), outputs);
 
         validateHDFSPaths(processReference, INPUTS, loadFile);
 
-        validateOutputTables(processReference, tblQlfdName);
+        validateOutputTables(processReference, outputs);
     }
 
     private String getQualifiedTblName(String inputTable) {
@@ -450,20 +502,20 @@ public class HiveHookIT {
         return inputtblQlfdName;
     }
 
-    private Referenceable validateProcess(String query, String inputTable, String... outputTables) throws Exception {
-        String processId = assertProcessIsRegistered(query, inputTable, outputTables);
+    private Referenceable validateProcess(String query, HiveOperation op, List<Entity> inputTables, List<Entity> outputTables) throws Exception {
+        String processId = assertProcessIsRegistered(query, op, inputTables, outputTables);
         Referenceable process = atlasClient.getEntity(processId);
-        if (inputTable == null) {
+        if (inputTables == null) {
             Assert.assertNull(process.get(INPUTS));
         } else {
-            Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), 1);
-            validateInputTables(process, inputTable);
+            Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputTables.size());
+            validateInputTables(process, inputTables);
         }
 
         if (outputTables == null) {
             Assert.assertNull(process.get(OUTPUTS));
         } else {
-            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1);
+            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputTables.size());
             validateOutputTables(process, outputTables);
         }
 
@@ -482,12 +534,16 @@ public class HiveHookIT {
         String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
         String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
 
-        Referenceable processRef1 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+
+        Referenceable processRef1 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
 
         //Rerun same query. Should result in same process
         runCommand(query);
 
-        Referenceable processRef2 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
+        Referenceable processRef2 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
         Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
 
     }
@@ -500,7 +556,7 @@ public class HiveHookIT {
             "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
 
         runCommand(query);
-        validateProcess(query, getQualifiedTblName(tableName), null);
+        validateProcess(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null);
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
     }
@@ -509,69 +565,80 @@ public class HiveHookIT {
     public void testUpdateProcess() throws Exception {
         String tableName = createTable();
         String pFile1 = createTestDFSPath("somedfspath1");
-        String testPathNormed = lower(new Path(pFile1).toString());
         String query =
             "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from " + tableName;
 
         runCommand(query);
-        String tblQlfdname = getQualifiedTblName(tableName);
-        Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
+        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+
+        Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
         validateHDFSPaths(processReference, OUTPUTS, pFile1);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-
-        validateInputTables(processReference, tblQlfdname);
+        validateInputTables(processReference, inputs);
 
         //Rerun same query with same HDFS path
 
         runCommand(query);
-        Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+        Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
         validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
 
         Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
 
-        //Rerun same query with a new HDFS path. Should create a new process
-        String pFile2 = createTestDFSPath("somedfspath2");
+        //Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of qualifiedName.
+        final String pFile2 = createTestDFSPath("somedfspath2");
         query = "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from " + tableName;
-        final String testPathNormed2 = lower(new Path(pFile2).toString());
         runCommand(query);
+        List<Entity> p3Outputs = new ArrayList<Entity>() {{
+            addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
+            addAll(outputs);
+        }};
 
-        Referenceable process3Reference = validateProcess(query, tblQlfdname, testPathNormed2);
+        Referenceable process3Reference = validateProcess(query, HiveOperation.QUERY, inputs, p3Outputs);
         validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
 
-        Assert.assertNotEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
+        Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
     }
 
     @Test
     public void testInsertIntoDFSDir() throws Exception {
         String tableName = createTable();
         String pFile1 = createTestDFSPath("somedfspath1");
-        String testPathNormed = lower(new Path(pFile1).toString());
         String query =
             "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from " + tableName;
 
         runCommand(query);
-        String tblQlfdname = getQualifiedTblName(tableName);
-        Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
+        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+
+        Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
         validateHDFSPaths(processReference, OUTPUTS, pFile1);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        validateInputTables(processReference, tblQlfdname);
+        validateInputTables(processReference, inputs);
 
         //Rerun same query with different HDFS path
 
-        String pFile2 = createTestDFSPath("somedfspath2");
-        testPathNormed = lower(new Path(pFile2).toString());
+        final String pFile2 = createTestDFSPath("somedfspath2");
         query =
             "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from " + tableName;
 
         runCommand(query);
-        tblQlfdname = getQualifiedTblName(tableName);
-        Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+        List<Entity> p2Outputs = new ArrayList<Entity>() {{
+            addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
+            addAll(outputs);
+        }};
+
+        Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, p2Outputs);
         validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
 
-        Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
+        Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
     }
 
     @Test
@@ -585,7 +652,13 @@ public class HiveHookIT {
             "insert into " + insertTableName + " select id, name from " + tableName;
 
         runCommand(query);
-        validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+        outputs.get(0).setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
+        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+
+        validateProcess(query, HiveOperation.QUERY, inputs, outputs);
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
         assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
@@ -599,7 +672,12 @@ public class HiveHookIT {
             "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
                 + " where dt = '2015-01-01'";
         runCommand(query);
-        validateProcess(query, getQualifiedTblName(tableName) , getQualifiedTblName(insertTableName));
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+
+        validateProcess(query, HiveOperation.QUERY, inputs, outputs);
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
         assertTableIsRegistered(DEFAULT_DB, insertTableName);
@@ -627,28 +705,31 @@ public class HiveHookIT {
     public void testExportImportUnPartitionedTable() throws Exception {
         String tableName = createTable(false);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
         String filename = "pfile://" + mkdir("export");
         String query = "export table " + tableName + " to \"" + filename + "\"";
-        final String testPathNormed = lower(new Path(filename).toString());
         runCommand(query);
-        String tblQlfName = getQualifiedTblName(tableName);
-        Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+
+        Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
+
         validateHDFSPaths(processReference, OUTPUTS, filename);
-        validateInputTables(processReference, tblQlfName);
+        validateInputTables(processReference, inputs);
 
         //Import
         tableName = createTable(false);
-        tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
         query = "import table " + tableName + " from '" + filename + "'";
         runCommand(query);
-        tblQlfName = getQualifiedTblName(tableName);
-        processReference = validateProcess(query, testPathNormed, tblQlfName);
+        outputs = getOutputs(tableName, Entity.Type.TABLE);
+        processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
         validateHDFSPaths(processReference, INPUTS, filename);
 
-        validateOutputTables(processReference, tblQlfName);
+        validateOutputTables(processReference, outputs);
     }
 
     @Test
@@ -662,14 +743,16 @@ public class HiveHookIT {
         runCommand(query);
 
         String filename = "pfile://" + mkdir("export");
-        final String testPathNormed = lower(new Path(filename).toString());
         query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
-        String tblQlfdName = getQualifiedTblName(tableName);
-        Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
+
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+
+        Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
         validateHDFSPaths(processReference, OUTPUTS, filename);
 
-        validateInputTables(processReference, tblQlfdName);
+        validateInputTables(processReference, inputs);
 
         //Import
         tableName = createTable(true);
@@ -677,11 +760,12 @@ public class HiveHookIT {
 
         query = "import table " + tableName + " from '" + filename + "'";
         runCommand(query);
-        tblQlfdName = getQualifiedTblName(tableName);
-        processReference = validateProcess(query, testPathNormed, tblQlfdName);
+
+        outputs = getOutputs(tableName, Entity.Type.TABLE);
+        processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
         validateHDFSPaths(processReference, INPUTS, filename);
 
-        validateOutputTables(processReference, tblQlfdName);
+        validateOutputTables(processReference, outputs);
     }
 
     @Test
@@ -689,12 +773,13 @@ public class HiveHookIT {
         String tableName = createTable();
         String query = "select * from " + tableName;
         runCommand(query);
-        assertProcessIsNotRegistered(query);
+        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
 
         //check with uppercase table name
         query = "SELECT * from " + tableName.toUpperCase();
         runCommand(query);
-        assertProcessIsNotRegistered(query);
+        assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
     }
 
     @Test
@@ -963,9 +1048,10 @@ public class HiveHookIT {
         String query = String.format("truncate table %s", tableName);
         runCommand(query);
 
+        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        validateProcess(query, null, getQualifiedTblName(tableName));
+        validateProcess(query, HiveOperation.TRUNCATETABLE, null, outputs);
 
         //Check lineage
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
@@ -1072,10 +1158,10 @@ public class HiveHookIT {
             }
         });
 
-        final String tblQlfdName = getQualifiedTblName(tableName);
+        List<Entity> inputs = getInputs(testPath, Entity.Type.DFS_DIR);
+        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
 
-        final String testPathNormed = lower(new Path(testPath).toString());
-        Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
+        Referenceable processReference = validateProcess(query, HiveOperation.ALTERTABLE_LOCATION, inputs, outputs);
         validateHDFSPaths(processReference, INPUTS, testPath);
     }
 
@@ -1281,7 +1367,6 @@ public class HiveHookIT {
 
         //Should have no effect
         assertDBIsNotRegistered(dbName);
-        assertProcessIsNotRegistered(query);
     }
 
     @Test
@@ -1294,7 +1379,6 @@ public class HiveHookIT {
 
         //Should have no effect
         assertTableIsNotRegistered(DEFAULT_DB, tableName);
-        assertProcessIsNotRegistered(query);
     }
 
     @Test
@@ -1472,56 +1556,39 @@ public class HiveHookIT {
         }
     }
 
-    private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String... outputTblNames) throws Exception {
-
-        HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
-        String normalizedQuery = normalize(astRewriter.rewrite(queryStr));
-
-        List<Referenceable> inputs = null;
-        if (inputTblName != null) {
-            Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
-                put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, inputTblName);
-            }});
-            inputs = new ArrayList<Referenceable>();
-            inputs.add(inputTableRef);
-        }
-        List<Referenceable> outputs = new ArrayList<Referenceable>();
-        if (outputTblNames != null) {
-            for(int i = 0; i < outputTblNames.length; i++) {
-                final String outputTblName = outputTblNames[i];
-                Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
-                    put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
-                }});
-
-                outputs.add(outputTableRef);
-            }
-        }
-        String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
+    private String assertProcessIsRegistered(final String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
+        String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
         LOG.debug("Searching for process with query {}", processQFName);
         return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
             @Override
             public void assertOnEntity(final Referenceable entity) throws Exception {
                 List<String> recentQueries = (List<String>) entity.get("recentQueries");
-                Assert.assertEquals(recentQueries.get(0), queryStr);
+                Assert.assertEquals(recentQueries.get(0), lower(queryStr));
             }
         });
     }
 
-    private String assertProcessIsRegistered(final String queryStr) throws Exception {
-        String lowerQryStr = lower(queryStr);
-        LOG.debug("Searching for process with query {}", lowerQryStr);
-        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lowerQryStr, new AssertPredicate() {
-            @Override
-            public void assertOnEntity(final Referenceable entity) throws Exception {
-                List<String> recentQueries = (List<String>) entity.get("recentQueries");
-                Assert.assertEquals(recentQueries.get(0), queryStr);
+    private String getDSTypeName(Entity entity) {
+        return Entity.Type.TABLE.equals(entity.getType()) ? HiveDataTypes.HIVE_TABLE.name() : FSDataTypes.HDFS_PATH().toString();
+    }
+
+    private SortedMap<Entity, Referenceable> getSortedProcessDataSets(List<Entity> inputTbls) {
+        SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator);
+        if (inputTbls != null) {
+            for (final Entity tbl : inputTbls) {
+                Referenceable inputTableRef = new Referenceable(getDSTypeName(tbl), new HashMap<String, Object>() {{
+                    put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tbl.getName());
+                }});
+                inputs.put(tbl, inputTableRef);
             }
-        });
+        }
+        return inputs;
     }
 
-    private void assertProcessIsNotRegistered(String queryStr) throws Exception {
-        LOG.debug("Searching for process with query {}", queryStr);
-        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, normalize(queryStr));
+    private void assertProcessIsNotRegistered(String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
+        String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
+        LOG.debug("Searching for process with query {}", processQFName);
+        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
     }
 
     private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
diff --git a/release-log.txt b/release-log.txt
index af861ff..9a91a16 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -39,6 +39,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-904 Hive hook fails due to session state not being set (sumasai via yhemanth)
 ATLAS-929 Add test for trait preservation on column rename for non-default database (svimal2106 via shwethags)
 ATLAS-922 remove test atlas-application.properties embedded in atlas-typesystem.jar (madhan.neethiraj via yhemanth)
 ATLAS-725 UI : Filter out or highlight deleted entities in search result outputs(dsl, text) , schema view, and lineage graph (kevalbhatt18 via sumasai)
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index 7646534..76e8276 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -400,7 +400,7 @@ public class EntityResource {
             JSONObject response = getResponse(entityResult);
             return Response.ok(response).build();
         } catch (EntityNotFoundException e) {
-            if(guids != null || !guids.isEmpty()) {
+            if(guids != null && !guids.isEmpty()) {
                 LOG.error("An entity with GUID={} does not exist ", guids, e);
             } else {
                 LOG.error("An entity with qualifiedName {}-{}-{} does not exist", entityType, attribute, value, e);
--
libgit2 0.27.1