Commit d519ae8c by Shwetha GS

ATLAS-433 Fix checkstyle issues for common and notification module (shwethags)

parent fd070cb8
...@@ -31,6 +31,10 @@ ...@@ -31,6 +31,10 @@
<name>Apache Atlas Common</name> <name>Apache Atlas Common</name>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.testng</groupId> <groupId>org.testng</groupId>
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas; package org.apache.atlas;
import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.CompositeConfiguration;
...@@ -29,29 +29,32 @@ import java.net.URL; ...@@ -29,29 +29,32 @@ import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
public class ApplicationProperties extends PropertiesConfiguration { /**
* Application properties used by Atlas.
*/
public final class ApplicationProperties extends PropertiesConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class); private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class);
public static final String APPLICATION_PROPERTIES = "atlas-application.properties"; public static final String APPLICATION_PROPERTIES = "atlas-application.properties";
public static final String CLIENT_PROPERTIES = "client.properties"; public static final String CLIENT_PROPERTIES = "client.properties";
private static Configuration INSTANCE = null; private static Configuration instance = null;
private ApplicationProperties(URL url) throws ConfigurationException { private ApplicationProperties(URL url) throws ConfigurationException {
super(url); super(url);
} }
public static Configuration get() throws AtlasException { public static Configuration get() throws AtlasException {
if (INSTANCE == null) { if (instance == null) {
synchronized (ApplicationProperties.class) { synchronized (ApplicationProperties.class) {
if (INSTANCE == null) { if (instance == null) {
Configuration applicationProperties = get(APPLICATION_PROPERTIES); Configuration applicationProperties = get(APPLICATION_PROPERTIES);
Configuration clientProperties = get(CLIENT_PROPERTIES); Configuration clientProperties = get(CLIENT_PROPERTIES);
INSTANCE = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties)); instance = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties));
} }
} }
} }
return INSTANCE; return instance;
} }
public static Configuration get(String fileName) throws AtlasException { public static Configuration get(String fileName) throws AtlasException {
...@@ -80,7 +83,7 @@ public class ApplicationProperties extends PropertiesConfiguration { ...@@ -80,7 +83,7 @@ public class ApplicationProperties extends PropertiesConfiguration {
} }
} }
public static final Configuration getSubsetConfiguration(Configuration inConf, String prefix) { public static Configuration getSubsetConfiguration(Configuration inConf, String prefix) {
return inConf.subset(prefix); return inConf.subset(prefix);
} }
} }
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -18,9 +18,14 @@ ...@@ -18,9 +18,14 @@
package org.apache.atlas; package org.apache.atlas;
public interface AtlasConstants { /**
String CLUSTER_NAME_KEY = "atlas.cluster.name"; * Constants used in Atlas configuration.
String DEFAULT_CLUSTER_NAME = "primary"; */
String CLUSTER_NAME_ATTRIBUTE = "clusterName"; public final class AtlasConstants {
private AtlasConstants() {
}
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
} }
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas; package org.apache.atlas;
/** /**
......
...@@ -6,22 +6,21 @@ ...@@ -6,22 +6,21 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.service; package org.apache.atlas.service;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
/** /**
* Service interface to start any background jobs * Service interface to start any background jobs.
*/ */
public interface Service { public interface Service {
......
...@@ -6,16 +6,15 @@ ...@@ -6,16 +6,15 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.service; package org.apache.atlas.service;
import com.google.inject.Inject; import com.google.inject.Inject;
...@@ -26,7 +25,7 @@ import org.slf4j.LoggerFactory; ...@@ -26,7 +25,7 @@ import org.slf4j.LoggerFactory;
import java.util.Set; import java.util.Set;
/** /**
* Utility for starting and stopping all services * Utility for starting and stopping all services.
*/ */
@Singleton @Singleton
public class Services { public class Services {
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -19,7 +20,13 @@ package org.apache.atlas.utils; ...@@ -19,7 +20,13 @@ package org.apache.atlas.utils;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
public class MD5Utils { /**
* Utilities for MD5 hash.
*/
public final class MD5Utils {
private MD5Utils() {
}
private static final ThreadLocal<MessageDigest> DIGESTER_FACTORY = private static final ThreadLocal<MessageDigest> DIGESTER_FACTORY =
new ThreadLocal<MessageDigest>() { new ThreadLocal<MessageDigest>() {
...@@ -34,7 +41,7 @@ public class MD5Utils { ...@@ -34,7 +41,7 @@ public class MD5Utils {
}; };
/** /**
* Create a thread local MD5 digester * Create a thread local MD5 digester.
*/ */
public static MessageDigest getDigester() { public static MessageDigest getDigester() {
MessageDigest digester = DIGESTER_FACTORY.get(); MessageDigest digester = DIGESTER_FACTORY.get();
...@@ -43,10 +50,10 @@ public class MD5Utils { ...@@ -43,10 +50,10 @@ public class MD5Utils {
} }
private static final char[] HEX_DIGITS = private static final char[] HEX_DIGITS =
{'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'}; {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
public static String toString(byte[] digest) { public static String toString(byte[] digest) {
StringBuilder buf = new StringBuilder(MD5_LEN*2); StringBuilder buf = new StringBuilder(MD5_LEN * 2);
for (int i = 0; i < MD5_LEN; i++) { for (int i = 0; i < MD5_LEN; i++) {
int b = digest[i]; int b = digest[i];
buf.append(HEX_DIGITS[(b >> 4) & 0xf]); buf.append(HEX_DIGITS[(b >> 4) & 0xf]);
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,13 +15,18 @@ ...@@ -14,13 +15,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.utils; package org.apache.atlas.utils;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
public class ParamChecker { /**
* Utilities for checking parameters.
*/
public final class ParamChecker {
private ParamChecker() {
}
/** /**
* Check that a value is not null. If null throws an IllegalArgumentException. * Check that a value is not null. If null throws an IllegalArgumentException.
......
...@@ -31,6 +31,10 @@ ...@@ -31,6 +31,10 @@
<name>Apache Atlas Notification</name> <name>Apache Atlas Notification</name>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.atlas</groupId> <groupId>org.apache.atlas</groupId>
......
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -116,7 +116,7 @@ public abstract class AtlasHook { ...@@ -116,7 +116,7 @@ public abstract class AtlasHook {
return; return;
} catch(Exception e) { } catch(Exception e) {
numRetries++; numRetries++;
if(numRetries < maxRetries) { if (numRetries < maxRetries) {
LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
} else { } else {
LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting",
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import kafka.consumer.ConsumerIterator; import kafka.consumer.ConsumerIterator;
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import com.google.inject.Singleton; import com.google.inject.Singleton;
...@@ -57,10 +57,10 @@ import java.util.Map; ...@@ -57,10 +57,10 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@Singleton
/** /**
* Kafka specific access point to the Atlas notification framework. * Kafka specific access point to the Atlas notification framework.
*/ */
@Singleton
public class KafkaNotification extends AbstractNotification implements Service { public class KafkaNotification extends AbstractNotification implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
...@@ -70,7 +70,6 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -70,7 +70,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK"; public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
public static final String ATLAS_TYPES_TOPIC = "ATLAS_TYPES";
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
...@@ -81,10 +80,12 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -81,10 +80,12 @@ public class KafkaNotification extends AbstractNotification implements Service {
private KafkaProducer producer = null; private KafkaProducer producer = null;
private List<ConsumerConnector> consumerConnectors = new ArrayList<>(); private List<ConsumerConnector> consumerConnectors = new ArrayList<>();
private static final Map<NotificationType, String> topicMap = new HashMap<NotificationType, String>() {{ private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC); put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}}; }
};
// ----- Constructors ---------------------------------------------------- // ----- Constructors ----------------------------------------------------
...@@ -129,7 +130,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -129,7 +130,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
try { try {
startZk(); startZk();
startKafka(); startKafka();
} catch(Exception e) { } catch (Exception e) {
throw new AtlasException("Failed to start embedded kafka", e); throw new AtlasException("Failed to start embedded kafka", e);
} }
} }
...@@ -152,7 +153,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -152,7 +153,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
@Override @Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType,
int numConsumers) { int numConsumers) {
String topic = topicMap.get(notificationType); String topic = TOPIC_MAP.get(notificationType);
Properties consumerProperties = getConsumerProperties(notificationType); Properties consumerProperties = getConsumerProperties(notificationType);
...@@ -174,12 +175,12 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -174,12 +175,12 @@ public class KafkaNotification extends AbstractNotification implements Service {
} }
@Override @Override
public void _send(NotificationType type, String... messages) throws NotificationException { public void sendInternal(NotificationType type, String... messages) throws NotificationException {
if (producer == null) { if (producer == null) {
createProducer(); createProducer();
} }
String topic = topicMap.get(type); String topic = TOPIC_MAP.get(type);
List<Future<RecordMetadata>> futures = new ArrayList<>(); List<Future<RecordMetadata>> futures = new ArrayList<>();
for (String message : messages) { for (String message : messages) {
ProducerRecord record = new ProducerRecord(topic, message); ProducerRecord record = new ProducerRecord(topic, message);
...@@ -217,12 +218,12 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -217,12 +218,12 @@ public class KafkaNotification extends AbstractNotification implements Service {
/** /**
* Create a Kafka consumer connector from the given properties. * Create a Kafka consumer connector from the given properties.
* *
* @param properties the properties for creating the consumer connector * @param consumerProperties the properties for creating the consumer connector
* *
* @return a new Kafka consumer connector * @return a new Kafka consumer connector
*/ */
protected ConsumerConnector createConsumerConnector(Properties properties) { protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties)); return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProperties));
} }
/** /**
...@@ -271,7 +272,7 @@ public class KafkaNotification extends AbstractNotification implements Service { ...@@ -271,7 +272,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
private URL getURL(String url) throws MalformedURLException { private URL getURL(String url) throws MalformedURLException {
try { try {
return new URL(url); return new URL(url);
} catch(MalformedURLException e) { } catch (MalformedURLException e) {
return new URL("http://" + url); return new URL("http://" + url);
} }
} }
......
...@@ -6,16 +6,15 @@ ...@@ -6,16 +6,15 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import com.google.inject.Provider; import com.google.inject.Provider;
...@@ -25,7 +24,11 @@ import org.apache.atlas.ApplicationProperties; ...@@ -25,7 +24,11 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
/**
* Provider class that provides KafkaNotification for Guice.
*/
public class KafkaNotificationProvider implements Provider<KafkaNotification> { public class KafkaNotificationProvider implements Provider<KafkaNotification> {
@Override @Override
@Provides @Provides
@Singleton @Singleton
......
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -56,7 +56,7 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -56,7 +56,7 @@ public abstract class AbstractNotification implements NotificationInterface {
for (int index = 0; index < messages.size(); index++) { for (int index = 0; index < messages.size(); index++) {
strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index)); strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index));
} }
_send(type, strMessages); sendInternal(type, strMessages);
} }
@Override @Override
...@@ -64,5 +64,5 @@ public abstract class AbstractNotification implements NotificationInterface { ...@@ -64,5 +64,5 @@ public abstract class AbstractNotification implements NotificationInterface {
send(type, Arrays.asList(messages)); send(type, Arrays.asList(messages));
} }
protected abstract void _send(NotificationType type, String[] messages) throws NotificationException; protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
} }
...@@ -6,16 +6,15 @@ ...@@ -6,16 +6,15 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
...@@ -100,54 +99,59 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon ...@@ -100,54 +99,59 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
protected abstract String peekMessage(); protected abstract String peekMessage();
// ----- inner class : ImmutableListDeserializer --------------------------- /**
* Deserializer for ImmutableList used by AbstractNotificationConsumer.GSON.
private static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> { */
public static class ImmutableListDeserializer implements JsonDeserializer<ImmutableList<?>> {
public static final Type LIST_TYPE = new TypeToken<List<?>>() {}.getType(); public static final Type LIST_TYPE = new TypeToken<List<?>>() {
}.getType();
@Override @Override
public ImmutableList<?> deserialize(JsonElement json, Type type, public ImmutableList<?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) throws JsonParseException { JsonDeserializationContext context) {
final List<?> list = context.deserialize(json, LIST_TYPE); final List<?> list = context.deserialize(json, LIST_TYPE);
return ImmutableList.copyOf(list); return ImmutableList.copyOf(list);
} }
} }
// ----- inner class : ImmutableMapDeserializer ---------------------------- /**
* Deserializer for ImmutableMap used by AbstractNotificationConsumer.GSON.
*/
public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> { public static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {}.getType(); public static final Type MAP_TYPE = new TypeToken<Map<?, ?>>() {
}.getType();
@Override @Override
public ImmutableMap<?, ?> deserialize(JsonElement json, Type type, public ImmutableMap<?, ?> deserialize(JsonElement json, Type type,
JsonDeserializationContext context) throws JsonParseException { JsonDeserializationContext context) {
final Map<?, ?> map = context.deserialize(json, MAP_TYPE); final Map<?, ?> map = context.deserialize(json, MAP_TYPE);
return ImmutableMap.copyOf(map); return ImmutableMap.copyOf(map);
} }
} }
// ----- inner class : EntityNotificationDeserializer ---------------------- /**
* Deserializer for EntityNotification used by AbstractNotificationConsumer.GSON.
public final static class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> { */
public static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> {
@Override @Override
public EntityNotification deserialize(final JsonElement json, final Type type, public EntityNotification deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) {
return context.deserialize(json, EntityNotificationImpl.class); return context.deserialize(json, EntityNotificationImpl.class);
} }
} }
// ----- inner class : StructDeserializer ------------------------------- /**
* Serde for Struct used by AbstractNotificationConsumer.GSON.
public final static class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> { */
public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> {
@Override @Override
public IStruct deserialize(final JsonElement json, final Type type, public IStruct deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) {
return context.deserialize(json, Struct.class); return context.deserialize(json, Struct.class);
} }
...@@ -159,13 +163,14 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon ...@@ -159,13 +163,14 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
} }
// ----- inner class : ReferenceableSerializerDeserializer ------------------------ /**
* Serde for Referenceable used by AbstractNotificationConsumer.GSON.
public final static class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>, */
public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>,
JsonSerializer<IReferenceableInstance> { JsonSerializer<IReferenceableInstance> {
@Override @Override
public IReferenceableInstance deserialize(final JsonElement json, final Type type, public IReferenceableInstance deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) {
return InstanceSerialization.fromJsonReferenceable(json.toString(), true); return InstanceSerialization.fromJsonReferenceable(json.toString(), true);
} }
...@@ -178,14 +183,14 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon ...@@ -178,14 +183,14 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
} }
// ----- inner class : JSONArraySerializerDeserializer ---------------------------- /**
* Serde for JSONArray used by AbstractNotificationConsumer.GSON.
public final static class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>, */
public static final class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>,
JsonSerializer<JSONArray> { JsonSerializer<JSONArray> {
@Override @Override
public JSONArray deserialize(final JsonElement json, final Type type, public JSONArray deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException { final JsonDeserializationContext context) {
try { try {
return new JSONArray(json.toString()); return new JSONArray(json.toString());
} catch (JSONException e) { } catch (JSONException e) {
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,11 +15,13 @@ ...@@ -14,11 +15,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
// TODO : docs! /**
public interface NotificationConsumer<T>{ * Interface for notification consumer.
* @param <T> message type
*/
public interface NotificationConsumer<T> {
boolean hasNext(); boolean hasNext();
T next(); T next();
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,11 +15,13 @@ ...@@ -14,11 +15,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
/**
* Exception from notification.
*/
public class NotificationException extends AtlasException { public class NotificationException extends AtlasException {
public NotificationException(Exception e) { public NotificationException(Exception e) {
super(e); super(e);
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import com.google.inject.Inject; import com.google.inject.Inject;
...@@ -35,7 +35,7 @@ import java.util.concurrent.Executors; ...@@ -35,7 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Consumer of notifications from hooks e.g., hive hook etc * Consumer of notifications from hooks e.g., hive hook etc.
*/ */
@Singleton @Singleton
public class NotificationHookConsumer implements Service { public class NotificationHookConsumer implements Service {
...@@ -93,7 +93,8 @@ public class NotificationHookConsumer implements Service { ...@@ -93,7 +93,8 @@ public class NotificationHookConsumer implements Service {
this(atlasClient, consumer); this(atlasClient, consumer);
} }
public HookConsumer(AtlasClient client, NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { public HookConsumer(AtlasClient client,
NotificationConsumer<HookNotification.HookNotificationMessage> consumer) {
this.client = client; this.client = client;
this.consumer = consumer; this.consumer = consumer;
} }
...@@ -101,7 +102,7 @@ public class NotificationHookConsumer implements Service { ...@@ -101,7 +102,7 @@ public class NotificationHookConsumer implements Service {
private boolean hasNext() { private boolean hasNext() {
try { try {
return consumer.hasNext(); return consumer.hasNext();
} catch(ConsumerTimeoutException e) { } catch (ConsumerTimeoutException e) {
return false; return false;
} }
} }
...@@ -113,7 +114,7 @@ public class NotificationHookConsumer implements Service { ...@@ -113,7 +114,7 @@ public class NotificationHookConsumer implements Service {
return; return;
} }
while(true) { while (true) {
try { try {
if (hasNext()) { if (hasNext()) {
HookNotification.HookNotificationMessage message = consumer.next(); HookNotification.HookNotificationMessage message = consumer.next();
...@@ -129,8 +130,8 @@ public class NotificationHookConsumer implements Service { ...@@ -129,8 +130,8 @@ public class NotificationHookConsumer implements Service {
HookNotification.EntityPartialUpdateRequest partialUpdateRequest = HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
(HookNotification.EntityPartialUpdateRequest) message; (HookNotification.EntityPartialUpdateRequest) message;
atlasClient.updateEntity(partialUpdateRequest.getTypeName(), atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getAttribute(),
partialUpdateRequest.getEntity()); partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
break; break;
case ENTITY_FULL_UPDATE: case ENTITY_FULL_UPDATE:
...@@ -138,13 +139,16 @@ public class NotificationHookConsumer implements Service { ...@@ -138,13 +139,16 @@ public class NotificationHookConsumer implements Service {
(HookNotification.EntityUpdateRequest) message; (HookNotification.EntityUpdateRequest) message;
atlasClient.updateEntities(updateRequest.getEntities()); atlasClient.updateEntities(updateRequest.getEntities());
break; break;
default:
throw new IllegalStateException("Unhandled exception!");
} }
} catch (Exception e) { } catch (Exception e) {
//todo handle failures //todo handle failures
LOG.warn("Error handling message {}", message, e); LOG.warn("Error handling message {}", message, e);
} }
} }
} catch(Throwable t) { } catch (Throwable t) {
LOG.warn("Failure in NotificationHookConsumer", t); LOG.warn("Failure in NotificationHookConsumer", t);
} }
} }
...@@ -158,15 +162,15 @@ public class NotificationHookConsumer implements Service { ...@@ -158,15 +162,15 @@ public class NotificationHookConsumer implements Service {
SERVER_READY_WAIT_TIME_MS); SERVER_READY_WAIT_TIME_MS);
timer.sleep(SERVER_READY_WAIT_TIME_MS); timer.sleep(SERVER_READY_WAIT_TIME_MS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for Atlas Server to become ready, " + LOG.info("Interrupted while waiting for Atlas Server to become ready, "
"exiting consumer thread.", e); + "exiting consumer thread.", e);
return false; return false;
} }
} }
} catch (Throwable e) { } catch (Throwable e) {
LOG.info( LOG.info(
"Handled AtlasServiceException while waiting for Atlas Server to become ready, " + "Handled AtlasServiceException while waiting for Atlas Server to become ready, "
"exiting consumer thread.", e); + "exiting consumer thread.", e);
return false; return false;
} }
LOG.info("Atlas Server is ready, can start reading Kafka events."); LOG.info("Atlas Server is ready, can start reading Kafka events.");
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.entity.EntityNotification;
...@@ -22,11 +22,18 @@ import org.apache.atlas.notification.hook.HookNotification; ...@@ -22,11 +22,18 @@ import org.apache.atlas.notification.hook.HookNotification;
import java.util.List; import java.util.List;
// TODO : docs! /**
* Notification interface for sending/receiving messages.
* 1. Atlas sends entity notifications
* 2. Hooks send notifications to create/update types/entities. Atlas reads these messages
*/
public interface NotificationInterface { public interface NotificationInterface {
String PROPERTY_PREFIX = "atlas.notification"; String PROPERTY_PREFIX = "atlas.notification";
/**
* Notification type - hooks and entities.
*/
enum NotificationType { enum NotificationType {
HOOK(HookNotification.HookNotificationMessage.class), ENTITIES(EntityNotification.class); HOOK(HookNotification.HookNotificationMessage.class), ENTITIES(EntityNotification.class);
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
...@@ -24,6 +24,9 @@ import org.apache.atlas.kafka.KafkaNotification; ...@@ -24,6 +24,9 @@ import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.kafka.KafkaNotificationProvider; import org.apache.atlas.kafka.KafkaNotificationProvider;
import org.apache.atlas.service.Service; import org.apache.atlas.service.Service;
/**
* Notification module for Guice.
*/
public class NotificationModule extends AbstractModule { public class NotificationModule extends AbstractModule {
@Override @Override
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification.entity; package org.apache.atlas.notification.entity;
import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IReferenceableInstance;
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification.entity; package org.apache.atlas.notification.entity;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
...@@ -106,14 +106,17 @@ public class EntityNotificationImpl implements EntityNotification { ...@@ -106,14 +106,17 @@ public class EntityNotificationImpl implements EntityNotification {
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EntityNotificationImpl that = (EntityNotificationImpl) o; EntityNotificationImpl that = (EntityNotificationImpl) o;
return !(entity != null ? !entity.equals(that.entity) : that.entity != null) && return !(entity != null ? !entity.equals(that.entity) : that.entity != null)
operationType == that.operationType && && operationType == that.operationType && traits.equals(that.traits);
traits.equals(that.traits);
} }
@Override @Override
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification.entity; package org.apache.atlas.notification.entity;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
......
...@@ -6,16 +6,15 @@ ...@@ -6,16 +6,15 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification.hook; package org.apache.atlas.notification.hook;
import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializationContext;
...@@ -34,11 +33,14 @@ import java.util.ArrayList; ...@@ -34,11 +33,14 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
/**
* Contains the structure of messages transferred from hooks to atlas.
*/
public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> { public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> {
@Override @Override
public HookNotificationMessage deserialize(JsonElement json, Type typeOfT, public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) throws JsonParseException { JsonDeserializationContext context) {
if (json.isJsonArray()) { if (json.isJsonArray()) {
JSONArray jsonArray = context.deserialize(json, JSONArray.class); JSONArray jsonArray = context.deserialize(json, JSONArray.class);
return new EntityCreateRequest(jsonArray); return new EntityCreateRequest(jsonArray);
...@@ -58,19 +60,28 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN ...@@ -58,19 +60,28 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
case TYPE_CREATE: case TYPE_CREATE:
case TYPE_UPDATE: case TYPE_UPDATE:
return context.deserialize(json, TypeRequest.class); return context.deserialize(json, TypeRequest.class);
}
default:
throw new IllegalStateException("Unhandled type " + type); throw new IllegalStateException("Unhandled type " + type);
} }
} }
}
/**
* Type of the hook message.
*/
public enum HookNotificationType { public enum HookNotificationType {
TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE
} }
/**
* Base type of hook message.
*/
public static class HookNotificationMessage { public static class HookNotificationMessage {
protected HookNotificationType type; protected HookNotificationType type;
private HookNotificationMessage() { } private HookNotificationMessage() {
}
public HookNotificationMessage(HookNotificationType type) { public HookNotificationMessage(HookNotificationType type) {
this.type = type; this.type = type;
...@@ -81,10 +92,14 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN ...@@ -81,10 +92,14 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
} }
} }
/**
* Hook message for create type definitions.
*/
public static class TypeRequest extends HookNotificationMessage { public static class TypeRequest extends HookNotificationMessage {
private TypesDef typesDef; private TypesDef typesDef;
private TypeRequest() { } private TypeRequest() {
}
public TypeRequest(HookNotificationType type, TypesDef typesDef) { public TypeRequest(HookNotificationType type, TypesDef typesDef) {
super(type); super(type);
...@@ -96,10 +111,14 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN ...@@ -96,10 +111,14 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
} }
} }
/**
* Hook message for creating new entities.
*/
public static class EntityCreateRequest extends HookNotificationMessage { public static class EntityCreateRequest extends HookNotificationMessage {
private List<Referenceable> entities; private List<Referenceable> entities;
private EntityCreateRequest() { } private EntityCreateRequest() {
}
public EntityCreateRequest(Referenceable... entities) { public EntityCreateRequest(Referenceable... entities) {
this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities)); this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities));
...@@ -131,6 +150,9 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN ...@@ -131,6 +150,9 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
} }
} }
/**
* Hook message for updating entities(full update).
*/
public static class EntityUpdateRequest extends EntityCreateRequest { public static class EntityUpdateRequest extends EntityCreateRequest {
public EntityUpdateRequest(Referenceable... entities) { public EntityUpdateRequest(Referenceable... entities) {
this(Arrays.asList(entities)); this(Arrays.asList(entities));
...@@ -141,13 +163,17 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN ...@@ -141,13 +163,17 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
} }
} }
/**
* Hook message for updating entities(partial update).
*/
public static class EntityPartialUpdateRequest extends HookNotificationMessage { public static class EntityPartialUpdateRequest extends HookNotificationMessage {
private String typeName; private String typeName;
private String attribute; private String attribute;
private Referenceable entity; private Referenceable entity;
private String attributeValue; private String attributeValue;
private EntityPartialUpdateRequest() { } private EntityPartialUpdateRequest() {
}
public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue, public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue,
Referenceable entity) { Referenceable entity) {
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.kafka; package org.apache.atlas.kafka;
import com.google.inject.Inject; import com.google.inject.Inject;
...@@ -99,7 +99,7 @@ public class KafkaNotificationTest { ...@@ -99,7 +99,7 @@ public class KafkaNotificationTest {
assertTrue(streams.contains(kafkaStream2)); assertTrue(streams.contains(kafkaStream2));
// assert that the given consumer group id was added to the properties used to create the consumer connector // assert that the given consumer group id was added to the properties used to create the consumer connector
Properties properties = kafkaNotification.consumerProperties; Properties properties = kafkaNotification.myProperties;
assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
} }
...@@ -113,7 +113,7 @@ public class KafkaNotificationTest { ...@@ -113,7 +113,7 @@ public class KafkaNotificationTest {
private final ConsumerConnector consumerConnector; private final ConsumerConnector consumerConnector;
private Properties consumerProperties; private Properties myProperties;
private List<KafkaStream> kafkaStreams = new LinkedList<>(); private List<KafkaStream> kafkaStreams = new LinkedList<>();
public TestKafkaNotification(Configuration applicationProperties, public TestKafkaNotification(Configuration applicationProperties,
...@@ -123,8 +123,8 @@ public class KafkaNotificationTest { ...@@ -123,8 +123,8 @@ public class KafkaNotificationTest {
} }
@Override @Override
protected ConsumerConnector createConsumerConnector(Properties properties) { protected ConsumerConnector createConsumerConnector(Properties consumerProperties) {
this.consumerProperties = properties; this.myProperties = consumerProperties;
kafkaStreams.clear(); kafkaStreams.clear();
return consumerConnector; return consumerConnector;
} }
......
/* /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one
* contributor license agreements. See the NOTICE file distributed with * or more contributor license agreements. See the NOTICE file
* this work for additional information regarding copyright ownership. * distributed with this work for additional information
* The ASF licenses this file to You under the Apache License, Version 2.0 * regarding copyright ownership. The ASF licenses this file
* (the "License"); you may not use this file except in compliance with * to you under the Apache License, Version 2.0 (the
* the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
...@@ -14,7 +15,6 @@ ...@@ -14,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification; package org.apache.atlas.notification;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
...@@ -75,7 +75,8 @@ public class NotificationHookConsumerTest { ...@@ -75,7 +75,8 @@ public class NotificationHookConsumerTest {
NotificationHookConsumer.HookConsumer hookConsumer = NotificationHookConsumer.HookConsumer hookConsumer =
notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, new Exception())); when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION,
new Exception()));
assertFalse(hookConsumer.serverAvailable(timer)); assertFalse(hookConsumer.serverAvailable(timer));
} }
......
...@@ -79,7 +79,7 @@ public class EntityNotificationImplTest { ...@@ -79,7 +79,7 @@ public class EntityNotificationImplTest {
} }
@Test @Test
public void testGetAllTraits_superTraits() throws Exception { public void testGetAllTraitsSuperTraits() throws Exception {
TypeSystem typeSystem = mock(TypeSystem.class); TypeSystem typeSystem = mock(TypeSystem.class);
......
...@@ -6,16 +6,15 @@ ...@@ -6,16 +6,15 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p/> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p/> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.notification.hook; package org.apache.atlas.notification.hook;
import org.apache.atlas.notification.AbstractNotificationConsumer; import org.apache.atlas.notification.AbstractNotificationConsumer;
......
...@@ -368,6 +368,7 @@ ...@@ -368,6 +368,7 @@
<!-- skips checkstyle and find bugs --> <!-- skips checkstyle and find bugs -->
<skipCheck>false</skipCheck> <skipCheck>false</skipCheck>
<checkstyle.failOnViolation>false</checkstyle.failOnViolation>
<skipUTs>false</skipUTs> <skipUTs>false</skipUTs>
<skipITs>false</skipITs> <skipITs>false</skipITs>
<skipDocs>true</skipDocs> <skipDocs>true</skipDocs>
...@@ -1611,7 +1612,9 @@ ...@@ -1611,7 +1612,9 @@
<includeTestSourceDirectory>true</includeTestSourceDirectory> <includeTestSourceDirectory>true</includeTestSourceDirectory>
<configLocation>src/build/checkstyle.xml</configLocation> <configLocation>src/build/checkstyle.xml</configLocation>
<headerLocation>src/build/checkstyle-java-header.txt</headerLocation> <headerLocation>src/build/checkstyle-java-header.txt</headerLocation>
<failOnViolation>false</failOnViolation> <suppressionsLocation>src/build/checkstyle-suppressions.xml</suppressionsLocation>
<suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
<failOnViolation>${checkstyle.failOnViolation}</failOnViolation>
</configuration> </configuration>
</execution> </execution>
</executions> </executions>
......
...@@ -7,6 +7,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ...@@ -7,6 +7,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES: ALL CHANGES:
ATLAS-433 Fix checkstyle issues for common and notification module (shwethags)
ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags) ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags)
ATLAS-370 Implement deleteEntities at repository level (dkantor via shwethags) ATLAS-370 Implement deleteEntities at repository level (dkantor via shwethags)
ATLAS-406 Resizing lineage window – should be an anchor on a corner – like ppt for graphic (sanjayp via shwethags) ATLAS-406 Resizing lineage window – should be an anchor on a corner – like ppt for graphic (sanjayp via shwethags)
......
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<suppress checks="JavadocType" files="[/\\]src[/\\]test[/\\]java[/\\]"/>
</suppressions>
...@@ -232,4 +232,7 @@ ...@@ -232,4 +232,7 @@
<property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/> <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
</module> </module>
<module name="SuppressionFilter">
<property name="file" value="${checkstyle.suppressions.file}"/>
</module>
</module> </module>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment