Commit fdd841dd by Shwetha G S

Merge pull request #144 from sumashivaprasad/api_merged

Improvements for type change listener injection
parents 3e1637a4 d75aeb5d
......@@ -90,7 +90,7 @@
<log4j.version>1.2.17</log4j.version>
<akka.version>2.3.7</akka.version>
<spray.version>1.3.1</spray.version>
<guava.version>11.0.2</guava.version>
<guava.version>14.0</guava.version>
<fastutil.version>6.5.16</fastutil.version>
<PermGen>64m</PermGen>
......@@ -317,19 +317,25 @@
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
<version>4.0</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-throwingproviders</artifactId>
<version>3.0</version>
<version>4.0</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>4.0</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
<version>4.0</version>
</dependency>
<dependency>
......
......@@ -65,6 +65,11 @@
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</dependency>
......
......@@ -18,7 +18,10 @@
package org.apache.atlas;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor;
......@@ -27,6 +30,7 @@ import org.apache.atlas.discovery.HiveLineageService;
import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
......@@ -58,8 +62,8 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the ITypeStore interface to an implementation
bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton();
// bind the GraphService interface to an implementation
// bind(GraphService.class).to(graphServiceClass);
Multibinder<TypesChangeListener> typesChangeListenerBinder = Multibinder.newSetBinder(binder(), TypesChangeListener.class);
typesChangeListenerBinder.addBinding().to(GraphBackedSearchIndexer.class);
// bind the MetadataService interface to an implementation
bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton();
......@@ -67,8 +71,6 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class);
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
MethodInterceptor interceptor = new GraphTransactionInterceptor();
......
......@@ -26,12 +26,4 @@ import org.apache.atlas.repository.IndexException;
* Interface for indexing types.
*/
public interface SearchIndexer extends TypesChangeListener {
/* Commit the indexes */
void commit() throws IndexException;
/* RollBack the index */
void rollback() throws IndexException;
}
......@@ -21,19 +21,20 @@ package org.apache.atlas.listener;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.types.IDataType;
import java.util.Collection;
/**
* Types change notification listener.
*/
public interface TypesChangeListener {
/**
* This is upon adding a new type to Store.
* This is upon adding new type(s) to Store.
*
* @param typeName type name
* @param dataType data type
* @param dataTypes data type
* @throws AtlasException
*/
void onAdd(String typeName, IDataType dataType) throws AtlasException;
void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException;
/**
* This is upon removing an existing type from the Store.
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.repository.graph;
import com.google.inject.Provider;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.TitanGraph;
......@@ -28,6 +29,7 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.IndexException;
......@@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
......@@ -141,22 +144,26 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
/**
* This is upon adding a new type to Store.
*
* @param typeName type name
* @param dataType data type
* @throws AtlasException
* @param dataTypes data type
* @throws org.apache.atlas.AtlasException
*/
@Override
public void onAdd(String typeName, IDataType dataType) throws AtlasException {
LOG.info("Creating indexes for type name={}, definition={}", typeName, dataType);
try {
addIndexForType(dataType);
LOG.info("Index creation for type {} complete", typeName);
} catch (Throwable throwable) {
LOG.error("Error creating index for type {}", dataType, throwable);
throw new IndexCreationException("Error while creating index for type " + dataType, throwable);
public void onAdd(Collection<? extends IDataType> dataTypes) throws AtlasException {
for(IDataType dataType : dataTypes) {
LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass());
try {
addIndexForType(dataType);
LOG.info("Index creation for type {} complete", dataType.getName());
} catch (Throwable throwable) {
LOG.error("Error creating index for type {}", dataType, throwable);
//Rollback indexes if any failure
rollback();
throw new IndexCreationException("Error while creating index for type " + dataType, throwable);
}
}
//Commit indexes
commit();
}
private void addIndexForType(IDataType dataType) {
......@@ -347,7 +354,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
return true;
}
@Override
public void commit() throws IndexException {
try {
management.commit();
......@@ -357,7 +363,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
}
}
@Override
public void rollback() throws IndexException {
try {
management.rollback();
......
......@@ -28,6 +28,7 @@ import org.apache.atlas.TypeNotFoundException;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.typestore.ITypeStore;
......@@ -54,9 +55,12 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.actors.threadpool.Arrays;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
......@@ -73,23 +77,22 @@ public class DefaultMetadataService implements MetadataService {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultMetadataService.class);
private final Set<EntityChangeListener> entityChangeListeners
private final Collection<EntityChangeListener> entityChangeListeners
= new LinkedHashSet<>();
private final TypeSystem typeSystem;
private final MetadataRepository repository;
private final ITypeStore typeStore;
private final Set<Provider<SearchIndexer>> typeChangeListeners;
private final Collection<Provider<TypesChangeListener>> typeChangeListeners;
@Inject
DefaultMetadataService(final MetadataRepository repository,
final Provider<SearchIndexer> searchIndexProvider, final ITypeStore typeStore) throws
AtlasException {
DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final Collection<Provider<TypesChangeListener>> typeChangeListeners) throws AtlasException {
this.typeStore = typeStore;
this.typeSystem = TypeSystem.getInstance();
this.repository = repository;
this.typeChangeListeners = new LinkedHashSet<Provider<SearchIndexer>>() {{ add(searchIndexProvider); }};
this.typeChangeListeners = typeChangeListeners;
restoreTypeSystem();
}
......@@ -173,7 +176,7 @@ public class DefaultMetadataService implements MetadataService {
/* Create indexes first so that if index creation fails then we rollback
the typesystem and also do not persist the graph
*/
onTypesAddedToRepo(typesAdded);
onTypesAdded(typesAdded);
typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
} catch (Throwable t) {
typeSystem.removeTypes(typesAdded.keySet());
......@@ -393,19 +396,15 @@ public class DefaultMetadataService implements MetadataService {
onTraitDeletedFromEntity(guid, traitNameToBeDeleted);
}
private void onTypesAddedToRepo(Map<String, IDataType> typesAdded) throws AtlasException {
Map<SearchIndexer, Throwable> caughtExceptions = new HashMap<>();
for(Provider<SearchIndexer> indexerProvider : typeChangeListeners) {
final SearchIndexer indexer = indexerProvider.get();
private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException {
Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
for(Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
final TypesChangeListener listener = indexerProvider.get();
try {
for (Map.Entry<String, IDataType> entry : typesAdded.entrySet()) {
indexer.onAdd(entry.getKey(), entry.getValue());
}
indexer.commit();
listener.onAdd(typesAdded.values());
} catch (IndexCreationException ice) {
LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
indexer.rollback();
caughtExceptions.put(indexer, ice);
caughtExceptions.put(listener, ice);
}
}
......
......@@ -222,8 +222,7 @@ public class GraphRepoMapperScaleTest {
};
EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("table_type", values);
EnumType enumType = typeSystem.defineEnumType(enumTypeDefinition);
searchIndexer.onAdd("table_type", enumType);
final EnumType enumType = typeSystem.defineEnumType(enumTypeDefinition);
HierarchicalTypeDefinition<ClassType> columnsDefinition =
TypesUtil.createClassTypeDef("hive_column_type",
......@@ -275,10 +274,11 @@ public class GraphRepoMapperScaleTest {
ImmutableList.of(classificationTypeDefinition),
ImmutableList.of(databaseTypeDefinition, columnsDefinition, tableTypeDefinition));
for (Map.Entry<String, IDataType> entry : types.entrySet()) {
searchIndexer.onAdd(entry.getKey(), entry.getValue());
}
searchIndexer.commit();
ArrayList<IDataType> typesAdded = new ArrayList<IDataType>();
typesAdded.add(enumType);
typesAdded.addAll(types.values());
searchIndexer.onAdd(typesAdded);
}
private ITypedReferenceableInstance createHiveTableInstance(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment