Commit 8f9dec9d by Suma Shivaprasad

Initial draft for listener injection improvements

parent 92afb649
...@@ -328,6 +328,12 @@ ...@@ -328,6 +328,12 @@
<dependency> <dependency>
<groupId>com.google.inject.extensions</groupId> <groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId> <artifactId>guice-servlet</artifactId>
<version>3.0</version> <version>3.0</version>
</dependency> </dependency>
......
...@@ -65,6 +65,11 @@ ...@@ -65,6 +65,11 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId> <groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId> <artifactId>jettison</artifactId>
</dependency> </dependency>
......
...@@ -18,7 +18,10 @@ ...@@ -18,7 +18,10 @@
package org.apache.atlas; package org.apache.atlas;
import com.google.inject.Provider;
import com.google.inject.TypeLiteral;
import com.google.inject.matcher.Matchers; import com.google.inject.matcher.Matchers;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.throwingproviders.ThrowingProviderBinder; import com.google.inject.throwingproviders.ThrowingProviderBinder;
import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanGraph;
import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInterceptor;
...@@ -27,6 +30,7 @@ import org.apache.atlas.discovery.HiveLineageService; ...@@ -27,6 +30,7 @@ import org.apache.atlas.discovery.HiveLineageService;
import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.LineageService;
import org.apache.atlas.discovery.SearchIndexer; import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.MetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedMetadataRepository; import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
...@@ -41,6 +45,7 @@ import org.apache.atlas.services.MetadataService; ...@@ -41,6 +45,7 @@ import org.apache.atlas.services.MetadataService;
* Guice module for Repository module. * Guice module for Repository module.
*/ */
public class RepositoryMetadataModule extends com.google.inject.AbstractModule { public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
private Multibinder<Provider<TypesChangeListener>> typesChangeListenerBinder;
@Override @Override
protected void configure() { protected void configure() {
...@@ -61,14 +66,17 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { ...@@ -61,14 +66,17 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
// bind the GraphService interface to an implementation // bind the GraphService interface to an implementation
// bind(GraphService.class).to(graphServiceClass); // bind(GraphService.class).to(graphServiceClass);
typesChangeListenerBinder = Multibinder.newSetBinder(binder(), new TypeLiteral<TypesChangeListener>());
typesChangeListenerBinder.addBinding().to(GraphBackedSearchIndexer.class);
bind(TypesChangeListener.class).to(GraphBackedSearchIndexer.class);
// bind the MetadataService interface to an implementation // bind the MetadataService interface to an implementation
bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton(); bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton();
// bind the DiscoveryService interface to an implementation // bind the DiscoveryService interface to an implementation
bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton(); bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
bind(SearchIndexer.class).to(GraphBackedSearchIndexer.class);
bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton(); bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
MethodInterceptor interceptor = new GraphTransactionInterceptor(); MethodInterceptor interceptor = new GraphTransactionInterceptor();
......
...@@ -21,19 +21,20 @@ package org.apache.atlas.listener; ...@@ -21,19 +21,20 @@ package org.apache.atlas.listener;
import org.apache.atlas.MetadataException; import org.apache.atlas.MetadataException;
import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.IDataType;
import java.util.Collection;
/** /**
* Types change notification listener. * Types change notification listener.
*/ */
public interface TypesChangeListener { public interface TypesChangeListener {
/** /**
* This is upon adding a new type to Store. * This is upon adding new type(s) to Store.
* *
* @param typeName type name * @param dataTypes data type
* @param dataType data type
* @throws MetadataException * @throws MetadataException
*/ */
void onAdd(String typeName, IDataType dataType) throws MetadataException; void onAdd(Collection<? extends IDataType> dataTypes) throws MetadataException;
/** /**
* This is upon removing an existing type from the Store. * This is upon removing an existing type from the Store.
......
...@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; ...@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject; import javax.inject.Inject;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
...@@ -141,21 +142,21 @@ public class GraphBackedSearchIndexer implements SearchIndexer { ...@@ -141,21 +142,21 @@ public class GraphBackedSearchIndexer implements SearchIndexer {
/** /**
* This is upon adding a new type to Store. * This is upon adding a new type to Store.
* *
* @param typeName type name * @param dataTypes data type
* @param dataType data type
* @throws org.apache.atlas.MetadataException * @throws org.apache.atlas.MetadataException
*/ */
@Override @Override
public void onAdd(String typeName, IDataType dataType) throws MetadataException { public void onAdd(Collection<? extends IDataType> dataTypes) throws MetadataException {
LOG.info("Creating indexes for type name={}, definition={}", typeName, dataType);
for(IDataType dataType : dataTypes) {
try { LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass());
addIndexForType(dataType); try {
LOG.info("Index creation for type {} complete", typeName); addIndexForType(dataType);
LOG.info("Index creation for type {} complete", dataType.getName());
} catch (Throwable throwable) { } catch (Throwable throwable) {
LOG.error("Error creating index for type {}", dataType, throwable); LOG.error("Error creating index for type {}", dataType, throwable);
throw new IndexCreationException("Error while creating index for type " + dataType, throwable); throw new IndexCreationException("Error while creating index for type " + dataType, throwable);
}
} }
} }
......
...@@ -56,9 +56,11 @@ import org.codehaus.jettison.json.JSONException; ...@@ -56,9 +56,11 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.actors.threadpool.Arrays;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import javax.management.ListenerNotFoundException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
...@@ -77,22 +79,22 @@ public class DefaultMetadataService implements MetadataService { ...@@ -77,22 +79,22 @@ public class DefaultMetadataService implements MetadataService {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(DefaultMetadataService.class); LoggerFactory.getLogger(DefaultMetadataService.class);
private final Set<EntityChangeListener> entityChangeListeners private final Collection<EntityChangeListener> entityChangeListeners
= new LinkedHashSet<>(); = new LinkedHashSet<>();
private final TypeSystem typeSystem; private final TypeSystem typeSystem;
private final MetadataRepository repository; private final MetadataRepository repository;
private final ITypeStore typeStore; private final ITypeStore typeStore;
private final Set<Provider<SearchIndexer>> typeChangeListeners; private final Collection<Provider<TypesChangeListener>> typeChangeListeners;
@Inject @Inject
DefaultMetadataService(final MetadataRepository repository, DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore,
final Provider<SearchIndexer> searchIndexProvider, final ITypeStore typeStore) throws MetadataException { final Collection<Provider<TypesChangeListener>> typeChangeListeners) throws MetadataException {
this.typeStore = typeStore; this.typeStore = typeStore;
this.typeSystem = TypeSystem.getInstance(); this.typeSystem = TypeSystem.getInstance();
this.repository = repository; this.repository = repository;
this.typeChangeListeners = new LinkedHashSet<Provider<SearchIndexer>>() {{ add(searchIndexProvider); }}; this.typeChangeListeners = typeChangeListeners;
restoreTypeSystem(); restoreTypeSystem();
} }
...@@ -176,7 +178,7 @@ public class DefaultMetadataService implements MetadataService { ...@@ -176,7 +178,7 @@ public class DefaultMetadataService implements MetadataService {
/* Create indexes first so that if index creation fails then we rollback /* Create indexes first so that if index creation fails then we rollback
the typesystem and also do not persist the graph the typesystem and also do not persist the graph
*/ */
onTypesAddedToRepo(typesAdded); onTypesAdded(typesAdded);
typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet())); typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
} catch (Throwable t) { } catch (Throwable t) {
typeSystem.removeTypes(typesAdded.keySet()); typeSystem.removeTypes(typesAdded.keySet());
...@@ -396,19 +398,15 @@ public class DefaultMetadataService implements MetadataService { ...@@ -396,19 +398,15 @@ public class DefaultMetadataService implements MetadataService {
onTraitDeletedFromEntity(guid, traitNameToBeDeleted); onTraitDeletedFromEntity(guid, traitNameToBeDeleted);
} }
private void onTypesAddedToRepo(Map<String, IDataType> typesAdded) throws MetadataException { private void onTypesAdded(Map<String, IDataType> typesAdded) throws MetadataException {
Map<SearchIndexer, Throwable> caughtExceptions = new HashMap<>(); Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
for(Provider<SearchIndexer> indexerProvider : typeChangeListeners) { for(Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
final SearchIndexer indexer = indexerProvider.get(); final TypesChangeListener listener = indexerProvider.get();
try { try {
for (Map.Entry<String, IDataType> entry : typesAdded.entrySet()) { listener.onAdd(typesAdded.values());
indexer.onAdd(entry.getKey(), entry.getValue());
}
indexer.commit();
} catch (IndexCreationException ice) { } catch (IndexCreationException ice) {
LOG.error("Index creation for listener {} failed ", indexerProvider, ice); LOG.error("Index creation for listener {} failed ", indexerProvider, ice);
indexer.rollback(); caughtExceptions.put(listener, ice);
caughtExceptions.put(indexer, ice);
} }
} }
......
...@@ -222,8 +222,8 @@ public class GraphRepoMapperScaleTest { ...@@ -222,8 +222,8 @@ public class GraphRepoMapperScaleTest {
}; };
EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("table_type", values); EnumTypeDefinition enumTypeDefinition = new EnumTypeDefinition("table_type", values);
EnumType enumType = typeSystem.defineEnumType(enumTypeDefinition); final EnumType enumType = typeSystem.defineEnumType(enumTypeDefinition);
searchIndexer.onAdd("table_type", enumType); searchIndexer.onAdd(new ArrayList<IDataType>() {{ add(enumType); }});
HierarchicalTypeDefinition<ClassType> columnsDefinition = HierarchicalTypeDefinition<ClassType> columnsDefinition =
TypesUtil.createClassTypeDef("hive_column_type", TypesUtil.createClassTypeDef("hive_column_type",
...@@ -275,8 +275,8 @@ public class GraphRepoMapperScaleTest { ...@@ -275,8 +275,8 @@ public class GraphRepoMapperScaleTest {
ImmutableList.of(classificationTypeDefinition), ImmutableList.of(classificationTypeDefinition),
ImmutableList.of(databaseTypeDefinition, columnsDefinition, tableTypeDefinition)); ImmutableList.of(databaseTypeDefinition, columnsDefinition, tableTypeDefinition));
for (Map.Entry<String, IDataType> entry : types.entrySet()) { for (final Map.Entry<String, IDataType> entry : types.entrySet()) {
searchIndexer.onAdd(entry.getKey(), entry.getValue()); searchIndexer.onAdd(new ArrayList<IDataType>() {{ add(entry.getValue()); }});
} }
searchIndexer.commit(); searchIndexer.commit();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment