Commit 3a0865ad by apoorvnaik Committed by Madhan Neethiraj

ATLAS-1499: Notification processing using V2 Store

parent e4cc16ac
...@@ -75,9 +75,9 @@ public enum AtlasErrorCode { ...@@ -75,9 +75,9 @@ public enum AtlasErrorCode {
INSTANCE_LINEAGE_QUERY_FAILED(404, "ATLAS4047E", "Instance lineage query failed {0}"), INSTANCE_LINEAGE_QUERY_FAILED(404, "ATLAS4047E", "Instance lineage query failed {0}"),
DISCOVERY_QUERY_FAILED(404, "ATLAS4048E", "Discovery query failed {0}"), DISCOVERY_QUERY_FAILED(404, "ATLAS4048E", "Discovery query failed {0}"),
INSTANCE_CRUD_INVALID_PARAMS(404, "ATLAS4049E", "Invalid instance creation/updation parameters passed : {0}"), INSTANCE_CRUD_INVALID_PARAMS(404, "ATLAS4049E", "Invalid instance creation/updation parameters passed : {0}"),
INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(404, "ATLAS40410E", "Instance {0} with unique attribute {1} does not exist"), INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(404, "ATLAS40410E", "Instance {0} with unique attribute {1} does not exist"),
REFERENCED_ENTITY_NOT_FOUND(404, "ATLAS40411E", "Referenced entity {0} is not found"), REFERENCED_ENTITY_NOT_FOUND(404, "ATLAS40411E", "Referenced entity {0} is not found"),
// All data conflict errors go here // All data conflict errors go here
TYPE_ALREADY_EXISTS(409, "ATLAS4091E", "Given type {0} already exists"), TYPE_ALREADY_EXISTS(409, "ATLAS4091E", "Given type {0} already exists"),
TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"), TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"),
...@@ -87,7 +87,8 @@ public enum AtlasErrorCode { ...@@ -87,7 +87,8 @@ public enum AtlasErrorCode {
INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"), INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"),
INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"), INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"),
INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"), INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"),
FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"); FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"),
NOTIFICATION_FAILED(500, "ATLAS5005E", "Failed to notify for change {0}");
private String errorCode; private String errorCode;
private String errorMessage; private String errorMessage;
......
...@@ -69,7 +69,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable { ...@@ -69,7 +69,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
private String updatedBy = null; private String updatedBy = null;
private Date createTime = null; private Date createTime = null;
private Date updateTime = null; private Date updateTime = null;
private Long version = new Long(0); private Long version = 0L;
private List<AtlasClassification> classifications; private List<AtlasClassification> classifications;
......
...@@ -45,8 +45,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; ...@@ -45,8 +45,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
@XmlAccessorType(XmlAccessType.PROPERTY) @XmlAccessorType(XmlAccessType.PROPERTY)
public class EntityMutationResponse { public class EntityMutationResponse {
Map<EntityOperation, List<AtlasEntityHeader>> mutatedEntities; private Map<EntityOperation, List<AtlasEntityHeader>> mutatedEntities;
Map<String, String> guidAssignments; private Map<String, String> guidAssignments;
public EntityMutationResponse() { public EntityMutationResponse() {
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.atlas.repository.audit; package org.apache.atlas.repository.audit;
import com.google.inject.Inject;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction; import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
...@@ -34,6 +33,7 @@ import org.apache.commons.collections.MapUtils; ...@@ -34,6 +33,7 @@ import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
...@@ -109,6 +109,10 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -109,6 +109,10 @@ public class EntityAuditListener implements EntityChangeListener {
auditRepository.putEvents(events); auditRepository.putEvents(events);
} }
public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{
return auditRepository.listEvents(guid, null, (short) 10);
}
private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action) private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action)
throws AtlasException { throws AtlasException {
String detail = getAuditEventDetail(entity, action); String detail = getAuditEventDetail(entity, action);
...@@ -189,29 +193,11 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -189,29 +193,11 @@ public class EntityAuditListener implements EntityChangeListener {
if (attrValue instanceof Collection) { if (attrValue instanceof Collection) {
for (Object attribute : (Collection) attrValue) { for (Object attribute : (Collection) attrValue) {
if (attribute instanceof ITypedReferenceableInstance) { if (attribute instanceof ITypedReferenceableInstance) {
ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attribute; ret = pruneAttributes(ret, (ITypedReferenceableInstance) attribute);
Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance);
if (MapUtils.isNotEmpty(prunedAttrs)) {
if (ret == null) {
ret = new HashMap<>();
}
ret.put(attrInstance.getId()._getId(), prunedAttrs);
}
} }
} }
} else if (attrValue instanceof ITypedReferenceableInstance) { } else if (attrValue instanceof ITypedReferenceableInstance) {
ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attrValue; ret = pruneAttributes(ret, (ITypedReferenceableInstance) attrValue);
Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance);
if (MapUtils.isNotEmpty(prunedAttrs)) {
if (ret == null) {
ret = new HashMap<>();
}
ret.put(attrInstance.getId()._getId(), prunedAttrs);
}
} }
} }
} }
...@@ -220,6 +206,20 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -220,6 +206,20 @@ public class EntityAuditListener implements EntityChangeListener {
return ret; return ret;
} }
private Map<String, Object> pruneAttributes(Map<String, Object> ret, ITypedReferenceableInstance attribute) throws AtlasException {
ITypedReferenceableInstance attrInstance = attribute;
Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance);
if (MapUtils.isNotEmpty(prunedAttrs)) {
if (ret == null) {
ret = new HashMap<>();
}
ret.put(attrInstance.getId()._getId(), prunedAttrs);
}
return ret;
}
private void restoreEntityAttributes(ITypedReferenceableInstance entity, Map<String, Object> prunedAttributes) throws AtlasException { private void restoreEntityAttributes(ITypedReferenceableInstance entity, Map<String, Object> prunedAttributes) throws AtlasException {
if (MapUtils.isEmpty(prunedAttributes)) { if (MapUtils.isEmpty(prunedAttributes)) {
return; return;
...@@ -240,27 +240,25 @@ public class EntityAuditListener implements EntityChangeListener { ...@@ -240,27 +240,25 @@ public class EntityAuditListener implements EntityChangeListener {
if (attrValue instanceof Collection) { if (attrValue instanceof Collection) {
for (Object attributeEntity : (Collection) attrValue) { for (Object attributeEntity : (Collection) attrValue) {
if (attributeEntity instanceof ITypedReferenceableInstance) { if (attributeEntity instanceof ITypedReferenceableInstance) {
ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attributeEntity; restoreAttributes(prunedAttributes, (ITypedReferenceableInstance) attributeEntity);
Object obj = prunedAttributes.get(attrInstance.getId()._getId());
if (obj instanceof Map) {
restoreEntityAttributes(attrInstance, (Map) obj);
}
} }
} }
} else if (attrValue instanceof ITypedReferenceableInstance) { } else if (attrValue instanceof ITypedReferenceableInstance) {
ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attrValue; restoreAttributes(prunedAttributes, (ITypedReferenceableInstance) attrValue);
Object obj = prunedAttributes.get(attrInstance.getId()._getId());
if (obj instanceof Map) {
restoreEntityAttributes(attrInstance, (Map) obj);
}
} }
} }
} }
} }
} }
private void restoreAttributes(Map<String, Object> prunedAttributes, ITypedReferenceableInstance attributeEntity) throws AtlasException {
Object obj = prunedAttributes.get(attributeEntity.getId()._getId());
if (obj instanceof Map) {
restoreEntityAttributes(attributeEntity, (Map) obj);
}
}
private String getAuditPrefix(EntityAuditAction action) { private String getAuditPrefix(EntityAuditAction action) {
final String ret; final String ret;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
...@@ -51,22 +51,18 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { ...@@ -51,22 +51,18 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter {
} }
@Override @Override
public Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException { public AtlasEntity fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException {
AtlasObjectId ret = null; AtlasEntity entity = null;
if (v1Obj != null) { if (v1Obj != null) {
AtlasEntityType entityType = (AtlasEntityType) type; AtlasEntityType entityType = (AtlasEntityType) type;
if (v1Obj instanceof Id) { if (v1Obj instanceof IReferenceableInstance) {
Id id = (Id) v1Obj;
ret = new AtlasObjectId(id._getId(), id.getTypeName());
} else if (v1Obj instanceof IReferenceableInstance) {
IReferenceableInstance entRef = (IReferenceableInstance) v1Obj; IReferenceableInstance entRef = (IReferenceableInstance) v1Obj;
ret = new AtlasObjectId(entRef.getId()._getId(), entRef.getTypeName()); String guid = entRef.getId()._getId();
if (!context.entityExists(ret.getGuid())) { if (!context.entityExists(guid)) {
Map<String, Object> v1Attribs = null; Map<String, Object> v1Attribs = null;
try { try {
...@@ -75,7 +71,7 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { ...@@ -75,7 +71,7 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter {
LOG.error("IReferenceableInstance.getValuesMap() failed", excp); LOG.error("IReferenceableInstance.getValuesMap() failed", excp);
} }
AtlasEntity entity = new AtlasEntity(entRef.getTypeName(), entity = new AtlasEntity(entRef.getTypeName(),
super.fromV1ToV2(entityType, v1Attribs, context)); super.fromV1ToV2(entityType, v1Attribs, context));
entity.setGuid(entRef.getId()._getId()); entity.setGuid(entRef.getId()._getId());
entity.setStatus(convertState(entRef.getId().getState())); entity.setStatus(convertState(entRef.getId().getState()));
...@@ -83,7 +79,7 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { ...@@ -83,7 +79,7 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter {
entity.setCreateTime(entRef.getSystemAttributes().createdTime); entity.setCreateTime(entRef.getSystemAttributes().createdTime);
entity.setUpdatedBy(entRef.getSystemAttributes().modifiedBy); entity.setUpdatedBy(entRef.getSystemAttributes().modifiedBy);
entity.setUpdateTime(entRef.getSystemAttributes().modifiedTime); entity.setUpdateTime(entRef.getSystemAttributes().modifiedTime);
entity.setVersion(new Long(entRef.getId().version)); entity.setVersion((long) entRef.getId().version);
if (CollectionUtils.isNotEmpty(entRef.getTraits())) { if (CollectionUtils.isNotEmpty(entRef.getTraits())) {
List<AtlasClassification> classifications = new ArrayList<>(); List<AtlasClassification> classifications = new ArrayList<>();
...@@ -99,18 +95,18 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { ...@@ -99,18 +95,18 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter {
entity.setClassifications(classifications); entity.setClassifications(classifications);
} }
} else {
context.addEntity(entity); entity = context.getById(guid);
} }
} else { } else {
throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "IReferenceableInstance", throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "IReferenceableInstance",
v1Obj.getClass().getCanonicalName()); v1Obj.getClass().getCanonicalName());
} }
} }
return ret; return entity;
} }
private AtlasEntity.Status convertState(EntityState state){ private Status convertState(EntityState state){
Status status = Status.ACTIVE; Status status = Status.ACTIVE;
if(state != null && state.equals(EntityState.DELETED)){ if(state != null && state.equals(EntityState.DELETED)){
status = Status.DELETED; status = Status.DELETED;
...@@ -160,7 +156,6 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { ...@@ -160,7 +156,6 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter {
v2Obj.getClass().getCanonicalName()); v2Obj.getClass().getCanonicalName());
} }
} }
return ret; return ret;
} }
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
......
...@@ -15,16 +15,15 @@ ...@@ -15,16 +15,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import java.util.HashMap;
import java.util.Map;
public interface AtlasFormatConverter { public interface AtlasFormatConverter {
Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException; Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException;
...@@ -33,28 +32,35 @@ public interface AtlasFormatConverter { ...@@ -33,28 +32,35 @@ public interface AtlasFormatConverter {
TypeCategory getTypeCategory(); TypeCategory getTypeCategory();
public static class ConverterContext { class ConverterContext {
private Map<String, AtlasEntity> entities = null; private AtlasEntity.AtlasEntitiesWithExtInfo entities = null;
public void addEntity(AtlasEntity entity) { public void addEntity(AtlasEntity entity) {
if (entities == null) { if (entities == null) {
entities = new HashMap<>(); entities = new AtlasEntitiesWithExtInfo();
} }
entities.put(entity.getGuid(), entity); entities.addEntity(entity);
}
public void addReferredEntity(AtlasEntity entity) {
if (entities == null) {
entities = new AtlasEntitiesWithExtInfo();
}
entities.addReferredEntity(entity);
} }
public AtlasEntity getById(String guid) { public AtlasEntity getById(String guid) {
if( entities != null) { if( entities != null) {
return entities.get(guid); return entities.getEntity(guid);
} }
return null; return null;
} }
public boolean entityExists(String guid) { return entities != null && entities.containsKey(guid); } public boolean entityExists(String guid) { return entities != null && entities.hasEntity(guid); }
public Map<String, AtlasEntity> getEntities() { public AtlasEntitiesWithExtInfo getEntities() {
return entities; return entities;
} }
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
...@@ -41,6 +41,7 @@ public class AtlasFormatConverters { ...@@ -41,6 +41,7 @@ public class AtlasFormatConverters {
registerConverter(new AtlasEntityFormatConverter(this, typeRegistry)); registerConverter(new AtlasEntityFormatConverter(this, typeRegistry));
registerConverter(new AtlasArrayFormatConverter(this, typeRegistry)); registerConverter(new AtlasArrayFormatConverter(this, typeRegistry));
registerConverter(new AtlasMapFormatConverter(this, typeRegistry)); registerConverter(new AtlasMapFormatConverter(this, typeRegistry));
registerConverter(new AtlasObjectIdConverter(this, typeRegistry));
} }
private void registerConverter(AtlasFormatConverter converter) { private void registerConverter(AtlasFormatConverter converter) {
......
...@@ -15,8 +15,10 @@ ...@@ -15,8 +15,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
...@@ -48,14 +50,12 @@ import org.slf4j.LoggerFactory; ...@@ -48,14 +50,12 @@ import org.slf4j.LoggerFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import com.google.inject.Inject; import java.util.List;
import com.google.inject.Singleton;
import java.util.Map;
@Singleton @Singleton
public class AtlasInstanceRestAdapters { public class AtlasInstanceConverter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasInstanceRestAdapters.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasInstanceConverter.class);
@Inject @Inject
private AtlasTypeRegistry typeRegistry; private AtlasTypeRegistry typeRegistry;
...@@ -125,9 +125,9 @@ public class AtlasInstanceRestAdapters { ...@@ -125,9 +125,9 @@ public class AtlasInstanceRestAdapters {
return ret; return ret;
} }
public Map<String, AtlasEntity> getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.ENTITY); AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName()); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
if (entityType == null) { if (entityType == null) {
...@@ -136,7 +136,8 @@ public class AtlasInstanceRestAdapters { ...@@ -136,7 +136,8 @@ public class AtlasInstanceRestAdapters {
AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext(); AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext();
converter.fromV1ToV2(referenceable, entityType, ctx); AtlasEntity entity = converter.fromV1ToV2(referenceable, entityType, ctx);
ctx.addEntity(entity);
return ctx.getEntities(); return ctx.getEntities();
} }
...@@ -186,4 +187,37 @@ public class AtlasInstanceRestAdapters { ...@@ -186,4 +187,37 @@ public class AtlasInstanceRestAdapters {
return new AtlasBaseException(e); return new AtlasBaseException(e);
} }
public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getEntities");
}
AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext();
for (Referenceable referenceable : referenceables) {
AtlasEntity entity = fromV1toV2Entity(referenceable, context);
context.addEntity(entity);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== getEntities");
}
return context.getEntities();
}
private AtlasEntity fromV1toV2Entity(Referenceable referenceable, AtlasFormatConverter.ConverterContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> fromV1toV2Entity");
}
AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasEntity entity = converter.fromV1ToV2(referenceable, typeRegistry.getType(referenceable.getTypeName()), context);
if (LOG.isDebugEnabled()) {
LOG.debug("<== fromV1toV2Entity");
}
return entity;
}
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
...@@ -77,12 +77,12 @@ public class AtlasMapFormatConverter extends AtlasAbstractFormatConverter { ...@@ -77,12 +77,12 @@ public class AtlasMapFormatConverter extends AtlasAbstractFormatConverter {
AtlasType valueType = mapType.getValueType(); AtlasType valueType = mapType.getValueType();
AtlasFormatConverter keyConverter = converterRegistry.getConverter(keyType.getTypeCategory()); AtlasFormatConverter keyConverter = converterRegistry.getConverter(keyType.getTypeCategory());
AtlasFormatConverter valueConverter = converterRegistry.getConverter(valueType.getTypeCategory()); AtlasFormatConverter valueConverter = converterRegistry.getConverter(valueType.getTypeCategory());
Map v1Map = (Map)v2Obj; Map v2Map = (Map)v2Obj;
ret = new HashMap<>(); ret = new HashMap<>();
for (Object key : v1Map.keySet()) { for (Object key : v2Map.keySet()) {
Object value = v1Map.get(key); Object value = v2Map.get(key);
Object v2Key = keyConverter.fromV2ToV1(key, keyType, ctx); Object v2Key = keyConverter.fromV2ToV1(key, keyType, ctx);
Object v2Value = valueConverter.fromV2ToV1(value, valueType, ctx); Object v2Value = valueConverter.fromV2ToV1(value, valueType, ctx);
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
...@@ -23,12 +23,11 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -23,12 +23,11 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.Map; import java.util.Map;
...@@ -53,8 +52,19 @@ AtlasObjectIdConverter extends AtlasAbstractFormatConverter { ...@@ -53,8 +52,19 @@ AtlasObjectIdConverter extends AtlasAbstractFormatConverter {
Id id = (Id) v1Obj; Id id = (Id) v1Obj;
ret = new AtlasObjectId(id._getId(), id.getTypeName()); ret = new AtlasObjectId(id._getId(), id.getTypeName());
} else if (v1Obj instanceof IReferenceableInstance) { } else if (v1Obj instanceof IReferenceableInstance) {
IReferenceableInstance entity = (IReferenceableInstance) v1Obj; IReferenceableInstance refInst = (IReferenceableInstance) v1Obj;
ret = new AtlasObjectId(entity.getId()._getId(), entity.getTypeName());
String guid = refInst.getId()._getId();
ret = new AtlasObjectId(guid, refInst.getTypeName());
if (!converterContext.entityExists(guid)) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(refInst.getTypeName());
AtlasEntityFormatConverter entityFormatConverter = (AtlasEntityFormatConverter) converterRegistry.getConverter(TypeCategory.ENTITY);
AtlasEntity entity = entityFormatConverter.fromV1ToV2(v1Obj, entityType, converterContext);
converterContext.addReferredEntity(entity);
}
} }
} }
return ret; return ret;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
......
...@@ -15,16 +15,16 @@ ...@@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.web.adapters; package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.*; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.Struct;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
...@@ -35,6 +35,7 @@ import java.util.Collection; ...@@ -35,6 +35,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasStructFormatConverter.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasStructFormatConverter.class);
...@@ -124,7 +125,9 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { ...@@ -124,7 +125,9 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
if (MapUtils.isNotEmpty(attributes)) { if (MapUtils.isNotEmpty(attributes)) {
ret = new HashMap<>(); ret = new HashMap<>();
for (AtlasStructType.AtlasAttribute attr : structType.getAllAttributes().values()) { // Only process the requested/set attributes
for (Object attribKey : attributes.keySet()) {
AtlasStructType.AtlasAttribute attr = structType.getAttribute((String) attribKey);
AtlasType attrType = attr.getAttributeType(); AtlasType attrType = attr.getAttributeType();
if (attrType == null) { if (attrType == null) {
...@@ -133,16 +136,10 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { ...@@ -133,16 +136,10 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
} }
Object v2Value = attributes.get(attr.getName()); Object v2Value = attributes.get(attr.getName());
Object v1Value = null; Object v1Value;
AtlasFormatConverter attrConverter = null; AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
if (attrType.getTypeCategory() == TypeCategory.OBJECT_ID_TYPE && !attr.isOwnedRef()) { v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context);
attrConverter = new AtlasObjectIdConverter(converterRegistry, typeRegistry);
v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context);
} else {
attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context);
}
ret.put(attr.getName(), v1Value); ret.put(attr.getName(), v1Value);
} }
} }
...@@ -156,7 +153,10 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { ...@@ -156,7 +153,10 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
if (MapUtils.isNotEmpty(attributes)) { if (MapUtils.isNotEmpty(attributes)) {
ret = new HashMap<>(); ret = new HashMap<>();
for (AtlasStructType.AtlasAttribute attr : structType.getAllAttributes().values()) { // Only process the requested/set attributes
for (Object attribKey : attributes.keySet()) {
AtlasStructType.AtlasAttribute attr = structType.getAttribute((String) attribKey);
AtlasType attrType = attr.getAttributeType(); AtlasType attrType = attr.getAttributeType();
if (attrType == null) { if (attrType == null) {
......
...@@ -19,20 +19,13 @@ ...@@ -19,20 +19,13 @@
package org.apache.atlas; package org.apache.atlas;
import org.apache.atlas.metrics.Metrics; import org.apache.atlas.metrics.Metrics;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
public class RequestContextV1 { public class RequestContextV1 {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.inject.Inject;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.resources.EntityResource;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.util.Date;
import java.util.List;
/**
* Local atlas client which calls the resource methods directly. Used by NotificationHookConsumer.
*/
public class LocalAtlasClient extends AtlasClient {
private static final String LOCALHOST = "localhost";
private static final String CLASS = LocalAtlasClient.class.getSimpleName();
public static final Logger LOG = LoggerFactory.getLogger(LocalAtlasClient.class);
private final EntityResource entityResource;
private final ServiceState serviceState;
@Inject
public LocalAtlasClient(ServiceState serviceState, EntityResource entityResource) {
super();
this.serviceState = serviceState;
this.entityResource = entityResource;
}
private String user;
public void setUser(String user) {
this.user = user;
}
private void setRequestContext() {
RequestContext requestContext = RequestContext.createContext();
requestContext.setUser(user);
}
@Override
public boolean isServerReady() throws AtlasServiceException {
return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE;
}
@Override
protected List<String> createEntity(final JSONArray entities) throws AtlasServiceException {
LOG.debug("Creating entities: {}", entities);
EntityOperation entityOperation = new EntityOperation(API.CREATE_ENTITY) {
@Override
Response invoke() {
return entityResource.submit(new LocalServletRequest(entities.toString()));
}
};
JSONObject response = entityOperation.run();
EntityResult results = extractEntityResult(response);
LOG.debug("Create entities returned results: {}", results);
return results.getCreatedEntities();
}
@Override
protected EntityResult updateEntities(final JSONArray entities) throws AtlasServiceException {
LOG.debug("Updating entities: {}", entities);
EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) {
@Override
Response invoke() {
return entityResource.updateEntities(new LocalServletRequest(entities.toString()));
}
};
JSONObject response = entityOperation.run();
EntityResult results = extractEntityResult(response);
LOG.debug("Update entities returned results: {}", results);
return results;
}
private abstract class EntityOperation {
private final API api;
public EntityOperation(API api) {
this.api = api;
}
public JSONObject run() throws AtlasServiceException {
setRequestContext();
AuditFilter.audit(user, CLASS, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
try {
Response response = invoke();
return (JSONObject) response.getEntity();
} catch(WebApplicationException e) {
try {
throw new AtlasServiceException(api, e);
} catch (JSONException e1) {
throw new AtlasServiceException(e);
}
}
}
abstract Response invoke();
}
@Override
public EntityResult updateEntity(final String entityType, final String uniqueAttributeName,
final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException {
final String entityJson = InstanceSerialization.toJson(entity, true);
LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType,
uniqueAttributeName, uniqueAttributeValue, entityJson);
EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY_PARTIAL) {
@Override
Response invoke() {
return entityResource.updateByUniqueAttribute(entityType, uniqueAttributeName, uniqueAttributeValue,
new LocalServletRequest(entityJson));
}
};
JSONObject response = entityOperation.run();
EntityResult result = extractEntityResult(response);
LOG.debug("Update entity returned result: {}", result);
return result;
}
@Override
public EntityResult deleteEntity(final String entityType, final String uniqueAttributeName,
final String uniqueAttributeValue) throws AtlasServiceException {
LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName,
uniqueAttributeValue);
EntityOperation entityOperation = new EntityOperation(API.DELETE_ENTITY) {
@Override
Response invoke() {
return entityResource.deleteEntities(null, entityType, uniqueAttributeName, uniqueAttributeValue);
}
};
JSONObject response = entityOperation.run();
EntityResult results = extractEntityResult(response);
LOG.debug("Delete entities returned results: {}", results);
return results;
}
@Override
public String getAdminStatus() throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> createType(String typeAsJson) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> updateType(String typeAsJson) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> listTypes() throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public TypesDef getType(String typeName) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public EntityResult updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public EntityResult updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public EntityResult deleteEntities(final String ... guids) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public Referenceable getEntity(String guid) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public Referenceable getEntity(final String entityType, final String attribute, final String value)
throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<String> listEntities(final String entityType) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults)
throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray search(final String searchQuery, final int limit, final int offset) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONArray searchByDSL(final String query, final int limit, final int offset) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject searchByFullText(final String query, final int limit, final int offset) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
@Override
public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
throw new IllegalStateException("Not supported in LocalAtlasClient");
}
}
...@@ -19,34 +19,50 @@ package org.apache.atlas.notification; ...@@ -19,34 +19,50 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import kafka.consumer.ConsumerTimeoutException; import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.atlas.notification.hook.HookNotification.*;
/** /**
* Consumer of notifications from hooks e.g., hive hook etc. * Consumer of notifications from hooks e.g., hive hook etc.
*/ */
@Singleton @Singleton
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
private static final String LOCALHOST = "localhost";
private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
...@@ -57,7 +73,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -57,7 +73,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval"; public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
public static final int SERVER_READY_WAIT_TIME_MS = 1000; public static final int SERVER_READY_WAIT_TIME_MS = 1000;
private final LocalAtlasClient atlasClient; private final AtlasEntityStore atlasEntityStore;
private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
private final AtlasTypeRegistry typeRegistry;
private final int maxRetries; private final int maxRetries;
private final int failedMsgCacheSize; private final int failedMsgCacheSize;
private final int consumerRetryInterval; private final int consumerRetryInterval;
...@@ -68,10 +87,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -68,10 +87,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private List<HookConsumer> consumers; private List<HookConsumer> consumers;
@Inject @Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
throws AtlasException { ServiceState serviceState, AtlasInstanceConverter instanceConverter,
AtlasTypeRegistry typeRegistry) throws AtlasException {
this.notificationInterface = notificationInterface; this.notificationInterface = notificationInterface;
this.atlasClient = atlasClient; this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState;
this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get(); this.applicationProperties = ApplicationProperties.get();
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
...@@ -208,48 +232,78 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -208,48 +232,78 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} }
@VisibleForTesting @VisibleForTesting
void handleMessage(HookNotification.HookNotificationMessage message) throws void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException {
AtlasServiceException, AtlasException { String messageUser = message.getUser();
// Used for intermediate conversions during create and update
AtlasEntity.AtlasEntitiesWithExtInfo entities;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) { for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
LOG.debug("Running attempt {}", numRetries); if (LOG.isDebugEnabled()) {
LOG.debug("Running attempt {}", numRetries);
}
try { try {
atlasClient.setUser(message.getUser());
switch (message.getType()) { switch (message.getType()) {
case ENTITY_CREATE: case ENTITY_CREATE:
HookNotification.EntityCreateRequest createRequest = if (LOG.isDebugEnabled()) {
(HookNotification.EntityCreateRequest) message; LOG.debug("EntityCreate via hook");
atlasClient.createEntity(createRequest.getEntities()); }
break; EntityCreateRequest createRequest = (EntityCreateRequest) message;
audit(messageUser, AtlasClient.API.CREATE_ENTITY);
case ENTITY_PARTIAL_UPDATE:
HookNotification.EntityPartialUpdateRequest partialUpdateRequest = entities = instanceConverter.getEntities(createRequest.getEntities());
(HookNotification.EntityPartialUpdateRequest) message;
atlasClient.updateEntity(partialUpdateRequest.getTypeName(), atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
partialUpdateRequest.getAttribute(), break;
partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
break; case ENTITY_PARTIAL_UPDATE:
if (LOG.isDebugEnabled()) {
case ENTITY_DELETE: LOG.debug("EntityPartialUpdate via hook");
HookNotification.EntityDeleteRequest deleteRequest = }
(HookNotification.EntityDeleteRequest) message; final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
atlasClient.deleteEntity(deleteRequest.getTypeName(), audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
deleteRequest.getAttribute(),
deleteRequest.getAttributeValue()); Referenceable referenceable = partialUpdateRequest.getEntity();
break; entities = instanceConverter.getEntities(Collections.singletonList(referenceable));
// There should only be one root entity after the conversion
case ENTITY_FULL_UPDATE: AtlasEntity entity = entities.getEntities().get(0);
HookNotification.EntityUpdateRequest updateRequest = // Need to set the attributes explicitly here as the qualified name might have changed during update
(HookNotification.EntityUpdateRequest) message; entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
atlasClient.updateEntities(updateRequest.getEntities()); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
break; break;
default: case ENTITY_DELETE:
throw new IllegalStateException("Unhandled exception!"); if (LOG.isDebugEnabled()) {
LOG.debug("EntityDelete via hook");
}
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
audit(messageUser, AtlasClient.API.DELETE_ENTITY);
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type,
new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }});
} catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity");
}
break;
case ENTITY_FULL_UPDATE:
if (LOG.isDebugEnabled()) {
LOG.debug("EntityFullUpdate via hook");
}
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
entities = instanceConverter.getEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
default:
throw new IllegalStateException("Unhandled exception!");
} }
break; break;
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Error handling message{}", e.getMessage()); LOG.warn("Error handling message: {}", e.getMessage());
try{ try{
LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
Thread.sleep(consumerRetryInterval); Thread.sleep(consumerRetryInterval);
...@@ -272,7 +326,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -272,7 +326,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private void recordFailedMessages() { private void recordFailedMessages() {
//logging failed messages //logging failed messages
for (HookNotification.HookNotificationMessage message : failedMessages) { for (HookNotificationMessage message : failedMessages) {
FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message));
} }
failedMessages.clear(); failedMessages.clear();
...@@ -285,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -285,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
boolean serverAvailable(Timer timer) { boolean serverAvailable(Timer timer) {
try { try {
while (!atlasClient.isServerReady()) { while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) {
try { try {
LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...",
SERVER_READY_WAIT_TIME_MS); SERVER_READY_WAIT_TIME_MS);
...@@ -311,4 +365,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -311,4 +365,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumer.close(); consumer.close();
} }
} }
private void audit(String messageUser, AtlasClient.API api) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> audit({},{})", messageUser, api);
}
AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST,
DateTimeHelper.formatDateUTC(new Date()));
}
} }
...@@ -60,7 +60,7 @@ import java.io.ByteArrayInputStream; ...@@ -60,7 +60,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import static org.apache.atlas.web.adapters.AtlasInstanceRestAdapters.toAtlasBaseException; import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException;
/** /**
* Jersey Resource for admin operations. * Jersey Resource for admin operations.
......
...@@ -23,11 +23,12 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -23,11 +23,12 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.ClassificationAssociateRequest;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v1.EntityStream; import org.apache.atlas.repository.store.graph.v1.EntityStream;
...@@ -37,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType; ...@@ -37,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedStruct; import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.web.adapters.AtlasInstanceRestAdapters;
import org.apache.atlas.web.util.Servlets; import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
...@@ -46,7 +46,15 @@ import org.apache.commons.lang3.StringUtils; ...@@ -46,7 +46,15 @@ import org.apache.commons.lang3.StringUtils;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.*; import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -54,7 +62,7 @@ import java.util.HashMap; ...@@ -54,7 +62,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.atlas.web.adapters.AtlasInstanceRestAdapters.toAtlasBaseException; import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException;
/** /**
* REST for a single entity * REST for a single entity
...@@ -66,14 +74,15 @@ public class EntityREST { ...@@ -66,14 +74,15 @@ public class EntityREST {
public static final String PREFIX_ATTR = "attr:"; public static final String PREFIX_ATTR = "attr:";
private final AtlasTypeRegistry typeRegistry; private final AtlasTypeRegistry typeRegistry;
private final AtlasInstanceRestAdapters restAdapters; private final AtlasInstanceConverter restAdapters;
private final MetadataService metadataService; private final MetadataService metadataService;
private final AtlasEntityStore entitiesStore; private final AtlasEntityStore entitiesStore;
@Inject @Inject
public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceRestAdapters restAdapters, MetadataService metadataService, AtlasEntityStore entitiesStore) { public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter,
MetadataService metadataService, AtlasEntityStore entitiesStore) {
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.restAdapters = restAdapters; this.restAdapters = instanceConverter;
this.metadataService = metadataService; this.metadataService = metadataService;
this.entitiesStore = entitiesStore; this.entitiesStore = entitiesStore;
} }
...@@ -437,4 +446,4 @@ public class EntityREST { ...@@ -437,4 +446,4 @@ public class EntityREST {
} }
} }
} }
} }
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import com.google.inject.Inject;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.resources.EntityResource;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.atlas.AtlasClient.ENTITIES;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Guice(modules= RepositoryMetadataModule.class)
public class LocalAtlasClientTest {
@Mock
private EntityResource mockEntityResource;
@Inject
private EntityResource entityResource;
@Mock
private ServiceState serviceState;
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testCreateEntity() throws Exception {
Response response = mock(Response.class);
when(mockEntityResource.submit(any(HttpServletRequest.class))).thenReturn(response);
final String guid = random();
when(response.getEntity()).thenReturn(new JSONObject() {{
put(ENTITIES, new JSONObject(
new AtlasClient.EntityResult(Arrays.asList(guid), null, null).toString()).get(ENTITIES));
}});
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource);
List<String> results = atlasClient.createEntity(new Referenceable(random()));
assertEquals(results.size(), 1);
assertEquals(results.get(0), guid);
}
@Test
public void testException() throws Exception {
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource);
Response response = mock(Response.class);
when(mockEntityResource.submit(any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response));
when(response.getEntity()).thenReturn(new JSONObject() {{
put("stackTrace", "stackTrace");
}});
when(response.getStatus()).thenReturn(Response.Status.BAD_REQUEST.getStatusCode());
try {
atlasClient.createEntity(new Referenceable(random()));
fail("Expected AtlasServiceException");
} catch(AtlasServiceException e) {
assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST);
}
when(mockEntityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response));
when(response.getStatus()).thenReturn(Response.Status.NOT_FOUND.getStatusCode());
try {
atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
fail("Expected AtlasServiceException");
} catch(AtlasServiceException e) {
assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND);
}
}
@Test
public void testIsServerReady() throws Exception {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource);
assertTrue(atlasClient.isServerReady());
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE);
assertFalse(atlasClient.isServerReady());
}
@Test
public void testUpdateEntity() throws Exception {
final String guid = random();
Response response = mock(Response.class);
when(mockEntityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(),
any(HttpServletRequest.class))).thenReturn(response);
when(response.getEntity()).thenReturn(new JSONObject() {{
put(ENTITIES, new JSONObject(
new AtlasClient.EntityResult(null, Arrays.asList(guid), null).toString()).get(ENTITIES));
}});
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource);
AtlasClient.EntityResult
entityResult = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random()));
assertEquals(entityResult.getUpdateEntities(), Arrays.asList(guid));
}
@Test
public void testDeleteEntity() throws Exception {
final String guid = random();
Response response = mock(Response.class);
when(response.getEntity()).thenReturn(new JSONObject() {{
put(ENTITIES, new JSONObject(
new AtlasClient.EntityResult(null, null, Arrays.asList(guid)).toString()).get(ENTITIES));
}});
when(mockEntityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString())).thenReturn(response);
LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource);
AtlasClient.EntityResult entityResult = atlasClient.deleteEntity(random(), random(), random());
assertEquals(entityResult.getDeletedEntities(), Arrays.asList(guid));
}
private String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
@Test
@Inject
public void testGetLocationURI() {
final String guid = "123";
URI uri = entityResource.getLocationURI(new ArrayList<String>() {{ add(guid); }});
uri.getRawPath().equals(AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS + "/" + AtlasClient.API.GET_ENTITY.getPath() + "/" + guid);
}
}
...@@ -22,19 +22,30 @@ import com.google.inject.Inject; ...@@ -22,19 +22,30 @@ import com.google.inject.Inject;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert; import org.testng.Assert;
import org.testng.annotations.AfterTest; import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest; import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.mockito.Mockito.mock; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
@Guice(modules = NotificationModule.class) @Guice(modules = NotificationModule.class)
public class NotificationHookConsumerKafkaTest { public class NotificationHookConsumerKafkaTest {
...@@ -45,10 +56,28 @@ public class NotificationHookConsumerKafkaTest { ...@@ -45,10 +56,28 @@ public class NotificationHookConsumerKafkaTest {
@Inject @Inject
private NotificationInterface notificationInterface; private NotificationInterface notificationInterface;
@Mock
private AtlasEntityStore atlasEntityStore;
@Mock
private ServiceState serviceState;
@Mock
private AtlasInstanceConverter instanceConverter;
@Mock
private AtlasTypeRegistry typeRegistry;
private KafkaNotification kafkaNotification; private KafkaNotification kafkaNotification;
@BeforeTest @BeforeTest
public void setup() throws AtlasException, InterruptedException { public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
MockitoAnnotations.initMocks(this);
AtlasType mockType = mock(AtlasType.class);
when(typeRegistry.getType(anyString())).thenReturn(mockType);
AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
kafkaNotification = startKafkaServer(); kafkaNotification = startKafkaServer();
} }
...@@ -58,25 +87,25 @@ public class NotificationHookConsumerKafkaTest { ...@@ -58,25 +87,25 @@ public class NotificationHookConsumerKafkaTest {
} }
@Test @Test
public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException { public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException {
try { try {
produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification.HookNotificationMessage> consumer = NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
createNewConsumer(kafkaNotification, false); createNewConsumer(kafkaNotification, false);
LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(kafkaNotification, localAtlasClient); new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer); notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user1"); verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, and make sure it moves ahead. If commit succeeded, this would work. // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user2"); verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
reset(atlasEntityStore);
} }
finally { finally {
kafkaNotification.close(); kafkaNotification.close();
...@@ -90,20 +119,19 @@ public class NotificationHookConsumerKafkaTest { ...@@ -90,20 +119,19 @@ public class NotificationHookConsumerKafkaTest {
NotificationConsumer<HookNotification.HookNotificationMessage> consumer = NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
createNewConsumer(kafkaNotification, true); createNewConsumer(kafkaNotification, true);
LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(kafkaNotification, localAtlasClient); new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer); notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user3"); verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean());
// produce another message, but this will not be consumed, as commit code is not executed in hook consumer. // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer);
verify(localAtlasClient).setUser("test_user3"); verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean());
} }
finally { finally {
kafkaNotification.close(); kafkaNotification.close();
......
...@@ -17,13 +17,20 @@ ...@@ -17,13 +17,20 @@
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
...@@ -31,16 +38,11 @@ import org.testng.annotations.BeforeMethod; ...@@ -31,16 +38,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.assertTrue;
...@@ -50,26 +52,41 @@ public class NotificationHookConsumerTest { ...@@ -50,26 +52,41 @@ public class NotificationHookConsumerTest {
private NotificationInterface notificationInterface; private NotificationInterface notificationInterface;
@Mock @Mock
private LocalAtlasClient atlasClient;
@Mock
private Configuration configuration; private Configuration configuration;
@Mock @Mock
private ExecutorService executorService; private ExecutorService executorService;
@Mock
private AtlasEntityStore atlasEntityStore;
@Mock
private ServiceState serviceState;
@Mock
private AtlasInstanceConverter instanceConverter;
@Mock
private AtlasTypeRegistry typeRegistry;
@BeforeMethod @BeforeMethod
public void setup() { public void setup() throws AtlasBaseException {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
AtlasType mockType = mock(AtlasType.class);
when(typeRegistry.getType(anyString())).thenReturn(mockType);
AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
} }
@Test @Test
public void testConsumerCanProceedIfServerIsReady() throws Exception { public void testConsumerCanProceedIfServerIsReady() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(true); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
assertTrue(hookConsumer.serverAvailable(timer)); assertTrue(hookConsumer.serverAvailable(timer));
...@@ -78,11 +95,16 @@ public class NotificationHookConsumerTest { ...@@ -78,11 +95,16 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenReturn(false, false, false, true);
when(serviceState.getState())
.thenReturn(ServiceState.ServiceStateValue.PASSIVE)
.thenReturn(ServiceState.ServiceStateValue.PASSIVE)
.thenReturn(ServiceState.ServiceStateValue.PASSIVE)
.thenReturn(ServiceState.ServiceStateValue.ACTIVE);
assertTrue(hookConsumer.serverAvailable(timer)); assertTrue(hookConsumer.serverAvailable(timer));
...@@ -92,13 +114,15 @@ public class NotificationHookConsumerTest { ...@@ -92,13 +114,15 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient); new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer); notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
when(message.getUser()).thenReturn("user"); when(message.getUser()).thenReturn("user");
when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
Referenceable mock = mock(Referenceable.class);
when(message.getEntities()).thenReturn(Arrays.asList(mock));
hookConsumer.handleMessage(message); hookConsumer.handleMessage(message);
...@@ -106,15 +130,17 @@ public class NotificationHookConsumerTest { ...@@ -106,15 +130,17 @@ public class NotificationHookConsumerTest {
} }
@Test @Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException { public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
NotificationHookConsumer notificationHookConsumer = NotificationHookConsumer notificationHookConsumer =
new NotificationHookConsumer(notificationInterface, atlasClient); new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(consumer); notificationHookConsumer.new HookConsumer(consumer);
HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>()); HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user",
when(atlasClient.createEntity(any(List.class))). new ArrayList<Referenceable>() {
thenThrow(new RuntimeException("Simulating exception in processing message")); { add(mock(Referenceable.class)); }
});
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message"));
hookConsumer.handleMessage(message); hookConsumer.handleMessage(message);
verifyZeroInteractions(consumer); verifyZeroInteractions(consumer);
...@@ -122,24 +148,12 @@ public class NotificationHookConsumerTest { ...@@ -122,24 +148,12 @@ public class NotificationHookConsumerTest {
@Test @Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
when(atlasClient.isServerReady()).thenReturn(false); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
assertFalse(hookConsumer.serverAvailable(timer));
}
@Test
public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception {
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient);
NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
new Exception()));
assertFalse(hookConsumer.serverAvailable(timer)); assertFalse(hookConsumer.serverAvailable(timer));
} }
...@@ -152,7 +166,7 @@ public class NotificationHookConsumerTest { ...@@ -152,7 +166,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class)); consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers); thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
...@@ -167,7 +181,7 @@ public class NotificationHookConsumerTest { ...@@ -167,7 +181,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class)); consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers); thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
verifyZeroInteractions(notificationInterface); verifyZeroInteractions(notificationInterface);
} }
...@@ -181,7 +195,7 @@ public class NotificationHookConsumerTest { ...@@ -181,7 +195,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class)); consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers); thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive(); notificationHookConsumer.instanceIsActive();
verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1);
...@@ -196,7 +210,7 @@ public class NotificationHookConsumerTest { ...@@ -196,7 +210,7 @@ public class NotificationHookConsumerTest {
consumers.add(mock(NotificationConsumer.class)); consumers.add(mock(NotificationConsumer.class));
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).
thenReturn(consumers); thenReturn(consumers);
NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive(); notificationHookConsumer.instanceIsPassive();
verify(notificationInterface).close(); verify(notificationInterface).close();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment