diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index ae6be84..584bf25 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -75,9 +75,9 @@ public enum AtlasErrorCode { INSTANCE_LINEAGE_QUERY_FAILED(404, "ATLAS4047E", "Instance lineage query failed {0}"), DISCOVERY_QUERY_FAILED(404, "ATLAS4048E", "Discovery query failed {0}"), INSTANCE_CRUD_INVALID_PARAMS(404, "ATLAS4049E", "Invalid instance creation/updation parameters passed : {0}"), + INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND(404, "ATLAS40410E", "Instance {0} with unique attribute {1} does not exist"), REFERENCED_ENTITY_NOT_FOUND(404, "ATLAS40411E", "Referenced entity {0} is not found"), - // All data conflict errors go here TYPE_ALREADY_EXISTS(409, "ATLAS4091E", "Given type {0} already exists"), TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"), @@ -87,7 +87,8 @@ public enum AtlasErrorCode { INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"), INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"), INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"), - FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"); + FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"), + NOTIFICATION_FAILED(500, "ATLAS5005E", "Failed to notify for change {0}"); private String errorCode; private String errorMessage; diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java index edaede0..e74813a 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java @@ -69,7 +69,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable { private String updatedBy = null; private Date createTime = null; private Date updateTime = null; - private Long version = new Long(0); + private Long version = 0L; private List<AtlasClassification> classifications; diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java index 5e8ce35..7078436 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java @@ -45,8 +45,8 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @XmlAccessorType(XmlAccessType.PROPERTY) public class EntityMutationResponse { - Map<EntityOperation, List<AtlasEntityHeader>> mutatedEntities; - Map<String, String> guidAssignments; + private Map<EntityOperation, List<AtlasEntityHeader>> mutatedEntities; + private Map<String, String> guidAssignments; public EntityMutationResponse() { } diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 1ef803c..e4dcfca 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -18,7 +18,6 @@ package org.apache.atlas.repository.audit; -import com.google.inject.Inject; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.EntityAuditEvent.EntityAuditAction; @@ -34,6 +33,7 @@ import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -109,6 +109,10 @@ public class EntityAuditListener implements EntityChangeListener { auditRepository.putEvents(events); } + public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{ + return auditRepository.listEvents(guid, null, (short) 10); + } + private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, EntityAuditAction action) throws AtlasException { String detail = getAuditEventDetail(entity, action); @@ -189,29 +193,11 @@ public class EntityAuditListener implements EntityChangeListener { if (attrValue instanceof Collection) { for (Object attribute : (Collection) attrValue) { if (attribute instanceof ITypedReferenceableInstance) { - ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attribute; - Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance); - - if (MapUtils.isNotEmpty(prunedAttrs)) { - if (ret == null) { - ret = new HashMap<>(); - } - - ret.put(attrInstance.getId()._getId(), prunedAttrs); - } + ret = pruneAttributes(ret, (ITypedReferenceableInstance) attribute); } } } else if (attrValue instanceof ITypedReferenceableInstance) { - ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attrValue; - Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance); - - if (MapUtils.isNotEmpty(prunedAttrs)) { - if (ret == null) { - ret = new HashMap<>(); - } - - ret.put(attrInstance.getId()._getId(), prunedAttrs); - } + ret = pruneAttributes(ret, (ITypedReferenceableInstance) attrValue); } } } @@ -220,6 +206,20 @@ public class EntityAuditListener implements EntityChangeListener { return ret; } + private Map<String, Object> pruneAttributes(Map<String, Object> ret, ITypedReferenceableInstance attribute) throws AtlasException { + ITypedReferenceableInstance attrInstance = attribute; + Map<String, Object> prunedAttrs = pruneEntityAttributesForAudit(attrInstance); + + if (MapUtils.isNotEmpty(prunedAttrs)) { + if (ret == null) { + ret = new HashMap<>(); + } + + ret.put(attrInstance.getId()._getId(), prunedAttrs); + } + return ret; + } + private void restoreEntityAttributes(ITypedReferenceableInstance entity, Map<String, Object> prunedAttributes) throws AtlasException { if (MapUtils.isEmpty(prunedAttributes)) { return; @@ -240,27 +240,25 @@ public class EntityAuditListener implements EntityChangeListener { if (attrValue instanceof Collection) { for (Object attributeEntity : (Collection) attrValue) { if (attributeEntity instanceof ITypedReferenceableInstance) { - ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attributeEntity; - Object obj = prunedAttributes.get(attrInstance.getId()._getId()); - - if (obj instanceof Map) { - restoreEntityAttributes(attrInstance, (Map) obj); - } + restoreAttributes(prunedAttributes, (ITypedReferenceableInstance) attributeEntity); } } } else if (attrValue instanceof ITypedReferenceableInstance) { - ITypedReferenceableInstance attrInstance = (ITypedReferenceableInstance) attrValue; - Object obj = prunedAttributes.get(attrInstance.getId()._getId()); - - if (obj instanceof Map) { - restoreEntityAttributes(attrInstance, (Map) obj); - } + restoreAttributes(prunedAttributes, (ITypedReferenceableInstance) attrValue); } } } } } + private void restoreAttributes(Map<String, Object> prunedAttributes, ITypedReferenceableInstance attributeEntity) throws AtlasException { + Object obj = prunedAttributes.get(attributeEntity.getId()._getId()); + + if (obj instanceof Map) { + restoreEntityAttributes(attributeEntity, (Map) obj); + } + } + private String getAuditPrefix(EntityAuditAction action) { final String ret; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasAbstractFormatConverter.java similarity index 97% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasAbstractFormatConverter.java index f1f3d18..a36618c 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasAbstractFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasAbstractFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.model.TypeCategory; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasArrayFormatConverter.java similarity index 98% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasArrayFormatConverter.java index aa14aff..9e8f523 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasArrayFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasArrayFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasClassificationFormatConverter.java similarity index 98% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasClassificationFormatConverter.java index dc740f5..cd4f165 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasClassificationFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasClassificationFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasEntityFormatConverter.java similarity index 88% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasEntityFormatConverter.java index cb1390d..1ce6168 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEntityFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasEntityFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; @@ -51,22 +51,18 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { } @Override - public Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException { - AtlasObjectId ret = null; + public AtlasEntity fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException { + AtlasEntity entity = null; if (v1Obj != null) { AtlasEntityType entityType = (AtlasEntityType) type; - if (v1Obj instanceof Id) { - Id id = (Id) v1Obj; - - ret = new AtlasObjectId(id._getId(), id.getTypeName()); - } else if (v1Obj instanceof IReferenceableInstance) { + if (v1Obj instanceof IReferenceableInstance) { IReferenceableInstance entRef = (IReferenceableInstance) v1Obj; - ret = new AtlasObjectId(entRef.getId()._getId(), entRef.getTypeName()); + String guid = entRef.getId()._getId(); - if (!context.entityExists(ret.getGuid())) { + if (!context.entityExists(guid)) { Map<String, Object> v1Attribs = null; try { @@ -75,7 +71,7 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { LOG.error("IReferenceableInstance.getValuesMap() failed", excp); } - AtlasEntity entity = new AtlasEntity(entRef.getTypeName(), + entity = new AtlasEntity(entRef.getTypeName(), super.fromV1ToV2(entityType, v1Attribs, context)); entity.setGuid(entRef.getId()._getId()); entity.setStatus(convertState(entRef.getId().getState())); @@ -83,7 +79,7 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { entity.setCreateTime(entRef.getSystemAttributes().createdTime); entity.setUpdatedBy(entRef.getSystemAttributes().modifiedBy); entity.setUpdateTime(entRef.getSystemAttributes().modifiedTime); - entity.setVersion(new Long(entRef.getId().version)); + entity.setVersion((long) entRef.getId().version); if (CollectionUtils.isNotEmpty(entRef.getTraits())) { List<AtlasClassification> classifications = new ArrayList<>(); @@ -99,18 +95,18 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { entity.setClassifications(classifications); } - - context.addEntity(entity); + } else { + entity = context.getById(guid); } } else { throw new AtlasBaseException(AtlasErrorCode.UNEXPECTED_TYPE, "IReferenceableInstance", v1Obj.getClass().getCanonicalName()); } } - return ret; + return entity; } - private AtlasEntity.Status convertState(EntityState state){ + private Status convertState(EntityState state){ Status status = Status.ACTIVE; if(state != null && state.equals(EntityState.DELETED)){ status = Status.DELETED; @@ -160,7 +156,6 @@ public class AtlasEntityFormatConverter extends AtlasStructFormatConverter { v2Obj.getClass().getCanonicalName()); } } - return ret; } } diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasEnumFormatConverter.java similarity index 97% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasEnumFormatConverter.java index 6d8e3ae..2bf15f2 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasEnumFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasEnumFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.exception.AtlasBaseException; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasFormatConverter.java similarity index 74% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasFormatConverter.java index a7157a3..9d0d7f4 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasFormatConverter.java @@ -15,16 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.type.AtlasType; -import java.util.HashMap; -import java.util.Map; public interface AtlasFormatConverter { Object fromV1ToV2(Object v1Obj, AtlasType type, ConverterContext context) throws AtlasBaseException; @@ -33,28 +32,35 @@ public interface AtlasFormatConverter { TypeCategory getTypeCategory(); - public static class ConverterContext { + class ConverterContext { - private Map<String, AtlasEntity> entities = null; + private AtlasEntity.AtlasEntitiesWithExtInfo entities = null; public void addEntity(AtlasEntity entity) { if (entities == null) { - entities = new HashMap<>(); + entities = new AtlasEntitiesWithExtInfo(); } - entities.put(entity.getGuid(), entity); + entities.addEntity(entity); + } + + public void addReferredEntity(AtlasEntity entity) { + if (entities == null) { + entities = new AtlasEntitiesWithExtInfo(); + } + entities.addReferredEntity(entity); } public AtlasEntity getById(String guid) { if( entities != null) { - return entities.get(guid); + return entities.getEntity(guid); } return null; } - public boolean entityExists(String guid) { return entities != null && entities.containsKey(guid); } + public boolean entityExists(String guid) { return entities != null && entities.hasEntity(guid); } - public Map<String, AtlasEntity> getEntities() { + public AtlasEntitiesWithExtInfo getEntities() { return entities; } } diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasFormatConverters.java similarity index 97% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasFormatConverters.java index 968d74f..3a164c8 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasFormatConverters.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasFormatConverters.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -41,6 +41,7 @@ public class AtlasFormatConverters { registerConverter(new AtlasEntityFormatConverter(this, typeRegistry)); registerConverter(new AtlasArrayFormatConverter(this, typeRegistry)); registerConverter(new AtlasMapFormatConverter(this, typeRegistry)); + registerConverter(new AtlasObjectIdConverter(this, typeRegistry)); } private void registerConverter(AtlasFormatConverter converter) { diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java similarity index 85% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index b1dae56..95dcc7a 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasInstanceRestAdapters.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -15,8 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; +import com.google.inject.Inject; +import com.google.inject.Singleton; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; @@ -48,14 +50,12 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Iterator; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import java.util.Map; +import java.util.List; @Singleton -public class AtlasInstanceRestAdapters { +public class AtlasInstanceConverter { - private static final Logger LOG = LoggerFactory.getLogger(AtlasInstanceRestAdapters.class); + private static final Logger LOG = LoggerFactory.getLogger(AtlasInstanceConverter.class); @Inject private AtlasTypeRegistry typeRegistry; @@ -125,9 +125,9 @@ public class AtlasInstanceRestAdapters { return ret; } - public Map<String, AtlasEntity> getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { + public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { - AtlasFormatConverter converter = instanceFormatters.getConverter(TypeCategory.ENTITY); + AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName()); if (entityType == null) { @@ -136,7 +136,8 @@ public class AtlasInstanceRestAdapters { AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext(); - converter.fromV1ToV2(referenceable, entityType, ctx); + AtlasEntity entity = converter.fromV1ToV2(referenceable, entityType, ctx); + ctx.addEntity(entity); return ctx.getEntities(); } @@ -186,4 +187,37 @@ public class AtlasInstanceRestAdapters { return new AtlasBaseException(e); } + public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getEntities"); + } + + AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext(); + for (Referenceable referenceable : referenceables) { + AtlasEntity entity = fromV1toV2Entity(referenceable, context); + + context.addEntity(entity); + } + if (LOG.isDebugEnabled()) { + LOG.debug("<== getEntities"); + } + + return context.getEntities(); + } + + private AtlasEntity fromV1toV2Entity(Referenceable referenceable, AtlasFormatConverter.ConverterContext context) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> fromV1toV2Entity"); + } + + AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY); + + AtlasEntity entity = converter.fromV1ToV2(referenceable, typeRegistry.getType(referenceable.getTypeName()), context); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== fromV1toV2Entity"); + } + return entity; + } + } diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasMapFormatConverter.java similarity index 95% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasMapFormatConverter.java index 6967c4f..bdfbf39 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasMapFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasMapFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; @@ -77,12 +77,12 @@ public class AtlasMapFormatConverter extends AtlasAbstractFormatConverter { AtlasType valueType = mapType.getValueType(); AtlasFormatConverter keyConverter = converterRegistry.getConverter(keyType.getTypeCategory()); AtlasFormatConverter valueConverter = converterRegistry.getConverter(valueType.getTypeCategory()); - Map v1Map = (Map)v2Obj; + Map v2Map = (Map)v2Obj; ret = new HashMap<>(); - for (Object key : v1Map.keySet()) { - Object value = v1Map.get(key); + for (Object key : v2Map.keySet()) { + Object value = v2Map.get(key); Object v2Key = keyConverter.fromV2ToV1(key, keyType, ctx); Object v2Value = valueConverter.fromV2ToV1(value, valueType, ctx); diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasObjectIdConverter.java similarity index 89% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasObjectIdConverter.java index 11a020d..a5ab8d7 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasObjectIdConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasObjectIdConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; @@ -23,12 +23,11 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.persistence.Id; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import java.util.Map; @@ -53,8 +52,19 @@ AtlasObjectIdConverter extends AtlasAbstractFormatConverter { Id id = (Id) v1Obj; ret = new AtlasObjectId(id._getId(), id.getTypeName()); } else if (v1Obj instanceof IReferenceableInstance) { - IReferenceableInstance entity = (IReferenceableInstance) v1Obj; - ret = new AtlasObjectId(entity.getId()._getId(), entity.getTypeName()); + IReferenceableInstance refInst = (IReferenceableInstance) v1Obj; + + String guid = refInst.getId()._getId(); + ret = new AtlasObjectId(guid, refInst.getTypeName()); + + if (!converterContext.entityExists(guid)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(refInst.getTypeName()); + AtlasEntityFormatConverter entityFormatConverter = (AtlasEntityFormatConverter) converterRegistry.getConverter(TypeCategory.ENTITY); + + AtlasEntity entity = entityFormatConverter.fromV1ToV2(v1Obj, entityType, converterContext); + + converterContext.addReferredEntity(entity); + } } } return ret; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasPrimitiveFormatConverter.java similarity index 97% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasPrimitiveFormatConverter.java index 2b70c5e..d0e63eb 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasPrimitiveFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasPrimitiveFormatConverter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.exception.AtlasBaseException; diff --git a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java similarity index 86% rename from webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java rename to repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java index 920b48b..90f3e5b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/adapters/AtlasStructFormatConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java @@ -15,16 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.atlas.web.adapters; +package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasStruct; -import org.apache.atlas.model.typedef.AtlasStructDef; -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.type.*; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.Struct; import org.apache.commons.collections.MapUtils; @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { private static final Logger LOG = LoggerFactory.getLogger(AtlasStructFormatConverter.class); @@ -124,7 +125,9 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { if (MapUtils.isNotEmpty(attributes)) { ret = new HashMap<>(); - for (AtlasStructType.AtlasAttribute attr : structType.getAllAttributes().values()) { + // Only process the requested/set attributes + for (Object attribKey : attributes.keySet()) { + AtlasStructType.AtlasAttribute attr = structType.getAttribute((String) attribKey); AtlasType attrType = attr.getAttributeType(); if (attrType == null) { @@ -133,16 +136,10 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { } Object v2Value = attributes.get(attr.getName()); - Object v1Value = null; + Object v1Value; - AtlasFormatConverter attrConverter = null; - if (attrType.getTypeCategory() == TypeCategory.OBJECT_ID_TYPE && !attr.isOwnedRef()) { - attrConverter = new AtlasObjectIdConverter(converterRegistry, typeRegistry); - v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context); - } else { - attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); - v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context); - } + AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory()); + v1Value = attrConverter.fromV2ToV1(v2Value, attrType, context); ret.put(attr.getName(), v1Value); } } @@ -156,7 +153,10 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter { if (MapUtils.isNotEmpty(attributes)) { ret = new HashMap<>(); - for (AtlasStructType.AtlasAttribute attr : structType.getAllAttributes().values()) { + // Only process the requested/set attributes + for (Object attribKey : attributes.keySet()) { + AtlasStructType.AtlasAttribute attr = structType.getAttribute((String) attribKey); + AtlasType attrType = attr.getAttributeType(); if (attrType == null) { diff --git a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java index 59adb00..08aa960 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContextV1.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContextV1.java @@ -19,20 +19,13 @@ package org.apache.atlas; import org.apache.atlas.metrics.Metrics; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.TypeSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; -import java.util.List; import java.util.Set; public class RequestContextV1 { diff --git a/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java b/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java deleted file mode 100644 index 2b71489..0000000 --- a/webapp/src/main/java/org/apache/atlas/LocalAtlasClient.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas; - -import com.google.inject.Inject; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.atlas.web.filters.AuditFilter; -import org.apache.atlas.web.resources.EntityResource; -import org.apache.atlas.web.service.ServiceState; -import org.apache.atlas.web.util.DateTimeHelper; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.util.Date; -import java.util.List; - -/** - * Local atlas client which calls the resource methods directly. Used by NotificationHookConsumer. - */ -public class LocalAtlasClient extends AtlasClient { - private static final String LOCALHOST = "localhost"; - private static final String CLASS = LocalAtlasClient.class.getSimpleName(); - - public static final Logger LOG = LoggerFactory.getLogger(LocalAtlasClient.class); - - private final EntityResource entityResource; - - private final ServiceState serviceState; - - @Inject - public LocalAtlasClient(ServiceState serviceState, EntityResource entityResource) { - super(); - this.serviceState = serviceState; - this.entityResource = entityResource; - } - - private String user; - - public void setUser(String user) { - this.user = user; - } - - private void setRequestContext() { - RequestContext requestContext = RequestContext.createContext(); - requestContext.setUser(user); - } - - @Override - public boolean isServerReady() throws AtlasServiceException { - return serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE; - } - - @Override - protected List<String> createEntity(final JSONArray entities) throws AtlasServiceException { - LOG.debug("Creating entities: {}", entities); - EntityOperation entityOperation = new EntityOperation(API.CREATE_ENTITY) { - @Override - Response invoke() { - return entityResource.submit(new LocalServletRequest(entities.toString())); - } - }; - JSONObject response = entityOperation.run(); - EntityResult results = extractEntityResult(response); - LOG.debug("Create entities returned results: {}", results); - return results.getCreatedEntities(); - } - - @Override - protected EntityResult updateEntities(final JSONArray entities) throws AtlasServiceException { - LOG.debug("Updating entities: {}", entities); - EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY) { - @Override - Response invoke() { - return entityResource.updateEntities(new LocalServletRequest(entities.toString())); - } - }; - JSONObject response = entityOperation.run(); - EntityResult results = extractEntityResult(response); - LOG.debug("Update entities returned results: {}", results); - return results; - } - - private abstract class EntityOperation { - private final API api; - - public EntityOperation(API api) { - this.api = api; - } - - public JSONObject run() throws AtlasServiceException { - setRequestContext(); - AuditFilter.audit(user, CLASS, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); - - try { - Response response = invoke(); - return (JSONObject) response.getEntity(); - } catch(WebApplicationException e) { - try { - throw new AtlasServiceException(api, e); - } catch (JSONException e1) { - throw new AtlasServiceException(e); - } - } - } - - abstract Response invoke(); - } - - @Override - public EntityResult updateEntity(final String entityType, final String uniqueAttributeName, - final String uniqueAttributeValue, Referenceable entity) throws AtlasServiceException { - final String entityJson = InstanceSerialization.toJson(entity, true); - LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType, - uniqueAttributeName, uniqueAttributeValue, entityJson); - EntityOperation entityOperation = new EntityOperation(API.UPDATE_ENTITY_PARTIAL) { - @Override - Response invoke() { - return entityResource.updateByUniqueAttribute(entityType, uniqueAttributeName, uniqueAttributeValue, - new LocalServletRequest(entityJson)); - } - }; - JSONObject response = entityOperation.run(); - EntityResult result = extractEntityResult(response); - LOG.debug("Update entity returned result: {}", result); - return result; - } - - @Override - public EntityResult deleteEntity(final String entityType, final String uniqueAttributeName, - final String uniqueAttributeValue) throws AtlasServiceException { - LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName, - uniqueAttributeValue); - EntityOperation entityOperation = new EntityOperation(API.DELETE_ENTITY) { - @Override - Response invoke() { - return entityResource.deleteEntities(null, entityType, uniqueAttributeName, uniqueAttributeValue); - } - }; - JSONObject response = entityOperation.run(); - EntityResult results = extractEntityResult(response); - LOG.debug("Delete entities returned results: {}", results); - return results; - } - - @Override - public String getAdminStatus() throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public List<String> createType(String typeAsJson) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public List<String> updateType(String typeAsJson) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public List<String> listTypes() throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public TypesDef getType(String typeName) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public EntityResult updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public EntityResult updateEntity(String guid, Referenceable entity) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - - @Override - public EntityResult deleteEntities(final String ... guids) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public Referenceable getEntity(String guid) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public Referenceable getEntity(final String entityType, final String attribute, final String value) - throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public List<String> listEntities(final String entityType) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults) - throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public JSONArray search(final String searchQuery, final int limit, final int offset) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public JSONArray searchByDSL(final String query, final int limit, final int offset) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public JSONObject searchByFullText(final String query, final int limit, final int offset) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public JSONObject getInputGraph(String datasetName) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } - - @Override - public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException { - throw new IllegalStateException("Not supported in LocalAtlasClient"); - } -} diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index f241681..891d7ac 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -19,34 +19,50 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Inject; import com.google.inject.Singleton; import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.service.Service; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.filters.AuditFilter; +import org.apache.atlas.web.service.ServiceState; +import org.apache.atlas.web.util.DateTimeHelper; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.atlas.notification.hook.HookNotification.*; + /** * Consumer of notifications from hooks e.g., hive hook etc. */ @Singleton public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + private static final String LOCALHOST = "localhost"; private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); @@ -57,7 +73,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - private final LocalAtlasClient atlasClient; + private final AtlasEntityStore atlasEntityStore; + private final ServiceState serviceState; + private final AtlasInstanceConverter instanceConverter; + private final AtlasTypeRegistry typeRegistry; private final int maxRetries; private final int failedMsgCacheSize; private final int consumerRetryInterval; @@ -68,10 +87,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private List<HookConsumer> consumers; @Inject - public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) - throws AtlasException { + public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, + ServiceState serviceState, AtlasInstanceConverter instanceConverter, + AtlasTypeRegistry typeRegistry) throws AtlasException { this.notificationInterface = notificationInterface; - this.atlasClient = atlasClient; + this.atlasEntityStore = atlasEntityStore; + this.serviceState = serviceState; + this.instanceConverter = instanceConverter; + this.typeRegistry = typeRegistry; + this.applicationProperties = ApplicationProperties.get(); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); @@ -208,48 +232,78 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - void handleMessage(HookNotification.HookNotificationMessage message) throws - AtlasServiceException, AtlasException { + void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException { + String messageUser = message.getUser(); + // Used for intermediate conversions during create and update + AtlasEntity.AtlasEntitiesWithExtInfo entities; for (int numRetries = 0; numRetries < maxRetries; numRetries++) { - LOG.debug("Running attempt {}", numRetries); + if (LOG.isDebugEnabled()) { + LOG.debug("Running attempt {}", numRetries); + } try { - atlasClient.setUser(message.getUser()); switch (message.getType()) { - case ENTITY_CREATE: - HookNotification.EntityCreateRequest createRequest = - (HookNotification.EntityCreateRequest) message; - atlasClient.createEntity(createRequest.getEntities()); - break; - - case ENTITY_PARTIAL_UPDATE: - HookNotification.EntityPartialUpdateRequest partialUpdateRequest = - (HookNotification.EntityPartialUpdateRequest) message; - atlasClient.updateEntity(partialUpdateRequest.getTypeName(), - partialUpdateRequest.getAttribute(), - partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity()); - break; - - case ENTITY_DELETE: - HookNotification.EntityDeleteRequest deleteRequest = - (HookNotification.EntityDeleteRequest) message; - atlasClient.deleteEntity(deleteRequest.getTypeName(), - deleteRequest.getAttribute(), - deleteRequest.getAttributeValue()); - break; - - case ENTITY_FULL_UPDATE: - HookNotification.EntityUpdateRequest updateRequest = - (HookNotification.EntityUpdateRequest) message; - atlasClient.updateEntities(updateRequest.getEntities()); - break; - - default: - throw new IllegalStateException("Unhandled exception!"); + case ENTITY_CREATE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityCreate via hook"); + } + EntityCreateRequest createRequest = (EntityCreateRequest) message; + audit(messageUser, AtlasClient.API.CREATE_ENTITY); + + entities = instanceConverter.getEntities(createRequest.getEntities()); + + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); + break; + + case ENTITY_PARTIAL_UPDATE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityPartialUpdate via hook"); + } + final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message; + audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL); + + Referenceable referenceable = partialUpdateRequest.getEntity(); + entities = instanceConverter.getEntities(Collections.singletonList(referenceable)); + // There should only be one root entity after the conversion + AtlasEntity entity = entities.getEntities().get(0); + // Need to set the attributes explicitly here as the qualified name might have changed during update + entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); + break; + + case ENTITY_DELETE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityDelete via hook"); + } + final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message; + audit(messageUser, AtlasClient.API.DELETE_ENTITY); + + try { + AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); + atlasEntityStore.deleteByUniqueAttributes(type, + new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }}); + } catch (ClassCastException cle) { + LOG.error("Failed to do a partial update on Entity"); + } + break; + + case ENTITY_FULL_UPDATE: + if (LOG.isDebugEnabled()) { + LOG.debug("EntityFullUpdate via hook"); + } + EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; + audit(messageUser, AtlasClient.API.UPDATE_ENTITY); + + entities = instanceConverter.getEntities(updateRequest.getEntities()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); + break; + + default: + throw new IllegalStateException("Unhandled exception!"); } break; } catch (Throwable e) { - LOG.warn("Error handling message{}", e.getMessage()); + LOG.warn("Error handling message: {}", e.getMessage()); try{ LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); Thread.sleep(consumerRetryInterval); @@ -272,7 +326,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void recordFailedMessages() { //logging failed messages - for (HookNotification.HookNotificationMessage message : failedMessages) { + for (HookNotificationMessage message : failedMessages) { FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); } failedMessages.clear(); @@ -285,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl boolean serverAvailable(Timer timer) { try { - while (!atlasClient.isServerReady()) { + while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) { try { LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS); @@ -311,4 +365,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumer.close(); } } + + private void audit(String messageUser, AtlasClient.API api) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> audit({},{})", messageUser, api); + } + + AuditFilter.audit(messageUser, THREADNAME_PREFIX, api.getMethod(), LOCALHOST, api.getPath(), LOCALHOST, + DateTimeHelper.formatDateUTC(new Date())); + } } diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 2c2c16d..c8c0099 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -60,7 +60,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; -import static org.apache.atlas.web.adapters.AtlasInstanceRestAdapters.toAtlasBaseException; +import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException; /** * Jersey Resource for admin operations. diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 8518e12..2f7ba20 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -23,11 +23,12 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v1.EntityStream; @@ -37,7 +38,6 @@ import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedStruct; -import org.apache.atlas.web.adapters.AtlasInstanceRestAdapters; import org.apache.atlas.web.util.Servlets; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -46,7 +46,15 @@ import org.apache.commons.lang3.StringUtils; import javax.inject.Inject; import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import java.util.ArrayList; @@ -54,7 +62,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.atlas.web.adapters.AtlasInstanceRestAdapters.toAtlasBaseException; +import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException; /** * REST for a single entity @@ -66,14 +74,15 @@ public class EntityREST { public static final String PREFIX_ATTR = "attr:"; private final AtlasTypeRegistry typeRegistry; - private final AtlasInstanceRestAdapters restAdapters; + private final AtlasInstanceConverter restAdapters; private final MetadataService metadataService; private final AtlasEntityStore entitiesStore; @Inject - public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceRestAdapters restAdapters, MetadataService metadataService, AtlasEntityStore entitiesStore) { + public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter, + MetadataService metadataService, AtlasEntityStore entitiesStore) { this.typeRegistry = typeRegistry; - this.restAdapters = restAdapters; + this.restAdapters = instanceConverter; this.metadataService = metadataService; this.entitiesStore = entitiesStore; } @@ -437,4 +446,4 @@ public class EntityREST { } } } -} \ No newline at end of file +} diff --git a/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java b/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java deleted file mode 100644 index c5616df..0000000 --- a/webapp/src/test/java/org/apache/atlas/LocalAtlasClientTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas; - -import com.google.inject.Inject; -import com.sun.jersey.api.client.ClientResponse; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.web.resources.EntityResource; -import org.apache.atlas.web.service.ServiceState; -import org.apache.commons.lang.RandomStringUtils; -import org.codehaus.jettison.json.JSONObject; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.atlas.AtlasClient.ENTITIES; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -@Guice(modules= RepositoryMetadataModule.class) -public class LocalAtlasClientTest { - @Mock - private EntityResource mockEntityResource; - - @Inject - private EntityResource entityResource; - - @Mock - private ServiceState serviceState; - - @BeforeMethod - public void setup() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testCreateEntity() throws Exception { - Response response = mock(Response.class); - when(mockEntityResource.submit(any(HttpServletRequest.class))).thenReturn(response); - final String guid = random(); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put(ENTITIES, new JSONObject( - new AtlasClient.EntityResult(Arrays.asList(guid), null, null).toString()).get(ENTITIES)); - }}); - - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - List<String> results = atlasClient.createEntity(new Referenceable(random())); - assertEquals(results.size(), 1); - assertEquals(results.get(0), guid); - } - - @Test - public void testException() throws Exception { - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - - Response response = mock(Response.class); - when(mockEntityResource.submit(any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response)); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put("stackTrace", "stackTrace"); - }}); - when(response.getStatus()).thenReturn(Response.Status.BAD_REQUEST.getStatusCode()); - try { - atlasClient.createEntity(new Referenceable(random())); - fail("Expected AtlasServiceException"); - } catch(AtlasServiceException e) { - assertEquals(e.getStatus(), ClientResponse.Status.BAD_REQUEST); - } - - when(mockEntityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(), - any(HttpServletRequest.class))).thenThrow(new WebApplicationException(response)); - when(response.getStatus()).thenReturn(Response.Status.NOT_FOUND.getStatusCode()); - try { - atlasClient.updateEntity(random(), random(), random(), new Referenceable(random())); - fail("Expected AtlasServiceException"); - } catch(AtlasServiceException e) { - assertEquals(e.getStatus(), ClientResponse.Status.NOT_FOUND); - } - - } - - @Test - public void testIsServerReady() throws Exception { - when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - assertTrue(atlasClient.isServerReady()); - - when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.BECOMING_ACTIVE); - assertFalse(atlasClient.isServerReady()); - } - - @Test - public void testUpdateEntity() throws Exception { - final String guid = random(); - Response response = mock(Response.class); - when(mockEntityResource.updateByUniqueAttribute(anyString(), anyString(), anyString(), - any(HttpServletRequest.class))).thenReturn(response); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put(ENTITIES, new JSONObject( - new AtlasClient.EntityResult(null, Arrays.asList(guid), null).toString()).get(ENTITIES)); - }}); - - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - AtlasClient.EntityResult - entityResult = atlasClient.updateEntity(random(), random(), random(), new Referenceable(random())); - assertEquals(entityResult.getUpdateEntities(), Arrays.asList(guid)); - } - - @Test - public void testDeleteEntity() throws Exception { - final String guid = random(); - Response response = mock(Response.class); - when(response.getEntity()).thenReturn(new JSONObject() {{ - put(ENTITIES, new JSONObject( - new AtlasClient.EntityResult(null, null, Arrays.asList(guid)).toString()).get(ENTITIES)); - }}); - - when(mockEntityResource.deleteEntities(anyListOf(String.class), anyString(), anyString(), anyString())).thenReturn(response); - LocalAtlasClient atlasClient = new LocalAtlasClient(serviceState, mockEntityResource); - AtlasClient.EntityResult entityResult = atlasClient.deleteEntity(random(), random(), random()); - assertEquals(entityResult.getDeletedEntities(), Arrays.asList(guid)); - } - - private String random() { - return RandomStringUtils.randomAlphanumeric(10); - } - - @Test - @Inject - public void testGetLocationURI() { - final String guid = "123"; - URI uri = entityResource.getLocationURI(new ArrayList<String>() {{ add(guid); }}); - uri.getRawPath().equals(AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS + "/" + AtlasClient.API.GET_ENTITY.getPath() + "/" + guid); - } -} diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 873e562..13747b2 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -22,19 +22,30 @@ import com.google.inject.Inject; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.LocalAtlasClient; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.kafka.KafkaNotification; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.EntityStream; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.service.ServiceState; import org.apache.commons.lang.RandomStringUtils; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; @Guice(modules = NotificationModule.class) public class NotificationHookConsumerKafkaTest { @@ -45,10 +56,28 @@ public class NotificationHookConsumerKafkaTest { @Inject private NotificationInterface notificationInterface; + + @Mock + private AtlasEntityStore atlasEntityStore; + + @Mock + private ServiceState serviceState; + + @Mock + private AtlasInstanceConverter instanceConverter; + + @Mock + private AtlasTypeRegistry typeRegistry; + private KafkaNotification kafkaNotification; @BeforeTest - public void setup() throws AtlasException, InterruptedException { + public void setup() throws AtlasException, InterruptedException, AtlasBaseException { + MockitoAnnotations.initMocks(this); + AtlasType mockType = mock(AtlasType.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); + AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); + when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); kafkaNotification = startKafkaServer(); } @@ -58,25 +87,25 @@ public class NotificationHookConsumerKafkaTest { } @Test - public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException { + public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException { try { produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); - LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class); NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(kafkaNotification, localAtlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user1"); - + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); + // produce another message, and make sure it moves ahead. If commit succeeded, this would work. produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user2"); + verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); + reset(atlasEntityStore); } finally { kafkaNotification.close(); @@ -90,20 +119,19 @@ public class NotificationHookConsumerKafkaTest { NotificationConsumer<HookNotification.HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true); - LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class); NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(kafkaNotification, localAtlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user3"); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); consumeOneMessage(consumer, hookConsumer); - verify(localAtlasClient).setUser("test_user3"); + verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); } finally { kafkaNotification.close(); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index f06f791..b86c693 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -17,13 +17,20 @@ */ package org.apache.atlas.notification; -import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.LocalAtlasClient; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.EntityStream; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.service.ServiceState; import org.apache.commons.configuration.Configuration; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -31,16 +38,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; @@ -50,26 +52,41 @@ public class NotificationHookConsumerTest { private NotificationInterface notificationInterface; @Mock - private LocalAtlasClient atlasClient; - - @Mock private Configuration configuration; @Mock private ExecutorService executorService; + @Mock + private AtlasEntityStore atlasEntityStore; + + @Mock + private ServiceState serviceState; + + @Mock + private AtlasInstanceConverter instanceConverter; + + @Mock + private AtlasTypeRegistry typeRegistry; + @BeforeMethod - public void setup() { + public void setup() throws AtlasBaseException { MockitoAnnotations.initMocks(this); + AtlasType mockType = mock(AtlasType.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); + AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); + when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); + EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse); } @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); - when(atlasClient.isServerReady()).thenReturn(true); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); assertTrue(hookConsumer.serverAvailable(timer)); @@ -78,11 +95,16 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); - when(atlasClient.isServerReady()).thenReturn(false, false, false, true); + + when(serviceState.getState()) + .thenReturn(ServiceState.ServiceStateValue.PASSIVE) + .thenReturn(ServiceState.ServiceStateValue.PASSIVE) + .thenReturn(ServiceState.ServiceStateValue.PASSIVE) + .thenReturn(ServiceState.ServiceStateValue.ACTIVE); assertTrue(hookConsumer.serverAvailable(timer)); @@ -92,13 +114,15 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); when(message.getUser()).thenReturn("user"); when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); + Referenceable mock = mock(Referenceable.class); + when(message.getEntities()).thenReturn(Arrays.asList(mock)); hookConsumer.handleMessage(message); @@ -106,15 +130,17 @@ public class NotificationHookConsumerTest { } @Test - public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException { + public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>()); - when(atlasClient.createEntity(any(List.class))). - thenThrow(new RuntimeException("Simulating exception in processing message")); + HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", + new ArrayList<Referenceable>() { + { add(mock(Referenceable.class)); } + }); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message")); hookConsumer.handleMessage(message); verifyZeroInteractions(consumer); @@ -122,24 +148,12 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); - when(atlasClient.isServerReady()).thenReturn(false); - - assertFalse(hookConsumer.serverAvailable(timer)); - } - - @Test - public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); - when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, - new Exception())); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); assertFalse(hookConsumer.serverAvailable(timer)); } @@ -152,7 +166,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); @@ -167,7 +181,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); verifyZeroInteractions(notificationInterface); } @@ -181,7 +195,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); @@ -196,7 +210,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); verify(notificationInterface).close();