Commit dce31ab8 by Shwetha GS

ATLAS-386 Handle hive rename Table (shwethags)

parent 90692af5
...@@ -290,6 +290,10 @@ ...@@ -290,6 +290,10 @@
<name>atlas.log.dir</name> <name>atlas.log.dir</name>
<value>${project.build.directory}/logs</value> <value>${project.build.directory}/logs</value>
</systemProperty> </systemProperty>
<systemProperty>
<name>atlas.data</name>
<value>${project.build.directory}/data</value>
</systemProperty>
</systemProperties> </systemProperties>
<stopKey>atlas-stop</stopKey> <stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort> <stopPort>31001</stopPort>
......
...@@ -70,6 +70,10 @@ public class HiveMetaStoreBridge { ...@@ -70,6 +70,10 @@ public class HiveMetaStoreBridge {
this(hiveConf, atlasConf, null, null); this(hiveConf, atlasConf, null, null);
} }
/**
 * @return the cluster name this bridge was configured with
 */
public String getClusterName() {
return clusterName;
}
/** /**
* Construct a HiveMetaStoreBridge. * Construct a HiveMetaStoreBridge.
* @param hiveConf hive conf * @param hiveConf hive conf
......
...@@ -149,6 +149,17 @@ public class HiveHookIT { ...@@ -149,6 +149,17 @@ public class HiveHookIT {
assertDatabaseIsRegistered(DEFAULT_DB); assertDatabaseIsRegistered(DEFAULT_DB);
} }
@Test
public void testRenameTable() throws Exception {
    // Create a table, rename it, then verify Atlas now knows the table
    // only under its new name.
    String oldTableName = createTable();
    String renamedTableName = tableName();

    runCommand("alter table " + oldTableName + " rename to " + renamedTableName);

    assertTableIsRegistered(DEFAULT_DB, renamedTableName);
    assertTableIsNotRegistered(DEFAULT_DB, oldTableName);
}
private String assertColumnIsRegistered(String colName) throws Exception { private String assertColumnIsRegistered(String colName) throws Exception {
LOG.debug("Searching for column {}", colName); LOG.debug("Searching for column {}", colName);
String query = String query =
...@@ -327,8 +338,8 @@ public class HiveHookIT { ...@@ -327,8 +338,8 @@ public class HiveHookIT {
LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value); LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', " String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
+ "db where name = '%s' and clusterName = '%s' select p", typeName, value, + "db where name = '%s' and clusterName = '%s' select p", typeName, value,
tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(dslQuery, "p"); return assertEntityIsRegistered(dslQuery, "p");
} }
......
...@@ -44,6 +44,7 @@ import javax.ws.rs.core.Response; ...@@ -44,6 +44,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriBuilder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
...@@ -291,12 +292,16 @@ public class AtlasClient { ...@@ -291,12 +292,16 @@ public class AtlasClient {
} }
public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException { public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException {
return createEntity(Arrays.asList(entities));
}
public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entityArray = getEntitiesArray(entities); JSONArray entityArray = getEntitiesArray(entities);
return createEntity(entityArray); return createEntity(entityArray);
} }
private JSONArray getEntitiesArray(Referenceable[] entities) { private JSONArray getEntitiesArray(Collection<Referenceable> entities) {
JSONArray entityArray = new JSONArray(entities.length); JSONArray entityArray = new JSONArray(entities.size());
for (Referenceable entity : entities) { for (Referenceable entity : entities) {
entityArray.put(InstanceSerialization.toJson(entity, true)); entityArray.put(InstanceSerialization.toJson(entity, true));
} }
...@@ -311,6 +316,10 @@ public class AtlasClient { ...@@ -311,6 +316,10 @@ public class AtlasClient {
* @throws AtlasServiceException * @throws AtlasServiceException
*/ */
public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException { public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException {
return updateEntities(Arrays.asList(entities));
}
public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
JSONArray entitiesArray = getEntitiesArray(entities); JSONArray entitiesArray = getEntitiesArray(entities);
JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString()); JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
try { try {
......
...@@ -29,7 +29,7 @@ The entities are created and de-duped using unique qualified name. They provide ...@@ -29,7 +29,7 @@ The entities are created and de-duped using unique qualified name. They provide
---++ Importing Hive Metadata ---++ Importing Hive Metadata
org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this. org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
Set the following configuration in <atlas-conf>/client.properties and set environment variable HIVE_CONFIG to the hive conf directory: Set the following configuration in <atlas-conf>/client.properties and set environment variable $HIVE_CONF_DIR to the hive conf directory:
<verbatim> <verbatim>
<property> <property>
<name>atlas.cluster.name</name> <name>atlas.cluster.name</name>
......
...@@ -174,7 +174,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -174,7 +174,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
} }
@Override @Override
public void send(NotificationType type, String... messages) throws NotificationException { public void _send(NotificationType type, String... messages) throws NotificationException {
if (producer == null) { if (producer == null) {
createProducer(); createProducer();
} }
......
...@@ -20,6 +20,9 @@ package org.apache.atlas.notification; ...@@ -20,6 +20,9 @@ package org.apache.atlas.notification;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import java.util.Arrays;
import java.util.List;
/** /**
* Abstract notification interface implementation. * Abstract notification interface implementation.
*/ */
...@@ -46,4 +49,20 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -46,4 +49,20 @@ public abstract class AbstractNotification implements NotificationInterface {
protected final boolean isEmbedded() { protected final boolean isEmbedded() {
return embedded; return embedded;
} }
// Serializes each message to JSON with the shared Gson instance, then hands
// the raw strings to the transport-specific _send implementation.
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
String[] strMessages = new String[messages.size()];
for (int index = 0; index < messages.size(); index++) {
strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
}
_send(type, strMessages);
}
// Convenience varargs overload; delegates to the List-based send.
@Override
public <T> void send(NotificationType type, T... messages) throws NotificationException {
send(type, Arrays.asList(messages));
}
// Transport-specific delivery of messages that have already been serialized to JSON strings.
protected abstract void _send(NotificationType type, String[] messages) throws NotificationException;
} }
...@@ -26,11 +26,16 @@ import com.google.gson.JsonDeserializationContext; ...@@ -26,11 +26,16 @@ import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer; import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonParseException; import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;
import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.notification.entity.EntityNotificationImpl; import org.apache.atlas.notification.entity.EntityNotificationImpl;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
...@@ -45,13 +50,15 @@ import java.util.Map; ...@@ -45,13 +50,15 @@ import java.util.Map;
*/ */
public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> { public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> {
private static final Gson GSON = new GsonBuilder(). public static final Gson GSON = new GsonBuilder().
registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()). registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()).
registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()). registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()).
registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()). registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()).
registerTypeAdapter(IStruct.class, new StructDeserializer()). registerTypeAdapter(IStruct.class, new StructDeserializer()).
registerTypeAdapter(IReferenceableInstance.class, new ReferenceableDeserializer()). registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()).
registerTypeAdapter(JSONArray.class, new JSONArrayDeserializer()). registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()).
registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()).
registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()).
create(); create();
private final Class<T> type; private final Class<T> type;
...@@ -136,30 +143,44 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon ...@@ -136,30 +143,44 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
// ----- inner class : StructDeserializer ------------------------------- // ----- inner class : StructDeserializer -------------------------------
public final static class StructDeserializer implements JsonDeserializer<IStruct> { public final static class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
@Override @Override
public IStruct deserialize(final JsonElement json, final Type type, public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) throws JsonParseException {
return context.deserialize(json, Struct.class); return context.deserialize(json, Struct.class);
} }
// Serializes an IStruct by converting it with Atlas' InstanceSerialization
// and re-parsing the resulting JSON string into a Gson tree.
@Override
public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
} }
// ----- inner class : ReferenceableDeserializer ------------------------ // ----- inner class : ReferenceableSerializerDeserializer ------------------------
public final static class ReferenceableDeserializer implements JsonDeserializer<IStruct> { public final static class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
JsonSerializer<IReferenceableInstance> {
@Override @Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type, public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) throws JsonParseException {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true); return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
} }
// Serializes an IReferenceableInstance via Atlas' InstanceSerialization and
// re-parses the JSON string into a Gson object tree.
@Override
public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
} }
// ----- inner class : JSONArrayDeserializer ---------------------------- // ----- inner class : JSONArraySerializerDeserializer ----------------------------
public final static class JSONArrayDeserializer implements JsonDeserializer<JSONArray> { public final static class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
JsonSerializer<JSONArray> {
@Override @Override
public JSONArray deserialize(final JsonElement json, final Type type, public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) throws JsonParseException {
...@@ -170,5 +191,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon ...@@ -170,5 +191,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
throw new JsonParseException(e.getMessage(), e); throw new JsonParseException(e.getMessage(), e);
} }
} }
// Bridges jettison to Gson: re-parses the JSONArray's string form as a Gson array.
@Override
public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonParser().parse(src.toString()).getAsJsonArray();
}
} }
} }
...@@ -23,9 +23,9 @@ import org.apache.atlas.ApplicationProperties; ...@@ -23,9 +23,9 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -57,11 +57,11 @@ public class NotificationHookConsumer implements Service { ...@@ -57,11 +57,11 @@ public class NotificationHookConsumer implements Service {
String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
atlasClient = new AtlasClient(atlasEndpoint); atlasClient = new AtlasClient(atlasEndpoint);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
List<NotificationConsumer<JSONArray>> consumers = List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers =
notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
executors = Executors.newFixedThreadPool(consumers.size()); executors = Executors.newFixedThreadPool(consumers.size());
for (final NotificationConsumer<JSONArray> consumer : consumers) { for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) {
executors.submit(new HookConsumer(consumer)); executors.submit(new HookConsumer(consumer));
} }
} }
...@@ -86,14 +86,14 @@ public class NotificationHookConsumer implements Service { ...@@ -86,14 +86,14 @@ public class NotificationHookConsumer implements Service {
} }
class HookConsumer implements Runnable { class HookConsumer implements Runnable {
private final NotificationConsumer<JSONArray> consumer; private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
private final AtlasClient client; private final AtlasClient client;
public HookConsumer(NotificationConsumer<JSONArray> consumer) { public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this(atlasClient, consumer); this(atlasClient, consumer);
} }
public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) { public HookConsumer(AtlasClient client, NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this.client = client; this.client = client;
this.consumer = consumer; this.consumer = consumer;
} }
...@@ -106,14 +106,33 @@ public class NotificationHookConsumer implements Service { ...@@ -106,14 +106,33 @@ public class NotificationHookConsumer implements Service {
} }
while(consumer.hasNext()) { while(consumer.hasNext()) {
JSONArray entityJson = consumer.next(); HookNotification.HookNotificationMessage message = consumer.next();
LOG.info("Processing message {}", entityJson);
try { try {
JSONArray guids = atlasClient.createEntity(entityJson); switch (message.getType()) {
LOG.info("Create entities with guid {}", guids); case ENTITY_CREATE:
HookNotification.EntityCreateRequest createRequest =
(HookNotification.EntityCreateRequest) message;
atlasClient.createEntity(createRequest.getEntities());
break;
case ENTITY_PARTIAL_UPDATE:
HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
(HookNotification.EntityPartialUpdateRequest) message;
atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
partialUpdateRequest.getEntity());
break;
case ENTITY_FULL_UPDATE:
HookNotification.EntityUpdateRequest updateRequest =
(HookNotification.EntityUpdateRequest) message;
atlasClient.updateEntities(updateRequest.getEntities());
break;
}
} catch (Exception e) { } catch (Exception e) {
//todo handle failures //todo handle failures
LOG.warn("Error handling message {}", entityJson, e); LOG.debug("Error handling message {}", message, e);
} }
} }
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.entity.EntityNotification;
import org.codehaus.jettison.json.JSONArray; import org.apache.atlas.notification.hook.HookNotification;
import java.util.List; import java.util.List;
...@@ -28,7 +28,7 @@ public interface NotificationInterface { ...@@ -28,7 +28,7 @@ public interface NotificationInterface {
String PROPERTY_PREFIX = "atlas.notification"; String PROPERTY_PREFIX = "atlas.notification";
enum NotificationType { enum NotificationType {
HOOK(JSONArray.class), ENTITIES(EntityNotification.class); HOOK(HookNotification.HookNotificationMessage.class), ENTITIES(EntityNotification.class);
private final Class classType; private final Class classType;
...@@ -52,7 +52,9 @@ public interface NotificationInterface { ...@@ -52,7 +52,9 @@ public interface NotificationInterface {
*/ */
<T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers); <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers);
void send(NotificationType type, String... messages) throws NotificationException; <T> void send(NotificationType type, T... messages) throws NotificationException;
<T> void send(NotificationType type, List<T> messages) throws NotificationException;
void close(); void close();
} }
...@@ -17,12 +17,6 @@ ...@@ -17,12 +17,6 @@
package org.apache.atlas.notification.entity; package org.apache.atlas.notification.entity;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationInterface;
...@@ -30,10 +24,8 @@ import org.apache.atlas.typesystem.IReferenceableInstance; ...@@ -30,10 +24,8 @@ import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import java.lang.reflect.Type;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
...@@ -44,9 +36,6 @@ import java.util.List; ...@@ -44,9 +36,6 @@ import java.util.List;
*/ */
public class NotificationEntityChangeListener implements EntityChangeListener { public class NotificationEntityChangeListener implements EntityChangeListener {
private static final Gson GSON = new GsonBuilder().
registerTypeAdapter(Referenceable.class, new ReferencableSerializer()).create();
private final NotificationInterface notificationInterface; private final NotificationInterface notificationInterface;
private final TypeSystem typeSystem; private final TypeSystem typeSystem;
...@@ -93,7 +82,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -93,7 +82,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
// send notification of entity change // send notification of entity change
private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions, private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions,
EntityNotification.OperationType operationType) throws AtlasException { EntityNotification.OperationType operationType) throws AtlasException {
List<String> messages = new LinkedList<>(); List<EntityNotification> messages = new LinkedList<>();
for (IReferenceableInstance entityDefinition : entityDefinitions) { for (IReferenceableInstance entityDefinition : entityDefinitions) {
Referenceable entity = new Referenceable(entityDefinition); Referenceable entity = new Referenceable(entityDefinition);
...@@ -101,24 +90,9 @@ public class NotificationEntityChangeListener implements EntityChangeListener { ...@@ -101,24 +90,9 @@ public class NotificationEntityChangeListener implements EntityChangeListener {
EntityNotificationImpl notification = EntityNotificationImpl notification =
new EntityNotificationImpl(entity, operationType, typeSystem); new EntityNotificationImpl(entity, operationType, typeSystem);
messages.add(GSON.toJson(notification)); messages.add(notification);
} }
notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages);
messages.toArray(new String[messages.size()]));
}
// ----- inner class : ReferencableSerializer ---------------------------
private static class ReferencableSerializer implements JsonSerializer<Referenceable> {
public static final JsonParser JSON_PARSER = new JsonParser();
@Override
public JsonElement serialize(Referenceable referenceable, Type type,
JsonSerializationContext jsonSerializationContext) {
return JSON_PARSER.parse(InstanceSerialization.toJson(referenceable, true)).getAsJsonObject();
}
} }
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.hook;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Model for the JSON messages that hooks send to Atlas, plus the Gson
 * deserializer that decodes them. Messages are polymorphic: the "type" field
 * selects the concrete {@link HookNotificationMessage} subclass. A bare JSON
 * array (the legacy, pre-typed wire format) is interpreted as an
 * entity-create request for backward compatibility.
 *
 * <p>Field names in the nested message classes are part of the wire format
 * (Gson maps JSON keys to fields by name) and must not be renamed.
 */
public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> {

    /**
     * Deserializes a hook message, dispatching on the embedded "type" field.
     *
     * @param json    raw JSON element read from the wire
     * @param typeOfT requested target type (always HookNotificationMessage)
     * @param context Gson context used to delegate nested deserialization
     * @return the concrete message instance
     * @throws JsonParseException if the payload is malformed or its type is unknown
     */
    @Override
    public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
                                               JsonDeserializationContext context) throws JsonParseException {
        if (json.isJsonArray()) {
            // Legacy format: a plain array of serialized entities means "create entities".
            JSONArray jsonArray = context.deserialize(json, JSONArray.class);
            return new EntityCreateRequest(jsonArray);
        }

        if (!json.isJsonObject()) {
            // Fix: the previous blind (JsonObject) cast raised ClassCastException on
            // malformed input; report it through the JsonDeserializer contract instead.
            throw new JsonParseException("Expected JSON object or array, got: " + json);
        }

        HookNotificationType type =
                context.deserialize(json.getAsJsonObject().get("type"), HookNotificationType.class);
        if (type == null) {
            // Fix: a missing or unrecognised "type" field used to deserialize to null
            // and trigger a NullPointerException in the switch below.
            throw new JsonParseException("Missing or unrecognised \"type\" field in message: " + json);
        }

        switch (type) {
            case ENTITY_CREATE:
                return context.deserialize(json, EntityCreateRequest.class);
            case ENTITY_FULL_UPDATE:
                return context.deserialize(json, EntityUpdateRequest.class);
            case ENTITY_PARTIAL_UPDATE:
                return context.deserialize(json, EntityPartialUpdateRequest.class);
            case TYPE_CREATE:
            case TYPE_UPDATE:
                return context.deserialize(json, TypeRequest.class);
            default:
                // Fix: throw JsonParseException per the Gson deserializer contract
                // rather than IllegalStateException.
                throw new JsonParseException("Unhandled type " + type);
        }
    }

    /**
     * Kinds of operations a hook can request.
     */
    public enum HookNotificationType {
        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE
    }

    /**
     * Base class for all hook messages; carries only the type discriminator.
     */
    public static class HookNotificationMessage {
        protected HookNotificationType type;    // wire field — name must match JSON key

        private HookNotificationMessage() { }   // for Gson

        public HookNotificationMessage(HookNotificationType type) {
            this.type = type;
        }

        public HookNotificationType getType() {
            return type;
        }
    }

    /**
     * Request to create or update type definitions (TYPE_CREATE / TYPE_UPDATE).
     */
    public static class TypeRequest extends HookNotificationMessage {
        private TypesDef typesDef;

        private TypeRequest() { }   // for Gson

        public TypeRequest(HookNotificationType type, TypesDef typesDef) {
            super(type);
            this.typesDef = typesDef;
        }

        public TypesDef getTypesDef() {
            return typesDef;
        }
    }

    /**
     * Request to create the given entities.
     */
    public static class EntityCreateRequest extends HookNotificationMessage {
        private List<Referenceable> entities;

        private EntityCreateRequest() { }   // for Gson

        public EntityCreateRequest(Referenceable... entities) {
            super(HookNotificationType.ENTITY_CREATE);
            this.entities = Arrays.asList(entities);
        }

        protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
            super(type);
            this.entities = entities;
        }

        /**
         * Builds a create request from the legacy wire format: a JSONArray whose
         * elements are InstanceSerialization-encoded Referenceable strings.
         */
        public EntityCreateRequest(JSONArray jsonArray) {
            super(HookNotificationType.ENTITY_CREATE);
            entities = new ArrayList<>();
            for (int index = 0; index < jsonArray.length(); index++) {
                try {
                    entities.add(InstanceSerialization.fromJsonReferenceable(jsonArray.getString(index), true));
                } catch (JSONException e) {
                    throw new JsonParseException(e);
                }
            }
        }

        /**
         * @return the entities to create
         * @throws JSONException never thrown; declaration retained for source compatibility
         */
        public List<Referenceable> getEntities() throws JSONException {
            return entities;
        }
    }

    /**
     * Request to fully update the given entities.
     */
    public static class EntityUpdateRequest extends EntityCreateRequest {

        public EntityUpdateRequest(Referenceable... entities) {
            this(Arrays.asList(entities));
        }

        public EntityUpdateRequest(List<Referenceable> entities) {
            super(HookNotificationType.ENTITY_FULL_UPDATE, entities);
        }
    }

    /**
     * Request to partially update one entity, identified by a unique
     * (typeName, attribute, attributeValue) triple.
     */
    public static class EntityPartialUpdateRequest extends HookNotificationMessage {
        private String typeName;
        private String attribute;
        private Referenceable entity;
        private String attributeValue;

        private EntityPartialUpdateRequest() { }    // for Gson

        public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
                                          Referenceable entity) {
            super(HookNotificationType.ENTITY_PARTIAL_UPDATE);
            this.typeName = typeName;
            this.attribute = attribute;
            this.attributeValue = attributeValue;
            this.entity = entity;
        }

        public String getTypeName() {
            return typeName;
        }

        public String getAttribute() {
            return attribute;
        }

        public Referenceable getEntity() {
            return entity;
        }

        public String getAttributeValue() {
            return attributeValue;
        }
    }
}
...@@ -26,9 +26,7 @@ import org.apache.atlas.notification.NotificationConsumer; ...@@ -26,9 +26,7 @@ import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.NotificationModule;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
...@@ -61,20 +59,6 @@ public class KafkaNotificationTest { ...@@ -61,20 +59,6 @@ public class KafkaNotificationTest {
} }
@Test @Test
public void testSendReceiveMessage() throws Exception {
String msg1 = "[{\"message\": " + 123 + "}]";
String msg2 = "[{\"message\": " + 456 + "}]";
kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2);
List<NotificationConsumer<JSONArray>> consumers =
kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1);
NotificationConsumer<JSONArray> consumer = consumers.get(0);
assertTrue(consumer.hasNext());
assertEquals(new JSONArray(msg1), consumer.next());
assertTrue(consumer.hasNext());
assertEquals(new JSONArray(msg2), consumer.next());
}
@Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testCreateConsumers() throws Exception { public void testCreateConsumers() throws Exception {
Configuration configuration = mock(Configuration.class); Configuration configuration = mock(Configuration.class);
...@@ -119,10 +103,6 @@ public class KafkaNotificationTest { ...@@ -119,10 +103,6 @@ public class KafkaNotificationTest {
assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
} }
private String random() {
return RandomStringUtils.randomAlphanumeric(5);
}
@AfterClass @AfterClass
public void teardown() throws Exception { public void teardown() throws Exception {
kafka.stop(); kafka.stop();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotificationConsumer;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.codehaus.jettison.json.JSONArray;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
/**
 * Serialization/deserialization tests for {@link HookNotification} messages,
 * covering both the legacy (pre-envelope) wire format and the current one.
 */
public class HookNotificationTest {

    /**
     * Older hooks published entities as a bare JSON array of serialized
     * Referenceables with no message envelope. Deserializing such a payload
     * must still produce an ENTITY_CREATE request so old hooks keep working.
     */
    @Test
    public void testMessageBackwardCompatibility() throws Exception {
        Referenceable entity = new Referenceable("sometype");
        entity.set("name", "somename");

        // Build the legacy payload: a JSON array containing instance JSON.
        JSONArray legacyPayload = new JSONArray();
        legacyPayload.put(InstanceSerialization.toJson(entity, true));

        HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson(
                legacyPayload.toString(), HookNotification.HookNotificationMessage.class);
        assertNotNull(notification);
        assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);

        HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification;
        assertEquals(createRequest.getEntities().size(), 1);
        assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName());
    }

    /**
     * Round-trips the current message format through GSON and verifies the
     * entity list — including a nested Referenceable attribute — survives.
     */
    @Test
    public void testNewMessageSerDe() throws Exception {
        Referenceable first = new Referenceable("sometype");
        first.set("attr", "value");
        first.set("complex", new Referenceable("othertype"));
        Referenceable second = new Referenceable("newtype");

        HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(first, second);
        String notificationJson = AbstractNotificationConsumer.GSON.toJson(request);

        HookNotification.HookNotificationMessage roundTripped = AbstractNotificationConsumer.GSON.fromJson(
                notificationJson, HookNotification.HookNotificationMessage.class);
        assertEquals(roundTripped.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);

        HookNotification.EntityCreateRequest actualRequest = (HookNotification.EntityCreateRequest) roundTripped;
        assertEquals(actualRequest.getEntities().size(), 2);

        Referenceable roundTrippedFirst = actualRequest.getEntities().get(0);
        assertEquals(roundTrippedFirst.getTypeName(), "sometype");
        assertEquals(((Referenceable) roundTrippedFirst.get("complex")).getTypeName(), "othertype");
        assertEquals(actualRequest.getEntities().get(1).getTypeName(), "newtype");
    }
}
...@@ -354,7 +354,8 @@ ...@@ -354,7 +354,8 @@
<!-- skips checkstyle and find bugs --> <!-- skips checkstyle and find bugs -->
<skipCheck>false</skipCheck> <skipCheck>false</skipCheck>
<skipTests>false</skipTests> <skipUTs>false</skipUTs>
<skipITs>false</skipITs>
<skipDocs>true</skipDocs> <skipDocs>true</skipDocs>
<skipSite>true</skipSite> <skipSite>true</skipSite>
<projectBaseDir>${project.basedir}</projectBaseDir> <projectBaseDir>${project.basedir}</projectBaseDir>
...@@ -1087,7 +1088,7 @@ ...@@ -1087,7 +1088,7 @@
<dependency> <dependency>
<groupId>com.google.code.gson</groupId> <groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId> <artifactId>gson</artifactId>
<version>2.3.1</version> <version>2.5</version>
</dependency> </dependency>
<dependency> <dependency>
...@@ -1394,6 +1395,7 @@ ...@@ -1394,6 +1395,7 @@
<configuration> <configuration>
<systemProperties> <systemProperties>
<user.dir>${project.basedir}</user.dir> <user.dir>${project.basedir}</user.dir>
<atlas.data>${project.build.directory}/data</atlas.data>
</systemProperties> </systemProperties>
<!--<skipTests>true</skipTests>--> <!--<skipTests>true</skipTests>-->
<forkMode>always</forkMode> <forkMode>always</forkMode>
...@@ -1403,6 +1405,7 @@ ...@@ -1403,6 +1405,7 @@
-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
-Djava.net.preferIPv4Stack=true -Djava.net.preferIPv4Stack=true
</argLine> </argLine>
<skip>${skipUTs}</skip>
<excludes> <excludes>
<exclude>**/*Base*</exclude> <exclude>**/*Base*</exclude>
</excludes> </excludes>
...@@ -1423,12 +1426,14 @@ ...@@ -1423,12 +1426,14 @@
<configuration> <configuration>
<systemPropertyVariables> <systemPropertyVariables>
<projectBaseDir>${projectBaseDir}</projectBaseDir> <projectBaseDir>${projectBaseDir}</projectBaseDir>
<atlas.data>${project.build.directory}/data</atlas.data>
</systemPropertyVariables> </systemPropertyVariables>
<redirectTestOutputToFile>true</redirectTestOutputToFile> <redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine>-Djava.awt.headless=true -Dproject.version=${project.version} <argLine>-Djava.awt.headless=true -Dproject.version=${project.version}
-Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}" -Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}"
-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml
</argLine> </argLine>
<skip>${skipITs}</skip>
<parallel>none</parallel> <parallel>none</parallel>
<reuseForks>false</reuseForks> <reuseForks>false</reuseForks>
<forkCount>1</forkCount> <forkCount>1</forkCount>
......
...@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ...@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-386 Handle hive rename Table (shwethags)
ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai) ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai)
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai) ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai) ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai)
......
...@@ -22,14 +22,9 @@ import com.google.common.collect.ImmutableMap; ...@@ -22,14 +22,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.thinkaurelius.titan.core.TitanFactory; import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.TitanTransaction;
import com.thinkaurelius.titan.core.schema.TitanManagement; import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.thinkaurelius.titan.diskstorage.Backend;
import com.thinkaurelius.titan.diskstorage.StandardIndexProvider; import com.thinkaurelius.titan.diskstorage.StandardIndexProvider;
import com.thinkaurelius.titan.diskstorage.indexing.IndexInformation;
import com.thinkaurelius.titan.diskstorage.solr.Solr5Index; import com.thinkaurelius.titan.diskstorage.solr.Solr5Index;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
...@@ -41,7 +36,6 @@ import java.lang.reflect.Field; ...@@ -41,7 +36,6 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* Default implementation for Graph Provider that doles out Titan Graph. * Default implementation for Graph Provider that doles out Titan Graph.
......
...@@ -304,7 +304,14 @@ public class DefaultMetadataService implements MetadataService { ...@@ -304,7 +304,14 @@ public class DefaultMetadataService implements MetadataService {
ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null"); ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null");
ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName); ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName);
//Both assigned id and values are required for full update
//classtype.convert() will remove values if id is assigned. So, set temp id, convert and
// then replace with original id
Id origId = entityInstance.getId();
entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName()));
ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED); ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED);
((ReferenceableInstance)typedInstrance).replaceWithNewId(origId);
instances[index] = typedInstrance; instances[index] = typedInstrance;
} }
return instances; return instances;
......
...@@ -251,12 +251,12 @@ public final class TestUtils { ...@@ -251,12 +251,12 @@ public final class TestUtils {
new AttributeDefinition("columnsMap", new AttributeDefinition("columnsMap",
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(),
"column_type"), "column_type"),
Multiplicity.COLLECTION, true, null), Multiplicity.OPTIONAL, true, null),
//map of structs //map of structs
new AttributeDefinition("partitionsMap", new AttributeDefinition("partitionsMap",
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(),
"partition_type"), "partition_type"),
Multiplicity.COLLECTION, true, null), Multiplicity.OPTIONAL, true, null),
// struct reference // struct reference
new AttributeDefinition("serde1", "serdeType", Multiplicity.OPTIONAL, false, null), new AttributeDefinition("serde1", "serdeType", Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null), new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null),
......
...@@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableList; ...@@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.util.TitanCleanup; import com.thinkaurelius.titan.core.util.TitanCleanup;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtils; import org.apache.atlas.TestUtils;
import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.repository.graph.GraphProvider;
...@@ -32,12 +29,15 @@ import org.apache.atlas.services.MetadataService; ...@@ -32,12 +29,15 @@ import org.apache.atlas.services.MetadataService;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef; import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.EnumValue; import org.apache.atlas.typesystem.types.EnumValue;
import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.testng.Assert; import org.testng.Assert;
...@@ -47,11 +47,15 @@ import org.testng.annotations.Guice; ...@@ -47,11 +47,15 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
public class DefaultMetadataServiceTest { public class DefaultMetadataServiceTest {
@Inject @Inject
...@@ -296,8 +300,8 @@ public class DefaultMetadataServiceTest { ...@@ -296,8 +300,8 @@ public class DefaultMetadataServiceTest {
Map<String, Object> values = new HashMap<>(); Map<String, Object> values = new HashMap<>();
values.put("name", "col1"); values.put("name", "col1");
values.put("type", "type"); values.put("type", "type");
Referenceable ref = new Referenceable("column_type", values); Referenceable col1 = new Referenceable("column_type", values);
columns.add(ref); columns.add(col1);
Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{ Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
put("columns", columns); put("columns", columns);
}}); }});
...@@ -307,19 +311,18 @@ public class DefaultMetadataServiceTest { ...@@ -307,19 +311,18 @@ public class DefaultMetadataServiceTest {
metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, "name", (String) table.get("name")); metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, "name", (String) table.get("name"));
Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
final List<Referenceable> arrClsColumns = (List) tableDefinition.get("columns"); final List<Referenceable> arrClsColumns = (List) tableDefinition.get("columns");
Assert.assertTrue(arrClsColumns.get(0).equalsContents(columns.get(0))); assertReferenceables(arrClsColumns.get(0), columns.get(0));
//Partial update. Add col5 But also update col1 //Partial update. Add col5 But also update col1
Map<String, Object> valuesCol5 = new HashMap<>(); Map<String, Object> valuesCol5 = new HashMap<>();
valuesCol5.put("name", "col5"); valuesCol5.put("name", "col5");
valuesCol5.put("type", "type"); valuesCol5.put("type", "type");
ref = new Referenceable("column_type", valuesCol5); Referenceable col2 = new Referenceable("column_type", valuesCol5);
//update col1 //update col1
arrClsColumns.get(0).set("type", "type1"); col1.set("type", "type1");
//add col5 //add col5
final List<Referenceable> updateColumns = new ArrayList<>(arrClsColumns); final List<Referenceable> updateColumns = Arrays.asList(col1, col2);
updateColumns.add(ref);
tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{ tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{
put("columns", updateColumns); put("columns", updateColumns);
...@@ -331,8 +334,8 @@ public class DefaultMetadataServiceTest { ...@@ -331,8 +334,8 @@ public class DefaultMetadataServiceTest {
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
List<Referenceable> arrColumnsList = (List) tableDefinition.get("columns"); List<Referenceable> arrColumnsList = (List) tableDefinition.get("columns");
Assert.assertEquals(arrColumnsList.size(), 2); Assert.assertEquals(arrColumnsList.size(), 2);
Assert.assertTrue(arrColumnsList.get(0).equalsContents(updateColumns.get(0))); assertReferenceables(arrColumnsList.get(0), updateColumns.get(0));
Assert.assertTrue(arrColumnsList.get(1).equalsContents(updateColumns.get(1))); assertReferenceables(arrColumnsList.get(1), updateColumns.get(1));
//Complete update. Add array elements - col3,4 //Complete update. Add array elements - col3,4
Map<String, Object> values1 = new HashMap<>(); Map<String, Object> values1 = new HashMap<>();
...@@ -355,9 +358,8 @@ public class DefaultMetadataServiceTest { ...@@ -355,9 +358,8 @@ public class DefaultMetadataServiceTest {
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
arrColumnsList = (List) tableDefinition.get("columns"); arrColumnsList = (List) tableDefinition.get("columns");
Assert.assertEquals(arrColumnsList.size(), columns.size()); Assert.assertEquals(arrColumnsList.size(), columns.size());
Assert.assertTrue(arrColumnsList.get(1).equalsContents(columns.get(1))); assertReferenceables(arrColumnsList.get(1), columns.get(1));
Assert.assertTrue(arrColumnsList.get(2).equalsContents(columns.get(2))); assertReferenceables(arrColumnsList.get(2), columns.get(2));
//Remove a class reference/Id and insert another reference //Remove a class reference/Id and insert another reference
//Also covers isComposite case since columns is a composite //Also covers isComposite case since columns is a composite
...@@ -366,8 +368,8 @@ public class DefaultMetadataServiceTest { ...@@ -366,8 +368,8 @@ public class DefaultMetadataServiceTest {
values.put("name", "col2"); values.put("name", "col2");
values.put("type", "type"); values.put("type", "type");
ref = new Referenceable("column_type", values); col1 = new Referenceable("column_type", values);
columns.add(ref); columns.add(col1);
table.set("columns", columns); table.set("columns", columns);
updateInstance(table); updateInstance(table);
...@@ -376,7 +378,7 @@ public class DefaultMetadataServiceTest { ...@@ -376,7 +378,7 @@ public class DefaultMetadataServiceTest {
tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true);
arrColumnsList = (List) tableDefinition.get("columns"); arrColumnsList = (List) tableDefinition.get("columns");
Assert.assertEquals(arrColumnsList.size(), columns.size()); Assert.assertEquals(arrColumnsList.size(), columns.size());
Assert.assertTrue(arrColumnsList.get(0).equalsContents(columns.get(0))); assertReferenceables(arrColumnsList.get(0), columns.get(0));
//Update array column to null //Update array column to null
table.setNull("columns"); table.setNull("columns");
...@@ -389,6 +391,14 @@ public class DefaultMetadataServiceTest { ...@@ -389,6 +391,14 @@ public class DefaultMetadataServiceTest {
Assert.assertNull(tableDefinition.get("columns")); Assert.assertNull(tableDefinition.get("columns"));
} }
/**
 * Asserts that two referenceables have the same type name, the same traits,
 * and equal values for every attribute present on {@code r1}. Attributes
 * present only on {@code r2} are deliberately not checked.
 */
private void assertReferenceables(Referenceable r1, Referenceable r2) {
    assertEquals(r1.getTypeName(), r2.getTypeName());
    assertEquals(r1.getTraits(), r2.getTraits());
    for (String attr : r1.getValuesMap().keySet()) {
        // assertEquals is null-safe and reports both values on failure,
        // unlike assertTrue(a.equals(b)) which NPEs on a null attribute
        // value and hides the mismatching values from the test output.
        assertEquals(r1.getValuesMap().get(attr), r2.getValuesMap().get(attr));
    }
    //TODO assert trait instances and complex attributes
}
@Test @Test
public void testStructs() throws Exception { public void testStructs() throws Exception {
......
...@@ -33,7 +33,7 @@ import java.util.Map; ...@@ -33,7 +33,7 @@ import java.util.Map;
*/ */
public class Referenceable extends Struct implements IReferenceableInstance { public class Referenceable extends Struct implements IReferenceableInstance {
private final Id id; private Id id;
private final ImmutableMap<String, IStruct> traits; private final ImmutableMap<String, IStruct> traits;
private final ImmutableList<String> traitNames; private final ImmutableList<String> traitNames;
...@@ -151,6 +151,10 @@ public class Referenceable extends Struct implements IReferenceableInstance { ...@@ -151,6 +151,10 @@ public class Referenceable extends Struct implements IReferenceableInstance {
'}'; '}';
} }
/**
 * Replaces this instance's id in place. Used during full update, where a
 * temporary unassigned id is set before type conversion and the original
 * assigned id is restored afterwards (see DefaultMetadataService).
 *
 * @param id the id to assign to this instance
 */
public void replaceWithNewId(Id id) {
this.id = id;
}
private static Map<String, IStruct> getTraits(IReferenceableInstance instance) throws AtlasException { private static Map<String, IStruct> getTraits(IReferenceableInstance instance) throws AtlasException {
Map<String, IStruct> traits = new HashMap<>(); Map<String, IStruct> traits = new HashMap<>();
for (String traitName : instance.getTraits() ) { for (String traitName : instance.getTraits() ) {
......
...@@ -125,9 +125,9 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc ...@@ -125,9 +125,9 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc
r != null ? createInstanceWithTraits(id, r, r.getTraits().toArray(new String[0])) : r != null ? createInstanceWithTraits(id, r, r.getTraits().toArray(new String[0])) :
createInstance(id); createInstance(id);
// if (id != null && id.isAssigned()) { if (id != null && id.isAssigned()) {
// return tr; return tr;
// } }
for (Map.Entry<String, AttributeInfo> e : fieldMapping.fields.entrySet()) { for (Map.Entry<String, AttributeInfo> e : fieldMapping.fields.entrySet()) {
String attrKey = e.getKey(); String attrKey = e.getKey();
......
...@@ -25,8 +25,8 @@ public final class Multiplicity { ...@@ -25,8 +25,8 @@ public final class Multiplicity {
public static final Multiplicity OPTIONAL = new Multiplicity(0, 1, false); public static final Multiplicity OPTIONAL = new Multiplicity(0, 1, false);
public static final Multiplicity REQUIRED = new Multiplicity(1, 1, false); public static final Multiplicity REQUIRED = new Multiplicity(1, 1, false);
public static final Multiplicity COLLECTION = new Multiplicity(0, Integer.MAX_VALUE, false); public static final Multiplicity COLLECTION = new Multiplicity(1, Integer.MAX_VALUE, false);
public static final Multiplicity SET = new Multiplicity(0, Integer.MAX_VALUE, true); public static final Multiplicity SET = new Multiplicity(1, Integer.MAX_VALUE, true);
public final int lower; public final int lower;
public final int upper; public final int upper;
......
...@@ -27,7 +27,7 @@ atlas.graph.storage.backend=${titan.storage.backend} ...@@ -27,7 +27,7 @@ atlas.graph.storage.backend=${titan.storage.backend}
atlas.graph.index.search.backend=${titan.index.backend} atlas.graph.index.search.backend=${titan.index.backend}
#Berkeley storage directory #Berkeley storage directory
atlas.graph.storage.directory=target/data/berkley atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
...@@ -38,7 +38,7 @@ atlas.graph.storage.hbase.regions-per-server=1 ...@@ -38,7 +38,7 @@ atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000 atlas.graph.storage.lock.wait-time=10000
#ElasticSearch #ElasticSearch
atlas.graph.index.search.directory=target/data/es atlas.graph.index.search.directory=${sys:atlas.data}/es
atlas.graph.index.search.elasticsearch.client-only=false atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000 atlas.graph.index.search.elasticsearch.create.sleep=2000
...@@ -63,7 +63,7 @@ atlas.notification.embedded=true ...@@ -63,7 +63,7 @@ atlas.notification.embedded=true
atlas.kafka.zookeeper.connect=localhost:19026 atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027 atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=target/data/kafka atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=400 atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.auto.commit.interval.ms=100
......
...@@ -372,6 +372,10 @@ ...@@ -372,6 +372,10 @@
<name>atlas.home</name> <name>atlas.home</name>
<value>${project.build.directory}</value> <value>${project.build.directory}</value>
</systemProperty> </systemProperty>
<systemProperty>
<name>atlas.data</name>
<value>${project.build.directory}/data</value>
</systemProperty>
</systemProperties> </systemProperties>
<stopKey>atlas-stop</stopKey> <stopKey>atlas-stop</stopKey>
<stopPort>31001</stopPort> <stopPort>31001</stopPort>
......
...@@ -40,6 +40,7 @@ public final class Atlas { ...@@ -40,6 +40,7 @@ public final class Atlas {
private static final String APP_PATH = "app"; private static final String APP_PATH = "app";
private static final String APP_PORT = "port"; private static final String APP_PORT = "port";
private static final String ATLAS_HOME = "atlas.home"; private static final String ATLAS_HOME = "atlas.home";
private static final String ATLAS_DATA = "atlas.data";
private static final String ATLAS_LOG_DIR = "atlas.log.dir"; private static final String ATLAS_LOG_DIR = "atlas.log.dir";
public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port"; public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port";
public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port"; public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port";
...@@ -110,6 +111,9 @@ public final class Atlas { ...@@ -110,6 +111,9 @@ public final class Atlas {
if (System.getProperty(ATLAS_HOME) == null) { if (System.getProperty(ATLAS_HOME) == null) {
System.setProperty(ATLAS_HOME, "target"); System.setProperty(ATLAS_HOME, "target");
} }
if (System.getProperty(ATLAS_DATA) == null) {
System.setProperty(ATLAS_DATA, "target/data");
}
if (System.getProperty(ATLAS_LOG_DIR) == null) { if (System.getProperty(ATLAS_LOG_DIR) == null) {
System.setProperty(ATLAS_LOG_DIR, "target/logs"); System.setProperty(ATLAS_LOG_DIR, "target/logs");
} }
......
...@@ -19,22 +19,22 @@ ...@@ -19,22 +19,22 @@
package org.apache.atlas.notification; package org.apache.atlas.notification;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.web.resources.BaseResourceIT; import org.apache.atlas.web.resources.BaseResourceIT;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.testng.Assert;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice; import org.testng.annotations.Guice;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@Guice(modules = NotificationModule.class) @Guice(modules = NotificationModule.class)
public class NotificationHookConsumerIT extends BaseResourceIT { public class NotificationHookConsumerIT extends BaseResourceIT {
@Inject @Inject
private NotificationInterface kafka; private NotificationInterface kafka;
private String dbName;
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -47,57 +47,106 @@ public class NotificationHookConsumerIT extends BaseResourceIT { ...@@ -47,57 +47,106 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
kafka.close(); kafka.close();
} }
private void sendHookMessage(Referenceable entity) throws NotificationException { private void sendHookMessage(HookNotification.HookNotificationMessage message) throws NotificationException {
String entityJson = InstanceSerialization.toJson(entity, true); kafka.send(NotificationInterface.NotificationType.HOOK, message);
JSONArray jsonArray = new JSONArray();
jsonArray.put(entityJson);
kafka.send(NotificationInterface.NotificationType.HOOK, jsonArray.toString());
} }
@Test @Test
public void testConsumeHookMessage() throws Exception { public void testCreateEntity() throws Exception {
Referenceable entity = new Referenceable(DATABASE_TYPE); final Referenceable entity = new Referenceable(DATABASE_TYPE);
dbName = "db" + randomString(); entity.set("name", "db" + randomString());
entity.set("name", dbName);
entity.set("description", randomString()); entity.set("description", randomString());
sendHookMessage(entity); sendHookMessage(new HookNotification.EntityCreateRequest(entity));
waitFor(1000, new Predicate() { waitFor(1000, new Predicate() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
JSONArray results = JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); entity.get("name")));
return results.length() == 1; return results.length() == 1;
} }
}); });
} }
@Test (dependsOnMethods = "testConsumeHookMessage") @Test
public void testEnityDeduping() throws Exception { public void testUpdateEntityPartial() throws Exception {
// Referenceable db = serviceClient.getEntity(DATABASE_TYPE, "name", dbName); final Referenceable entity = new Referenceable(DATABASE_TYPE);
Referenceable db = new Referenceable(DATABASE_TYPE); final String dbName = "db" + randomString();
db.set("name", dbName); entity.set("name", dbName);
db.set("description", randomString()); entity.set("description", randomString());
serviceClient.createEntity(entity);
Referenceable table = new Referenceable(HIVE_TABLE_TYPE); final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
final String tableName = randomString(); newEntity.set("owner", randomString());
table.set("name", tableName); sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
table.set("db", db); waitFor(1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner")));
}
});
//Its partial update and un-set fields are not updated
Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
assertEquals(actualEntity.get("description"), entity.get("description"));
}
@Test
public void testUpdatePartialUpdatingQualifiedName() throws Exception {
final Referenceable entity = new Referenceable(DATABASE_TYPE);
final String dbName = "db" + randomString();
entity.set("name", dbName);
entity.set("description", randomString());
serviceClient.createEntity(entity);
sendHookMessage(table); final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
final String newName = "db" + randomString();
newEntity.set("name", newName);
sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity));
waitFor(1000, new Predicate() { waitFor(1000, new Predicate() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
JSONArray results = JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE,
serviceClient.searchByDSL(String.format("%s where name='%s'", HIVE_TABLE_TYPE, tableName)); newName));
return results.length() == 1; return results.length() == 1;
} }
}); });
JSONArray results = //no entity with the old qualified name
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
Assert.assertEquals(results.length(), 1); assertEquals(results.length(), 0);
} }
@Test
public void testUpdateEntityFullUpdate() throws Exception {
    // Register a database entity, then send a full-update hook message that
    // carries the same unique name; the stored entity must pick up all of the
    // updated attribute values.
    final String dbName = "db" + randomString();

    Referenceable created = new Referenceable(DATABASE_TYPE);
    created.set("name", dbName);
    created.set("description", randomString());
    serviceClient.createEntity(created);

    final Referenceable updated = new Referenceable(DATABASE_TYPE);
    updated.set("name", dbName);
    updated.set("description", randomString());
    updated.set("owner", randomString());

    //updating unique attribute
    sendHookMessage(new HookNotification.EntityUpdateRequest(updated));
    waitFor(1000, new Predicate() {
        @Override
        public boolean evaluate() throws Exception {
            JSONArray results =
                    serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
            return results.length() == 1;
        }
    });

    // Verify the persisted entity reflects the full update.
    Referenceable persisted = serviceClient.getEntity(DATABASE_TYPE, "name", dbName);
    assertEquals(persisted.get("description"), updated.get("description"));
    assertEquals(persisted.get("owner"), updated.get("owner"));
}
} }
...@@ -58,6 +58,8 @@ import java.util.List; ...@@ -58,6 +58,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import static org.testng.Assert.assertEquals;
/** /**
* Integration tests for Entity Jersey Resource. * Integration tests for Entity Jersey Resource.
*/ */
...@@ -95,6 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT { ...@@ -95,6 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
} }
@Test @Test
//API should accept single entity (or jsonarray of entities)
public void testSubmitSingleEntity() throws Exception { public void testSubmitSingleEntity() throws Exception {
Referenceable databaseInstance = new Referenceable(DATABASE_TYPE); Referenceable databaseInstance = new Referenceable(DATABASE_TYPE);
databaseInstance.set("name", randomString()); databaseInstance.set("name", randomString());
...@@ -115,6 +118,34 @@ public class EntityJerseyResourceIT extends BaseResourceIT { ...@@ -115,6 +118,34 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
} }
@Test
public void testEntityDeduping() throws Exception {
    // Creating an entity with the same unique attribute value twice must not
    // produce a duplicate instance — neither via a direct create nor
    // indirectly through a reference held by another entity.
    final String dbName = "db" + randomString();
    Referenceable database = new Referenceable(DATABASE_TYPE);
    database.set("name", dbName);
    database.set("description", randomString());

    serviceClient.createEntity(database);
    assertEquals(countByName(DATABASE_TYPE, dbName), 1);

    //create entity again shouldn't create another instance with same unique attribute value
    serviceClient.createEntity(database);
    assertEquals(countByName(DATABASE_TYPE, dbName), 1);

    //Test the same across references
    Referenceable table = new Referenceable(HIVE_TABLE_TYPE);
    final String tableName = randomString();
    table.set("name", tableName);
    table.set("db", database);
    serviceClient.createEntity(table);
    assertEquals(countByName(DATABASE_TYPE, dbName), 1);
}

/** Counts entities of the given type with the given name via a DSL search. */
private int countByName(String typeName, String entityName) throws Exception {
    JSONArray results =
            serviceClient.searchByDSL(String.format("%s where name='%s'", typeName, entityName));
    return results.length();
}
@Test
public void testEntityDefinitionAcrossTypeUpdate() throws Exception { public void testEntityDefinitionAcrossTypeUpdate() throws Exception {
//create type //create type
HierarchicalTypeDefinition<ClassType> typeDefinition = TypesUtil HierarchicalTypeDefinition<ClassType> typeDefinition = TypesUtil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment