diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 6fa1f60..2503d8e 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -92,7 +92,10 @@ public enum AtlasErrorCode {
     RELATIONSHIP_INVALID_ENDTYPE(400, "ATLAS-400-00-045", "Invalid entity-type for relationship attribute ‘{0}’: entity specified (guid={1}) is of type ‘{2}’, but expected type is ‘{3}’"),
     UNKNOWN_CLASSIFICATION(400, "ATLAS-400-00-046", "{0}: Unknown/invalid classification"),
     INVALID_SEARCH_PARAMS(400, "ATLAS-400-00-047", "No search parameter was found. One of the following MUST be specified in the request; typeName, classification or queryText"),
-     // All Not found enums go here
+    INVALID_RELATIONSHIP_ATTRIBUTE(400, "ATLAS-400-00-048", "Expected attribute {0} to be a relationship but found type {}"),
+    INVALID_RELATIONSHIP_TYPE(400, "ATLAS-400-00-049", "Invalid entity type '{0}', guid '{1}' in relationship search"),
+
+    // All Not found enums go here
     TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
     TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"),
     EMPTY_RESULTS(404, "ATLAS-404-00-004", "No result found for {0}"),
diff --git a/intg/src/main/java/org/apache/atlas/SortOrder.java b/intg/src/main/java/org/apache/atlas/SortOrder.java
new file mode 100644
index 0000000..e3eef4e
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/SortOrder.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas;
+
+public enum SortOrder {
+    ASCENDING, DESCENDING
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
index 5827440..0c32e01 100644
--- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java
@@ -185,7 +185,7 @@ public class AtlasSearchResult implements Serializable {
                 '}';
     }
 
-    public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE }
+    public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE, RELATIONSHIP }
 
     @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index 764b548..ead5d3c 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.discovery;
 
 
+import org.apache.atlas.SortOrder;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.discovery.SearchParameters;
@@ -65,4 +66,16 @@ public interface AtlasDiscoveryService {
      * @throws AtlasBaseException
      */
     AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
+
+    /**
+     *
+     * @param guid unique ID of the entity.
+     * @param relation relation name.
+     * @param sortByAttribute sort the result using this attribute name, default value is 'name'
+     * @param sortOrder sorting order
+     * @param limit number of resultant rows (for pagination). [ limit > 0 ] and [ limit < maxlimit ]. -1 maps to atlas.search.defaultlimit property.
+     * @param offset offset to the results returned (for pagination). [ offset >= 0 ]. -1 maps to offset 0.
+     * @return AtlasSearchResult
+     */
+    AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttribute, SortOrder sortOrder, int limit, int offset) throws AtlasBaseException;
 }
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index a7aaefe..d5062a7 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.SortOrder;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -67,6 +68,7 @@ import scala.util.Either;
 import scala.util.parsing.combinator.Parsers.NoSuccess;
 
 import javax.inject.Inject;
+import javax.script.Bindings;
 import javax.script.ScriptEngine;
 import javax.script.ScriptException;
 import java.util.*;
@@ -74,10 +76,19 @@ import java.util.*;
 import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND;
 import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED;
 import static org.apache.atlas.AtlasErrorCode.UNKNOWN_TYPENAME;
+import static org.apache.atlas.SortOrder.DESCENDING;
+import static org.apache.atlas.model.TypeCategory.ARRAY;
+import static org.apache.atlas.model.TypeCategory.MAP;
+import static org.apache.atlas.model.TypeCategory.OBJECT_ID_TYPE;
+import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_DESCENDING_SORT;
+import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_ASCENDING_SORT;
 
 @Component
 public class EntityDiscoveryService implements AtlasDiscoveryService {
     private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
+    private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
 
     private final AtlasGraph                      graph;
     private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
@@ -491,6 +502,98 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
         return ret;
     }
 
+    @Override
+    @GraphTransaction
+    public AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttributeName,
+                                                   SortOrder sortOrder, int limit, int offset) throws AtlasBaseException {
+        AtlasSearchResult ret = new AtlasSearchResult(AtlasQueryType.RELATIONSHIP);
+
+        if (StringUtils.isEmpty(guid) || StringUtils.isEmpty(relation)) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid: '" + guid + "', relation: '" + relation + "'");
+        }
+
+        AtlasVertex     entityVertex   = entityRetriever.getEntityVertex(guid);
+        String          entityTypeName = GraphHelper.getTypeName(entityVertex);
+        AtlasEntityType entityType     = typeRegistry.getEntityTypeByName(entityTypeName);
+
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_TYPE, entityTypeName, guid);
+        }
+
+        AtlasAttribute attribute = entityType.getAttribute(relation);
+
+        if (attribute != null) {
+            if (isRelationshipAttribute(attribute)) {
+                relation = EDGE_LABEL_PREFIX + attribute.getQualifiedName();
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName());
+            }
+        }
+
+        if (StringUtils.isEmpty(sortByAttributeName)) {
+            sortByAttributeName = DEFAULT_SORT_ATTRIBUTE_NAME;
+        }
+
+        AtlasAttribute sortByAttribute = entityType.getAttribute(sortByAttributeName);
+
+        if (sortByAttribute == null) {
+            sortByAttributeName = null;
+            sortOrder           = null;
+        } else {
+            sortByAttributeName = sortByAttribute.getQualifiedName();
+
+            if (sortOrder == null) {
+                sortOrder = SortOrder.ASCENDING;
+            }
+        }
+
+        String       relatedEntitiesQuery = getRelatedEntitiesQuery(sortOrder);
+        ScriptEngine scriptEngine         = graph.getGremlinScriptEngine();
+        Bindings     bindings             = scriptEngine.createBindings();
+        QueryParams  params               = validateSearchParams(limit, offset);
+
+        bindings.put("g", graph);
+        bindings.put("guid", guid);
+        bindings.put("relation", relation);
+        bindings.put("sortAttributeName", sortByAttributeName);
+        bindings.put("offset", params.offset());
+        bindings.put("limit", params.offset() + params.limit());
+
+        try {
+            Object result = graph.executeGremlinScript(scriptEngine, bindings, relatedEntitiesQuery, false);
+
+            if (result instanceof List && CollectionUtils.isNotEmpty((List) result)) {
+                List<?> queryResult  = (List) result;
+                Object  firstElement = queryResult.get(0);
+
+                if (firstElement instanceof AtlasVertex) {
+                    List<AtlasVertex>       vertices   = (List<AtlasVertex>) queryResult;
+                    List<AtlasEntityHeader> resultList = new ArrayList<>(vertices.size());
+
+                    for (AtlasVertex vertex : vertices) {
+                        resultList.add(entityRetriever.toAtlasEntityHeader(vertex));
+                    }
+
+                    ret.setEntities(resultList);
+                }
+            }
+
+            if (ret.getEntities() == null) {
+                ret.setEntities(new ArrayList<AtlasEntityHeader>());
+            }
+        } catch (ScriptException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Gremlin script execution failed for relationship search query: " + e);
+            }
+
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Relationship search query failed");
+        } finally {
+            graph.releaseGremlinScriptEngine(scriptEngine);
+        }
+
+        return ret;
+    }
+
     public int getMaxResultSetSize() {
         return maxResultSetSize;
     }
@@ -646,4 +749,35 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
 
         return "";
     }
+
+    private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException {
+        boolean   ret      = true;
+        AtlasType attrType = attribute.getAttributeType();
+
+        if (attrType.getTypeCategory() == ARRAY) {
+            attrType = ((AtlasArrayType) attrType).getElementType();
+        } else if (attrType.getTypeCategory() == MAP) {
+            attrType = ((AtlasMapType) attrType).getValueType();
+        }
+
+        if (attrType.getTypeCategory() != OBJECT_ID_TYPE) {
+            ret = false;
+        }
+
+        return ret;
+    }
+
+    private String getRelatedEntitiesQuery(SortOrder sortOrder) {
+        final String ret;
+
+        if (sortOrder == null) {
+            ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH);
+        } else if (sortOrder == DESCENDING) {
+            ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH_DESCENDING_SORT);
+        } else {
+            ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH_ASCENDING_SORT);
+        }
+
+        return ret;
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
index 667c61b..3c2ddb7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java
@@ -143,7 +143,7 @@ public final class EntityGraphRetriever {
         return atlasVertex != null ? mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null;
     }
 
-    private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
+    public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException {
         AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid);
 
         if (ret == null) {
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
index d5cdd1a..58303bf 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java
@@ -95,6 +95,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider {
                 return ".filter({it.getProperty('%s').endsWith(%s)})";
             case COMPARE_CONTAINS:
                 return ".filter({it.getProperty('%s').contains(%s)})";
+            case RELATIONSHIP_SEARCH:
+                return "g.V('__guid', guid).both(relation)[offset..<limit].toList()";
+            case RELATIONSHIP_SEARCH_DESCENDING_SORT:
+                return "g.V('__guid', guid).both(relation)[offset..<limit].order{it.b.getProperty(sortAttributeName) <=> it.a.getProperty(sortAttributeName)}.toList()";
+            case RELATIONSHIP_SEARCH_ASCENDING_SORT:
+                return "g.V('__guid', guid).both(relation)[offset..<limit].order{it.a.getProperty(sortAttributeName) <=> it.b.getProperty(sortAttributeName)}.toList()";
         }
         // Should never reach this point
         return null;
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
index 053a070..a602529 100644
--- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java
@@ -61,6 +61,9 @@ public abstract class AtlasGremlinQueryProvider {
         BASIC_SEARCH_STATE_FILTER,
         TO_RANGE_LIST,
         GUID_PREFIX_FILTER,
+        RELATIONSHIP_SEARCH,
+        RELATIONSHIP_SEARCH_ASCENDING_SORT,
+        RELATIONSHIP_SEARCH_DESCENDING_SORT,
 
         // Comparison clauses
         COMPARE_LT,
@@ -72,6 +75,6 @@ public abstract class AtlasGremlinQueryProvider {
         COMPARE_MATCHES,
         COMPARE_STARTS_WITH,
         COMPARE_ENDS_WITH,
-        COMPARE_CONTAINS,
+        COMPARE_CONTAINS
     }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index 303c4d8..032bde8 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.web.rest;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.discovery.AtlasDiscoveryService;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
@@ -266,6 +267,44 @@ public class DiscoveryREST {
         }
     }
 
+    /**
+     * Relationship search to search for related entities satisfying the search parameters
+     * @param guid  Attribute name
+     * @param relation relationName
+     * @param sortByAttribute sort the result using this attribute name, default value is 'name'
+     * @param sortOrder sorting order
+     * @param limit limit the result set to only include the specified number of entries
+     * @param offset start offset of the result set (useful for pagination)
+     * @return Atlas search result
+     * @throws AtlasBaseException
+     *
+     * @HTTP 200 On successful search
+     * @HTTP 400 guid is not a valid entity type or attributeName is not a valid relationship attribute
+     */
+    @GET
+    @Path("relationship")
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public AtlasSearchResult searchRelatedEntities(@QueryParam("guid")      String    guid,
+                                                   @QueryParam("relation")  String    relation,
+                                                   @QueryParam("sortBy")    String    sortByAttribute,
+                                                   @QueryParam("sortOrder") SortOrder sortOrder,
+                                                   @QueryParam("limit")     int       limit,
+                                                   @QueryParam("offset")    int       offset) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.relatedEntitiesSearchUsingGremlin(" + guid +
+                        ", " + relation + ", " + sortByAttribute + ", " + sortOrder + ", " + limit + ", " + offset + ")");
+            }
+
+            return atlasDiscoveryService.searchRelatedEntities(guid, relation, sortByAttribute, sortOrder, limit, offset);
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
     private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) {
         return filterCriteria == null ||
                (StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion()));
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index eb37fa8..1a3c413 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -100,8 +100,30 @@ public class NotificationHookConsumerKafkaTest {
             produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
 
             NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+            NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+            NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
+
+            consumeOneMessage(consumer, hookConsumer);
+            verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
+
+            // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
+            produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+            consumeOneMessage(consumer, hookConsumer);
+            verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
+            reset(atlasEntityStore);
+        }
+        finally {
+            kafkaNotification.close();
+        }
+    }
+
+    @Test
+    public void testConsumerConsumesNewMessageWithAutoCommitDisabled1() throws AtlasException, InterruptedException, AtlasBaseException {
+        try {
+            produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+
+            NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false);
+            NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
             NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
 
             consumeOneMessage(consumer, hookConsumer);