diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 6513234..e48967d 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -164,6 +164,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public HiveHook() { } + public HiveHook(String name) { + super(name); + } + + @Override public void run(HookContext hookContext) throws Exception { if (LOG.isDebugEnabled()) { diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java index 3c0f0c1..f01419c 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java @@ -45,7 +45,7 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener { public HiveMetastoreHookImpl(Configuration config) { super(config); - this.hiveHook = new HiveHook(); + this.hiveHook = new HiveHook(this.getClass().getSimpleName()); this.hook = new HiveMetastoreHook(); } diff --git a/notification/pom.xml b/notification/pom.xml index 8affd59..740e8e5 100644 --- a/notification/pom.xml +++ b/notification/pom.xml @@ -56,6 +56,18 @@ </dependency> <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + </dependency> + + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${kafka.scala.binary.version}</artifactId> </dependency> diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 8659126..26c2d8f 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -126,6 +126,7 @@ public abstract class AtlasHook { try { LOG.info("==> Shutdown of Atlas Hook"); + notificationInterface.close(); executor.shutdown(); executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, TimeUnit.MILLISECONDS); executor = null; @@ -141,6 +142,15 @@ public abstract class AtlasHook { LOG.info("Created Atlas Hook"); } + public AtlasHook() { + notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger); + } + + public AtlasHook(String name) { + LOG.info("AtlasHook: Spool name: Passed from caller.: {}", name); + notificationInterface.init(name, failedMessagesLogger); + } + /** * Notify atlas of the entity through message. The entity can be a * complex entity with reference to other entities. diff --git a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java index b319e81..5488c1c 100644 --- a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java +++ b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java @@ -19,16 +19,14 @@ package org.apache.atlas.hook; -import org.apache.log4j.Appender; +import org.apache.atlas.notification.LogConfigUtils; import org.apache.log4j.DailyRollingFileAppender; -import org.apache.log4j.FileAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import java.io.File; import java.io.IOException; -import java.util.Enumeration; /** * A logger wrapper that can be used to write messages that failed to be sent to a log file. @@ -46,7 +44,7 @@ public class FailedMessagesLogger { } void init() { - String rootLoggerDirectory = getRootLoggerDirectory(); + String rootLoggerDirectory = LogConfigUtils.getRootDir(); if (rootLoggerDirectory == null) { return; } @@ -62,38 +60,7 @@ public class FailedMessagesLogger { } } - /** - * Get the root logger file location under which the failed log messages will be written. - * - * Since this class is used in Hooks which run within JVMs of other components like Hive, - * we want to write the failed messages file under the same location as where logs from - * the host component are saved. This method attempts to get such a location from the - * root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender} - * - * @return directory under which host component's logs are stored. - */ - private String getRootLoggerDirectory() { - String rootLoggerDirectory = null; - Logger rootLogger = Logger.getRootLogger(); - Enumeration allAppenders = rootLogger.getAllAppenders(); - - if (allAppenders != null) { - while (allAppenders.hasMoreElements()) { - Appender appender = (Appender) allAppenders.nextElement(); - - if (appender instanceof FileAppender) { - FileAppender fileAppender = (FileAppender) appender; - String rootLoggerFile = fileAppender.getFile(); - - rootLoggerDirectory = rootLoggerFile != null ? new File(rootLoggerFile).getParent() : null; - break; - } - } - } - return rootLoggerDirectory; - } - - void log(String message) { + public void log(String message) { logger.error(message); } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java index 2dd970e..b35af97 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java +++ b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java @@ -19,23 +19,59 @@ package org.apache.atlas.kafka; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.LogConfigUtils; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.spool.AtlasFileSpool; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; /** * Provider class for Notification interfaces */ public class NotificationProvider { - private static KafkaNotification kafka; + private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class); + + private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled"; + private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir"; + + private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false; - public static KafkaNotification get() { - if (kafka == null) { + private static NotificationInterface notificationProvider; + + public static NotificationInterface get() { + if (notificationProvider == null) { try { - Configuration applicationProperties = ApplicationProperties.get(); - kafka = new KafkaNotification(applicationProperties); + Configuration conf = ApplicationProperties.get(); + KafkaNotification kafka = new KafkaNotification(conf); + String spoolDir = getSpoolDir(conf); + + if (isSpoolingEnabled(conf) && StringUtils.isNotEmpty(spoolDir)) { + LOG.info("Notification spooling is enabled: spool directory={}", spoolDir); + + conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir); + + notificationProvider = new AtlasFileSpool(conf, kafka); + } else { + LOG.info("Notification spooling is not enabled"); + + notificationProvider = kafka; + } } catch (AtlasException e) { throw new RuntimeException(e); } } - return kafka; + return notificationProvider; + } + + private static boolean isSpoolingEnabled(Configuration configuration) { + return configuration.getBoolean(CONF_ATLAS_HOOK_SPOOL_ENABLED, CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT); + } + + private static String getSpoolDir(Configuration configuration) { + return configuration.getString(CONF_ATLAS_HOOK_SPOOL_DIR); } } diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 45a66bf..c45a1da 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -76,6 +76,10 @@ public abstract class AbstractNotification implements NotificationInterface { protected AbstractNotification() { } + @Override + public void init(String source, Object failedMessagesLogger) { + } + // ----- NotificationInterface ------------------------------------------- @Override @@ -108,7 +112,7 @@ public abstract class AbstractNotification implements NotificationInterface { * * @throws NotificationException if an error occurs while sending */ - protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException; + public abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException; // ----- utility methods ------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java b/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java new file mode 100644 index 0000000..dc98592 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification; + +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.FileAppender; +import org.apache.logging.log4j.core.appender.RollingFileAppender; +import org.apache.logging.log4j.core.appender.RollingRandomAccessFileAppender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Enumeration; + +public class LogConfigUtils { + private static final Logger LOG = LoggerFactory.getLogger(LogConfigUtils.class); + + public static String getRootDir() { + String ret = getFileAppenderPath(); + + if (StringUtils.isEmpty(ret)) { + ret = getFileAppenderPathApproach2(); + } + + if (StringUtils.isNotEmpty(ret)) { + ret = StringUtils.substringBeforeLast(ret, File.separator); + } else { + ret = null; + } + + LOG.info("getRootDir(): ret={}", ret); + + return ret; + } + + private static String getFileAppenderPath() { + String ret = StringUtils.EMPTY; + LoggerContext loggerContext = (LoggerContext) LogManager.getContext(); + Configuration configuration = loggerContext.getConfiguration(); + + for (Appender appender : configuration.getAppenders().values()) { + if (appender instanceof RollingRandomAccessFileAppender) { + ret = ((RollingRandomAccessFileAppender) appender).getFileName(); + break; + } else if (appender instanceof RollingFileAppender) { + ret = ((RollingRandomAccessFileAppender) appender).getFileName(); + break; + } else if (appender instanceof FileAppender) { + ret = ((FileAppender) appender).getFileName(); + break; + } else { + LOG.info("Could not infer log path from this appender: {}", appender.getClass().getName()); + } + } + + LOG.info("getFileAppenderPath(): ret={}", ret); + + return ret; + } + + private static String getFileAppenderPathApproach2() { + String ret = null; + + try { + org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); + Enumeration allAppenders = rootLogger.getAllAppenders(); + + if (allAppenders != null) { + while (allAppenders.hasMoreElements()) { + Object appender = allAppenders.nextElement(); + + if (appender instanceof org.apache.log4j.FileAppender) { + org.apache.log4j.FileAppender fileAppender = (org.apache.log4j.FileAppender) appender; + + ret = fileAppender.getName(); + + break; + } + } + } + } catch (Exception e) { + LOG.error("getFileAppenderPathApproach2(): failed to get appender path", e); + } + + LOG.info("getFileAppenderPathApproach2(): ret={}", ret); + + return ret; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java index 2dd9c9f..353d650 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java @@ -31,6 +31,10 @@ public class NotificationException extends AtlasException { super(e); } + public NotificationException(Exception e, String errorMsg) { + super(errorMsg, e); + } + public NotificationException(Exception e, List<String> failedMessages) { super(e); this.failedMessages = failedMessages; diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 6caf7e2..edd8ed9 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -61,6 +61,14 @@ public interface NotificationInterface { /** * + * @param source: Name of the source + * @param failedMessagesLogger: Logger for failed messages + * @return + */ + void init(String source, Object failedMessagesLogger); + + /** + * * @param user Name of the user under which the processes is running */ void setCurrentUser(String user); diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java b/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java new file mode 100644 index 0000000..9e12d26 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.atlas.notification.spool.models.IndexRecord; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; + +public class Archiver { + private static final Logger LOG = LoggerFactory.getLogger(Archiver.class); + + private final String source; + private final File indexDoneFile; + private final File archiveFolder; + private final int maxArchiveFiles; + + public Archiver(String source, File indexDoneFile, File archiveFolder, int maxArchiveFiles) { + this.source = source; + this.indexDoneFile = indexDoneFile; + this.archiveFolder = archiveFolder; + this.maxArchiveFiles = maxArchiveFiles; + } + + public void archive(IndexRecord indexRecord) { + moveToArchiveDir(indexRecord); + + removeOldFiles(); + } + + private void moveToArchiveDir(IndexRecord indexRecord) { + File spoolFile = null; + File archiveFile = null; + + try { + spoolFile = new File(indexRecord.getPath()); + archiveFile = new File(archiveFolder, spoolFile.getName()); + + LOG.info("{}: moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile); + + FileUtils.moveFile(spoolFile, archiveFile); + } catch (FileNotFoundException excp) { + LOG.warn("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp); + } catch (IOException excp) { + LOG.error("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp); + } + } + + private void removeOldFiles() { + try { + File[] logFiles = archiveFolder == null ? null : archiveFolder.listFiles(pathname -> StringUtils.endsWithIgnoreCase(pathname.getName(), SpoolUtils.FILE_EXT_LOG)); + int filesToDelete = logFiles == null ? 0 : logFiles.length - maxArchiveFiles; + + if (filesToDelete > 0) { + try (BufferedReader br = new BufferedReader(new FileReader(indexDoneFile))) { + int filesDeletedCount = 0; + + for (String line = br.readLine(); line != null; line = br.readLine()) { + line = line.trim(); + + if (StringUtils.isEmpty(line)) { + continue; + } + + try { + IndexRecord record = AtlasType.fromJson(line, IndexRecord.class); + File logFile = new File(record.getPath()); + String fileName = logFile.getName(); + File archiveFile = new File(archiveFolder, fileName); + + if (!archiveFile.exists()) { + LOG.warn("archive file does not exist: {}", archiveFile); + + continue; + } + + LOG.info("Deleting archive file: {}", archiveFile); + + boolean ret = archiveFile.delete(); + + if (!ret) { + LOG.error("{}: Error deleting archive file. File: {}", source, archiveFile); + } else { + filesDeletedCount++; + } + + if (filesDeletedCount >= filesToDelete) { + break; + } + } catch (Exception excp) { + LOG.error("{}: Error deleting older archive file in index-record: {}", source, line, excp); + } + } + + LOG.info("{}: Deleted: {} archived files", source, filesDeletedCount); + } + } + } catch(Exception exception){ + LOG.error("{}: Error deleting older files from archive folder. Folder: {}", source, archiveFolder, exception); + } + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java new file mode 100644 index 0000000..2d7d195 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.hook.FailedMessagesLogger; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationException; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; + +public class AtlasFileSpool implements NotificationInterface { + private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class); + + private final AbstractNotification notificationHandler; + private final SpoolConfiguration config; + private final IndexManagement indexManagement; + private final Spooler spooler; + private final Publisher publisher; + private Thread publisherThread; + private Boolean initDone = null; + + public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) { + this.notificationHandler = notificationHandler; + this.config = new SpoolConfiguration(configuration, notificationHandler.getClass().getSimpleName()); + this.indexManagement = new IndexManagement(config); + this.spooler = new Spooler(config, indexManagement); + this.publisher = new Publisher(config, indexManagement, notificationHandler); + } + + @Override + public void init(String source, Object failedMessagesLogger) { + LOG.info("==> AtlasFileSpool.init(source={})", source); + + if (!isInitDone()) { + try { + config.setSource(source); + + LOG.info("{}: Initialization: Starting...", this.config.getSourceName()); + + indexManagement.init(); + + if (failedMessagesLogger instanceof FailedMessagesLogger) { + this.spooler.setFailedMessagesLogger((FailedMessagesLogger) failedMessagesLogger); + } + + startPublisher(); + + initDone = true; + } catch (AtlasException exception) { + LOG.error("AtlasFileSpool(source={}): initialization failed", this.config.getSourceName(), exception); + + initDone = false; + } catch (Throwable t) { + LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t); + } + } else { + LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone); + } + + LOG.info("<== AtlasFileSpool.init(source={})", source); + } + + @Override + public void setCurrentUser(String user) { + this.notificationHandler.setCurrentUser(user); + } + + @Override + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { + LOG.warn("AtlasFileSpool.createConsumers(): not implemented"); + + return null; + } + + @Override + public <T> void send(NotificationType type, T... messages) throws NotificationException { + send(type, Arrays.asList(messages)); + } + + @Override + public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) { + if (LOG.isDebugEnabled()) { + LOG.debug("AtlasFileSpool.send(): sending to spooler"); + } + + spooler.send(type, messages); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("AtlasFileSpool.send(): sending to notificationHandler"); + } + + try { + notificationHandler.send(type, messages); + } catch (Exception e) { + if (isInitDone()) { + LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e); + + publisher.setDestinationDown(); + + spooler.send(type, messages); + } else { + LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e); + + throw e; + } + } + } + } + + @Override + public void close() { + try { + spooler.setDrain(); + publisher.setDrain(); + indexManagement.stop(); + + publisherThread.join(); + } catch (InterruptedException e) { + LOG.error("Interrupted! source={}", this.config.getSourceName(), e); + } + } + + private void startPublisher() { + publisherThread = new Thread(publisher); + + publisherThread.setDaemon(true); + publisherThread.setContextClassLoader(this.getClass().getClassLoader()); + publisherThread.start(); + + LOG.info("{}: publisher started!", this.config.getSourceName()); + } + + private boolean isInitDone() { + return this.initDone != null; + } + + private boolean hasInitSucceeded() { + return this.initDone != null && this.initDone == true; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java b/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java new file mode 100644 index 0000000..538ea49 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.atlas.notification.spool.utils.local.FileOpAppend; +import org.apache.atlas.notification.spool.utils.local.FileOpCompaction; +import org.apache.atlas.notification.spool.utils.local.FileOpDelete; +import org.apache.atlas.notification.spool.utils.local.FileOpRead; +import org.apache.atlas.notification.spool.utils.local.FileOpUpdate; + +import java.io.File; + +public class FileOperations { + private final String emptyRecordJson; + private final FileOpAppend fileOpAppend; + private final FileOpRead fileOpLoad; + private final FileOpUpdate fileOpUpdate; + private final FileOpCompaction fileOpCompaction; + private final FileOpDelete fileOpDelete; + + public FileOperations(String emptyRecordJson, String source) { + this.emptyRecordJson = emptyRecordJson; + this.fileOpAppend = new FileOpAppend(source); + this.fileOpLoad = new FileOpRead(source); + this.fileOpUpdate = new FileOpUpdate(source, fileOpAppend); + this.fileOpCompaction = new FileOpCompaction(source); + this.fileOpDelete = new FileOpDelete(source); + } + + public String[] load(File file) { + fileOpLoad.perform(file); + + return fileOpLoad.getItems(); + } + + public void delete(File file, String id) { + fileOpDelete.perform(file, id, emptyRecordJson); + } + + public void append(File file, String json) { + fileOpAppend.perform(file, json); + } + + public void compact(File file) { + fileOpCompaction.perform(file); + } + + public void update(File file, String id, String json) { + fileOpUpdate.setId(id); + fileOpUpdate.perform(file, json); + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java new file mode 100644 index 0000000..b3a586b --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.spool.models.IndexRecord; +import org.apache.atlas.notification.spool.models.IndexRecords; +import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutput; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class IndexManagement { + private static final Logger LOG = LoggerFactory.getLogger(IndexManagement.class); + + private static final int MAX_RETRY_ATTEMPTS = 3; + + private final SpoolConfiguration config; + private IndexFileManager indexFileManager; + private IndexReader indexReader; + private IndexWriter indexWriter; + + public IndexManagement(SpoolConfiguration config) { + this.config = config; + } + + public void init() throws IOException, AtlasException { + String sourceName = config.getSourceName(); + + File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir()); + + if (spoolDir == null) { + throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, spoolDir.getAbsolutePath())); + } + + File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir()); + + if (archiveDir == null) { + throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, archiveDir.getAbsolutePath())); + } + + File indexFile = SpoolUtils.getCreateFile(config.getIndexFile(), sourceName); + + if (indexFile == null) { + throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, indexFile.getAbsolutePath())); + } + + File indexDoneFile = SpoolUtils.getCreateFile(config.getIndexDoneFile(), sourceName); + + if (indexDoneFile == null) { + throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, indexDoneFile.getAbsolutePath())); + } + + performInit(indexFile.getAbsolutePath(), sourceName); + } + + @VisibleForTesting + void performInit(String indexFilePath, String source) { + try { + File spoolDir = config.getSpoolDir(); + File archiveDir = config.getArchiveDir(); + File indexFile = config.getIndexFile(); + File indexDoneFile = config.getIndexDoneFile(); + + indexFileManager = new IndexFileManager(source, indexFile, indexDoneFile, archiveDir, config.getMaxArchiveFiles()); + indexReader = new IndexReader(source, indexFileManager, config.getRetryDestinationMS()); + indexWriter = new IndexWriter(source, config, indexFileManager, indexReader, spoolDir, archiveDir, config.getFileRolloverSec()); + } catch (Exception e) { + LOG.error("{}: init: Failed! Error loading records from index file: {}", config.getSourceName(), indexFilePath); + } + } + + public boolean isPending() { + return !indexReader.isEmpty() || + (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0); + } + + public synchronized DataOutput getSpoolWriter() throws IOException { + return indexWriter.getCreateWriter(); + } + + public void setSpoolWriteInProgress() { + this.indexWriter.setFileWriteInProgress(true); + } + + public void resetSpoolWriteInProgress() { + this.indexWriter.setFileWriteInProgress(false); + } + + public void updateFailedAttempt() { + this.indexReader.updateFailedAttempt(); + } + + public IndexRecord next() throws InterruptedException { + return indexReader.next(); + } + + public int getQueueSize() { + return indexReader.size(); + } + + public void removeAsDone(IndexRecord indexRecord) { + this.indexReader.removeAsDone(indexRecord); + this.indexWriter.rolloverIfNeeded(); + } + + public void stop() { + indexWriter.stop(); + } + + public void rolloverSpoolFileIfNeeded() { + this.indexWriter.rolloverIfNeeded(); + } + + @VisibleForTesting + IndexFileManager getIndexFileManager() { + return this.indexFileManager; + } + + public void update(IndexRecord record) { + this.indexFileManager.updateIndex(record); + } + + public void flushSpoolWriter() throws IOException { + this.indexWriter.flushCurrent(); + } + + static class IndexWriter { + private final String source; + private final SpoolConfiguration config; + private final File spoolFolder; + private final File archiveFolder; + private final int rollOverTimeout; + private final IndexFileManager indexFileManager; + private final IndexReader indexReader; + private final FileLockedReadWrite fileLockedReadWrite; + private IndexRecord currentIndexRecord; + private DataOutput currentWriter; + private boolean fileWriteInProgress; + + + public IndexWriter(String source, SpoolConfiguration config, IndexFileManager indexFileManager, + IndexReader indexReader, + File spoolFolder, File archiveFolder, int rollOverTimeout) { + this.source = source; + this.config = config; + this.indexFileManager = indexFileManager; + this.indexReader = indexReader; + this.spoolFolder = spoolFolder; + this.archiveFolder = archiveFolder; + this.rollOverTimeout = rollOverTimeout; + this.fileLockedReadWrite = new FileLockedReadWrite(source); + + setCurrent(indexFileManager.getFirstWriteInProgressRecord()); + } + + public void setCurrent(IndexRecord indexRecord) { + this.currentIndexRecord = indexRecord; + } + + public IndexRecord getCurrent() { + return this.currentIndexRecord; + } + + private void setCurrentWriter(File file) throws IOException { + this.currentWriter = fileLockedReadWrite.getOutput(file); + } + + public synchronized DataOutput getWriter() { + return this.currentWriter; + } + + public synchronized DataOutput getCreateWriter() throws IOException { + rolloverIfNeeded(); + + if (getCurrent() == null) { + IndexRecord record = new IndexRecord(StringUtils.EMPTY); + String filePath = SpoolUtils.getSpoolFilePath(config, spoolFolder.toString(), archiveFolder.toString(), record.getId()); + + record.setPath(filePath); + + indexFileManager.appendToIndexFile(record); + + setCurrent(record); + + LOG.info("IndexWriter.getCreateWriter(source={}): Creating new spool file. File: {}", this.source, filePath); + + setCurrentWriter(new File(filePath)); + } else { + if (this.currentWriter == null) { + LOG.info("IndexWriter.getCreateWriter(source={}): Opening existing file for append: File: {}", this.source, currentIndexRecord.getPath()); + + setCurrentWriter(new File(currentIndexRecord.getPath())); + } + } + + return currentWriter; + } + + public synchronized void rolloverIfNeeded() { + if (currentWriter != null && shouldRolloverSpoolFile()) { + LOG.info("IndexWriter.rolloverIfNeeded(source={}): Rolling over. Closing File: {}", this.config.getSourceName(), currentIndexRecord.getPath()); + + fileLockedReadWrite.close(); + + currentWriter = null; + + currentIndexRecord.setStatusPending(); + + indexFileManager.updateIndex(currentIndexRecord); + + LOG.info("IndexWriter.rolloverIfNeeded(source={}): Adding file to queue. File: {}", this.config.getSourceName(), currentIndexRecord.getPath()); + + indexReader.addToPublishQueue(currentIndexRecord); + + currentIndexRecord = null; + } + } + + private boolean shouldRolloverSpoolFile() { + return currentIndexRecord != null && + (System.currentTimeMillis() - currentIndexRecord.getCreated() > this.rollOverTimeout); + } + + void flushCurrent() throws IOException { + DataOutput pw = getWriter(); + + if (pw != null) { + fileLockedReadWrite.flush(); + } + } + + public void setFileWriteInProgress(boolean val) { + this.fileWriteInProgress = val; + } + + public boolean isWriteInProgress() { + return this.fileWriteInProgress; + } + + public void stop() { + LOG.info("==> IndexWriter.stop(source={})", this.config.getSourceName()); + + try { + DataOutput out = getWriter(); + + if (out != null) { + flushCurrent(); + + for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) { + if (isWriteInProgress()) { + try { + TimeUnit.SECONDS.sleep(i); + } catch (InterruptedException e) { + LOG.error("IndexWriter.stop(source={}): Interrupted!", this.config.getSourceName(), e); + + break; + } + + continue; + } + + LOG.info("IndexWriter.stop(source={}): Closing open file.", this.config.getSourceName()); + + fileLockedReadWrite.close(); + currentIndexRecord.setStatusPending(); + indexFileManager.updateIndex(currentIndexRecord); + + break; + } + } + } catch (FileNotFoundException e) { + LOG.error("IndexWriter.stop(source={}): File not found! {}", this.config.getSourceName(), getCurrent().getPath(), e); + } catch (IOException exception) { + LOG.error("IndexWriter.stop(source={}): Error accessing file: {}", this.config.getSourceName(), getCurrent().getPath(), exception); + } catch (Exception exception) { + LOG.error("IndexWriter.stop(source={}): Error closing spool file.", this.config.getSourceName(), exception); + } + + LOG.info("<== IndexWriter.stop(source={})", this.config.getSourceName()); + } + } + + static class IndexReader { + private final String source; + private final BlockingQueue<IndexRecord> blockingQueue; + private final IndexFileManager indexFileManager; + private final long retryDestinationMS; + private IndexRecord currentIndexRecord; + + public IndexReader(String source, IndexFileManager indexFileManager, long retryDestinationMS) { + this.source = source; + this.blockingQueue = new LinkedBlockingQueue<>(); + this.retryDestinationMS = retryDestinationMS; + this.indexFileManager = indexFileManager; + + List<IndexRecord> records = indexFileManager.getRecords(); + + records.stream().forEach(x -> addIfStatus(x, IndexRecord.STATUS_READ_IN_PROGRESS)); + records.stream().forEach(x -> addIfStatus(x, IndexRecord.STATUS_PENDING)); + } + + private void addIfStatus(IndexRecord record, String status) { + if (record != null && record.getStatus().equals(status)) { + if (!SpoolUtils.fileExists(record)) { + LOG.error("IndexReader.addIfStatus(source={}): file {} not found!", this.source, record.getPath()); + } else { + addToPublishQueue(record); + } + } + } + + public void addToPublishQueue(IndexRecord record) { + try { + if (!blockingQueue.contains(record)) { + blockingQueue.add(record); + } + } catch (OverlappingFileLockException lockException) { + LOG.warn("{}: {}: Someone else has locked the file.", source, record.getPath()); + } + } + + public IndexRecord next() throws InterruptedException { + this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS); + + return this.currentIndexRecord; + } + + public int size() { + return blockingQueue.size(); + } + + public boolean isEmpty() { + return blockingQueue.isEmpty(); + } + + public void updateFailedAttempt() { + if (currentIndexRecord != null) { + currentIndexRecord.updateFailedAttempt(); + + indexFileManager.updateIndex(currentIndexRecord); + } + } + + public void removeAsDone(IndexRecord indexRecord) { + indexRecord.setDone(); + + indexFileManager.remove(indexRecord); + } + } + + static class IndexFileManager { + private final String source; + private final File indexDoneFile; + private final File indexFile; + private final Archiver archiver; + private final FileOperations fileOperations; + + public IndexFileManager(String source, File indexFile, File indexDoneFile, File archiveFolder, int maxArchiveFiles) { + this.source = source; + this.indexFile = indexFile; + this.indexDoneFile = indexDoneFile; + this.archiver = new Archiver(source, indexDoneFile, archiveFolder, maxArchiveFiles); + this.fileOperations = new FileOperations(SpoolUtils.getEmptyRecordForWriting(), source); + } + + public List<IndexRecord> getRecords() { + return new ArrayList<>(loadRecords(indexFile).getRecords().values()); + } + + public synchronized void delete(File file, String id) { + fileOperations.delete(file, id); + } + + public synchronized IndexRecord getFirstWriteInProgressRecord() { + IndexRecord ret = null; + IndexRecords records = loadRecords(indexFile); + + if (records != null) { + for (IndexRecord record : records.getRecords().values()) { + if (record.isStatusWriteInProgress()) { + LOG.info("IndexFileManager.getFirstWriteInProgressRecord(source={}): current file={}", this.source, record.getPath()); + + ret = record; + + break; + } + } + } + + return ret; + } + + public synchronized void remove(IndexRecord record) { + delete(indexFile, record.getId()); + + appendToDoneFile(record); + + IndexRecords records = loadRecords(indexFile); + + if (records.size() == 0) { + LOG.info("IndexFileManager.remove(source={}): All done!", this.source); + + compactFile(indexFile); + } + } + + public void appendToIndexFile(IndexRecord record) { + fileOperations.append(indexFile, SpoolUtils.getRecordForWriting(record)); + } + + public void updateIndex(IndexRecord record) { + fileOperations.update(indexFile, record.getId(), SpoolUtils.getRecordForWriting(record)); + } + + private void compactFile(File file) { + LOG.info("IndexFileManager.compactFile(source={}): compacting file {}", source, file.getAbsolutePath()); + + try { + fileOperations.compact(file); + } finally { + LOG.info("IndexFileManager.compactFile(source={}): done compacting file {}", source, file.getAbsolutePath()); + } + } + + private void appendToDoneFile(IndexRecord indexRecord) { + String json = SpoolUtils.getRecordForWriting(indexRecord); + + fileOperations.append(indexDoneFile, json); + + archiver.archive(indexRecord); + } + + @VisibleForTesting + IndexRecords loadRecords(File file) { + String[] items = fileOperations.load(file); + + return SpoolUtils.createRecords(items); + } + + @VisibleForTesting + File getDoneFile() { + return this.indexDoneFile; + } + + @VisibleForTesting + File getIndexFile() { + return this.indexFile; + } + + @VisibleForTesting + IndexRecord add(String path) { + IndexRecord record = new IndexRecord(path); + + appendToIndexFile(record); + + return record; + } + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java new file mode 100644 index 0000000..2947a21 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationException; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.spool.models.IndexRecord; +import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; +import java.util.List; + +public class Publisher implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(Publisher.class); + + private final SpoolConfiguration configuration; + private final IndexManagement indexManagement; + private final AbstractNotification notificationHandler; + private final String notificationHandlerName; + private final int retryDestinationMS; + private final int messageBatchSize; + private String source; + private boolean isDrain; + private boolean isDestDown; + + public Publisher(SpoolConfiguration configuration, IndexManagement indexManagement, AbstractNotification notificationHandler) { + this.configuration = configuration; + this.indexManagement = indexManagement; + this.notificationHandler = notificationHandler; + this.notificationHandlerName = notificationHandler.getClass().getSimpleName(); + this.retryDestinationMS = configuration.getRetryDestinationMS(); + this.messageBatchSize = configuration.getMessageBatchSize(); + } + + public void run() { + this.source = configuration.getSourceName(); + + LOG.info("Publisher.run(source={}): starting publisher {}", this.source, notificationHandlerName); + + try { + IndexRecord record = null; + + while (true) { + waitIfDestinationDown(); + + if (this.isDrain) { + break; + } + + record = fetchNext(record); + + if (record != null && processAndDispatch(record)) { + indexManagement.removeAsDone(record); + + record = null; + } else { + indexManagement.rolloverSpoolFileIfNeeded(); + } + } + } catch (InterruptedException e) { + LOG.error("Publisher.run(source={}): {}: Publisher: Shutdown might be in progress!", this.source, notificationHandlerName); + } catch (Exception e) { + LOG.error("Publisher.run(source={}): {}: Publisher: Exception in destination writing!", this.source, notificationHandlerName, e); + } + + LOG.info("Publisher.run(source={}): publisher {} exited!", this.source, notificationHandlerName); + } + + public void setDestinationDown() { + this.isDestDown = true; + + this.indexManagement.updateFailedAttempt(); + } + + public void setDrain() { + this.isDrain = true; + } + + public boolean isDestinationDown() { + return isDestDown; + } + + private void waitIfDestinationDown() throws InterruptedException { + if (isDestDown) { + LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items", + this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize()); + + Thread.sleep(retryDestinationMS); + } + + } + + private IndexRecord fetchNext(IndexRecord record) { + if (record == null) { + try { + record = indexManagement.next(); + } catch (Exception e) { + LOG.error("Publisher.fetchNext(source={}): failed!. publisher={}", this.source, notificationHandlerName, e); + } + } + + return record; + } + + @VisibleForTesting + boolean processAndDispatch(IndexRecord record) throws IOException { + boolean ret = true; + + if (SpoolUtils.fileExists(record)) { + FileLockedReadWrite fileLockedRead = new FileLockedReadWrite(source); + + try { + DataInput dataInput = fileLockedRead.getInput(new File(record.getPath())); + int lineInSpoolFile = 0; + List<String> messages = new ArrayList<>(); + + for (String message = dataInput.readLine(); message != null; message = dataInput.readLine()) { + lineInSpoolFile++; + + if (lineInSpoolFile < record.getLine()) { + continue; + } + + messages.add(message); + + if (messages.size() == messageBatchSize) { + dispatch(record, lineInSpoolFile, messages); + } + } + + dispatch(record, lineInSpoolFile, messages); + + LOG.info("Publisher.processAndDispatch(source={}): consumer={}: done reading file {}", this.source, notificationHandlerName, record.getPath()); + + ret = true; + } catch (OverlappingFileLockException ex) { + LOG.error("Publisher.processAndDispatch(source={}): consumer={}: some other process has locked this file {}", this.source, notificationHandlerName, record.getPath(), ex); + ret = false; + } catch (FileNotFoundException ex) { + LOG.error("Publisher.processAndDispatch(source={}): consumer={}: file not found {}", this.source, notificationHandlerName, record.getPath(), ex); + ret = true; + } catch (Exception ex) { + LOG.error("Publisher.processAndDispatch(source={}): consumer={}: failed for file {}", this.source, notificationHandlerName, record.getPath(), ex); + ret = false; + } finally { + fileLockedRead.close(); + } + } else { + LOG.error("Publisher.processAndDispatch(source={}): publisher={}: file '{}' not found!", this.source, notificationHandlerName, record.getPath()); + + ret = true; + } + + return ret; + } + + private void dispatch(IndexRecord record, int lineInSpoolFile, List<String> messages) throws Exception { + if (notificationHandler == null || messages == null || messages.size() == 0) { + LOG.error("Publisher.dispatch(source={}): consumer={}: error sending logs", this.source, notificationHandlerName); + } else { + dispatch(record.getPath(), messages); + + record.setCurrentLine(lineInSpoolFile); + indexManagement.update(record); + isDestDown = false; + } + } + + private void dispatch(String filePath, List<String> messages) throws Exception { + try { + notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages); + + if (isDestDown) { + LOG.info("Publisher.dispatch(source={}): consumer={}: destination is now up. file={}", this.source, notificationHandlerName, filePath); + } + } catch (Exception exception) { + setDestinationDown(); + + LOG.error("Publisher.dispatch(source={}): consumer={}: error while sending logs to consumer", this.source, notificationHandlerName, exception); + + throw new NotificationException(exception, String.format("%s: %s: Publisher: Destination down!", this.source, notificationHandlerName)); + } finally { + messages.clear(); + } + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java new file mode 100644 index 0000000..a9a3a78 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.commons.configuration.Configuration; + +import java.io.File; + +public class SpoolConfiguration { + private static final int PROP_RETRY_DESTINATION_MS_DEFAULT = 30000; // Default 30 seconds + private static final int PROP_FILE_ROLLOVER_SEC_DEFAULT = 60; // 60 secs + private static final int PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100; + private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT = "archive"; + private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT = "/tmp/spool"; + private static final int PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT = 100; + private static final String PROPERTY_PREFIX_SPOOL = "atlas.hook.spool."; + public static final String PROP_FILE_SPOOL_LOCAL_DIR = PROPERTY_PREFIX_SPOOL + "dir"; + private static final String PROP_FILE_SPOOL_ARCHIVE_DIR = PROPERTY_PREFIX_SPOOL + "archive.dir"; + private static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = PROPERTY_PREFIX_SPOOL + "archive.max.files"; + public static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC = PROPERTY_PREFIX_SPOOL + "file.rollover.sec"; + public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = PROPERTY_PREFIX_SPOOL + "destination.retry.ms"; + private static final String PROP_MESSAGE_BATCH_SIZE = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize"; + + private final String messageHandlerName; + private final int maxArchivedFilesCount; + private final int messageBatchSize; + private final int retryDestinationMS; + private final int fileRollOverSec; + private final int fileSpoolMaxFilesCount; + private final String spoolDirPath; + private final String archiveDir; + private String sourceName; + + public SpoolConfiguration(Configuration cfg, String messageHandlerName) { + this.messageHandlerName = messageHandlerName; + this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT); + this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT); + this.retryDestinationMS = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT); + this.fileRollOverSec = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000; + this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT); + this.spoolDirPath = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT); + this.archiveDir = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString()); + } + + public void setSource(String val) { + this.sourceName = val; + } + + public String getSourceName() { + return this.sourceName; + } + + public int getMaxArchiveFiles() { + return maxArchivedFilesCount; + } + + public int getRetryDestinationMS() { + return retryDestinationMS; + } + + public int getFileRolloverSec() { + return this.fileRollOverSec; + } + + public int getFileSpoolMaxFilesCount() { + return fileSpoolMaxFilesCount; + } + + public String getSpoolDirPath() { + return spoolDirPath; + } + + public File getSpoolDir() { + return new File(getSpoolDirPath()); + } + + public File getArchiveDir() { + return new File(this.archiveDir); + } + + public String getMessageHandlerName() { + return this.messageHandlerName; + } + + public int getMessageBatchSize() { + return messageBatchSize; + } + + public File getIndexFile() { + String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName()); + + return new File(getSpoolDir(), fileName); + } + + public File getIndexDoneFile() { + String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName()); + String fileDoneName = SpoolUtils.getIndexDoneFile(fileName); + + return new File(getSpoolDir(), fileDoneName); + } + + public File getIndexPublishFile() { + String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName()); + String fileDoneName = SpoolUtils.getIndexPublishFile(fileName); + + return new File(getSpoolDir(), fileDoneName); + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java new file mode 100644 index 0000000..abbe33d --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.atlas.notification.spool.models.IndexRecord; +import org.apache.atlas.notification.spool.models.IndexRecords; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.text.SimpleDateFormat; + +public class SpoolUtils { + private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class); + + public static final String DEFAULT_CHAR_SET = "UTF-8"; + private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator"); + private static final String FILE_EXT_JSON = ".json"; + public static final String FILE_EXT_LOG = ".log"; + private static final String SPOOL_FILE_NAME_FORMAT_PREFIX = "%s.%s%s"; + private static final String INDEX_FILE_CLOSED_SUFFIX = "_closed.json"; + private static final String INDEX_FILE_PUBLISH_SUFFIX = "_publish.json"; + private static final String INDEX_FILE_NAME_FORMAT = "index-%s-%s" + FILE_EXT_JSON; + private static final String SPOOL_FILE_NAME_FORMAT = "spool-%s-%s-%s" + FILE_EXT_LOG; + private static final String RECORD_EMPTY = StringUtils.leftPad(StringUtils.EMPTY, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator(); + + public static File getCreateFile(File file, String source) throws IOException { + if (createFileIfNotExists(file, source)) { + LOG.info("SpoolUtils.getCreateFile(source={}): file={}", source, file.getAbsolutePath()); + } + + return file; + } + + public static boolean createFileIfNotExists(File file, String source) throws IOException { + boolean ret = file.exists(); + + if (!ret) { + ret = file.createNewFile(); + + if (!ret) { + LOG.error("SpoolUtils.createFileIfNotExists(source={}): error creating file {}", source, file.getPath()); + + ret = false; + } + } + + return ret; + } + + public static File getCreateDirectory(File file) { + File ret = file; + + if (!file.isDirectory()) { + boolean result = file.mkdirs(); + + if (!file.isDirectory() || !result) { + LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString()); + + ret = null; + } + } + + return ret; + } + + public static PrintWriter createAppendPrintWriter(File filePath) throws UnsupportedEncodingException, FileNotFoundException { + return new PrintWriter( + new BufferedWriter( + new OutputStreamWriter( + new FileOutputStream(filePath, true), DEFAULT_CHAR_SET))); + } + + public static String getIndexFileName(String source, String handlerName) { + return String.format(SpoolUtils.INDEX_FILE_NAME_FORMAT, source, handlerName); + } + + public static String getIndexDoneFile(String filePath) { + return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_CLOSED_SUFFIX; + } + + public static String getIndexPublishFile(String filePath) { + return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_PUBLISH_SUFFIX; + } + + public static boolean fileExists(IndexRecord record) { + return record != null && new File(record.getPath()).exists(); + } + + static String getSpoolFileName(String source, String handlerName, String guid) { + return String.format(SPOOL_FILE_NAME_FORMAT, source, handlerName, guid); + } + + public static String getSpoolFilePath(SpoolConfiguration cfg, String spoolDir, String archiveFolder, String suffix) { + File ret = null; + String fileName = getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), suffix); + int lastDot = StringUtils.lastIndexOf(fileName, '.'); + String baseName = fileName.substring(0, lastDot); + String extension = fileName.substring(lastDot); + + for (int sequence = 1; true; sequence++) { + ret = new File(spoolDir, fileName); + + File archiveLogFile = new File(archiveFolder, fileName); + + if (!ret.exists() && !archiveLogFile.exists()) { + break; + } + + fileName = String.format(SPOOL_FILE_NAME_FORMAT_PREFIX, baseName, sequence, extension); + } + + return ret.getPath(); + } + + public static String getLineSeparator() { + return DEFAULT_LINE_SEPARATOR; + } + + public static String getRecordForWriting(IndexRecord record) { + String json = AtlasType.toJson(record); + + return StringUtils.rightPad(json, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator(); + } + + public static String getEmptyRecordForWriting() { + return RECORD_EMPTY; + } + + public static IndexRecords createRecords(String[] items) { + IndexRecords records = new IndexRecords(); + + if (items != null && items.length > 0) { + try { + for (String item : items) { + if (StringUtils.isNotBlank(item)) { + IndexRecord record = AtlasType.fromJson(item, IndexRecord.class); + + records.getRecords().put(record.getId(), record); + } + } + } catch (Exception ex) { + LOG.error("SpoolUtils.createRecords(): error loading records.", ex); + } + } + + return records; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java new file mode 100644 index 0000000..2cacaaa --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.hook.FailedMessagesLogger; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutput; +import java.io.PrintWriter; +import java.util.List; + +public class Spooler extends AbstractNotification { + private static final Logger LOG = LoggerFactory.getLogger(Spooler.class); + + private final SpoolConfiguration configuration; + private final IndexManagement indexManagement; + private FailedMessagesLogger failedMessagesLogger; + private boolean isDrain; + + public Spooler(SpoolConfiguration configuration, IndexManagement indexManagement) { + this.configuration = configuration; + this.indexManagement = indexManagement; + } + + public void setFailedMessagesLogger(FailedMessagesLogger failedMessagesLogger) { + this.failedMessagesLogger = failedMessagesLogger; + } + + public void setDrain() { + this.isDrain = true; + } + + @Override + public <T> List<NotificationConsumer<T>> createConsumers(org.apache.atlas.notification.NotificationInterface.NotificationType notificationType, int numConsumers) { + return null; + } + + @Override + public void sendInternal(NotificationType type, List<String> messages) { + boolean ret = write(messages); + + if (failedMessagesLogger != null && !ret) { + writeToFailedMessages(messages); + } + } + + @Override + public void close() { + } + + @VisibleForTesting + boolean write(List<String> messages) { + final boolean ret; + + try { + if (!getDrain()) { + indexManagement.setSpoolWriteInProgress(); + + ret = writeInternal(messages); + } else { + LOG.error("Spooler.write(source={}): called after stop is called! Write will not be performed!", configuration.getSourceName(), messages); + + ret = false; + } + } finally { + indexManagement.resetSpoolWriteInProgress(); + } + + return ret; + } + + private void writeToFailedMessages(List<String> messages) { + if (failedMessagesLogger != null) { + for (String message : messages) { + failedMessagesLogger.log(message); + } + } + } + + private boolean writeInternal(List<String> messages) { + boolean ret = false; + + try { + byte[] lineSeparatorBytes = SpoolUtils.getLineSeparator().getBytes(SpoolUtils.DEFAULT_CHAR_SET); + DataOutput pw = indexManagement.getSpoolWriter(); + + for (String message : messages) { + pw.write(message.getBytes(SpoolUtils.DEFAULT_CHAR_SET)); + pw.write(lineSeparatorBytes); + } + + indexManagement.flushSpoolWriter(); + + ret = true; + } catch (Exception exception) { + LOG.error("Spooler.writeInternal(source={}): error writing to file. messages={}", configuration.getSourceName(), messages, exception); + + ret = false; + } + + return ret; + } + + private boolean getDrain() { + return this.isDrain; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java new file mode 100644 index 0000000..21dad06 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.models; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.lang.StringUtils; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.UUID; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class IndexRecord implements Serializable { + public static final int RECORD_SIZE = 500; + public static final String STATUS_PENDING = "PENDING"; + public static final String STATUS_WRITE_IN_PROGRESS = "WRITE_IN_PROGRESS"; + public static final String STATUS_READ_IN_PROGRESS = "READ_IN_PROGRESS"; + public static final String STATUS_DONE = "DONE"; + + private String id; + private String path; + private int line; + private long created; + private long writeCompleted; + private long doneCompleted; + private long lastSuccess; + private long lastFailed; + private boolean lastAttempt; + private int failedAttempt; + private String status; + + public IndexRecord() { + this.status = STATUS_WRITE_IN_PROGRESS; + this.lastAttempt = false; + } + + public IndexRecord(String path) { + this.id = UUID.randomUUID().toString(); + this.path = path; + this.failedAttempt = 0; + this.status = STATUS_WRITE_IN_PROGRESS; + this.created = System.currentTimeMillis(); + + setLastAttempt(false); + } + + @Override + public String toString() { + return "IndexRecord [id=" + id + ", filePath=" + path + + ", linePosition=" + line + ", status=" + status + + ", fileCreateTime=" + created + + ", writeCompleteTime=" + writeCompleted + + ", doneCompleteTime=" + doneCompleted + + ", lastSuccessTime=" + lastSuccess + + ", lastFailedTime=" + lastFailed + + ", failedAttemptCount=" + failedAttempt + + ", lastAttempt=" + lastAttempt + "]"; + } + + public void setId(String id) { + this.id = id; + } + + public String getId() { + return this.id; + } + + public void setPath(String path) { + this.path = path; + } + + public String getPath() { + return this.path; + } + + public void setLine(int line) { + this.line = line; + } + + public int getLine() { + return line; + } + + public void setCreated(long fileCreateTime) { + this.created = fileCreateTime; + } + + public long getCreated() { + return this.created; + } + + public void setWriteCompleted(long writeCompleted) { + this.writeCompleted = writeCompleted; + } + + public long getWriteCompleted() { + return this.writeCompleted; + } + + public void setDoneCompleted(long doneCompleted) { + this.doneCompleted = doneCompleted; + } + + public long getDoneCompleted() { + return doneCompleted; + } + + public void setLastSuccess(long lastSuccess) { + this.lastSuccess = lastSuccess; + } + + public long getLastSuccess() { + return lastSuccess; + } + + public void setLastFailed(long lastFailed) { + this.lastFailed = lastFailed; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getStatus() { + return this.status; + } + + public void setLastAttempt(boolean lastAttempt) { + this.lastAttempt = lastAttempt; + } + + public void setFailedAttempt(int failedAttempt) { + this.failedAttempt = failedAttempt; + } + + public int getFailedAttempt() { + return failedAttempt; + } + + @JsonIgnore + public void setDone() { + setStatus(IndexRecord.STATUS_DONE); + setDoneCompleted(System.currentTimeMillis()); + setLastAttempt(true); + } + + @JsonIgnore + public void setStatusPending() { + setStatus(IndexRecord.STATUS_PENDING); + setWriteCompleted(System.currentTimeMillis()); + setLastAttempt(true); + } + + @JsonIgnore + public void updateFailedAttempt() { + setLastFailed(System.currentTimeMillis()); + incrementFailedAttemptCount(); + setLastAttempt(false); + } + + @JsonIgnore + public boolean equals(IndexRecord record) { + return this.id.equals(record.getId()); + } + + @JsonIgnore + public void setCurrentLine(int line) { + setLine(line); + setStatus(STATUS_READ_IN_PROGRESS); + setLastSuccess(System.currentTimeMillis()); + setLastAttempt(true); + } + + @JsonIgnore + public boolean isStatusDone() { + return this.status.equals(STATUS_DONE); + } + + @JsonIgnore + public boolean isStatusWriteInProgress() { + return this.status.equals(STATUS_WRITE_IN_PROGRESS); + } + + @JsonIgnore + public boolean isStatusReadInProgress() { + return status.equals(IndexRecord.STATUS_READ_IN_PROGRESS); + } + + @JsonIgnore + public void incrementFailedAttemptCount() { + this.failedAttempt++; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java new file mode 100644 index 0000000..abcb837 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.models; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class IndexRecords implements Serializable { + private LinkedHashMap<String, IndexRecord> records; + + public IndexRecords() { + this.records = new LinkedHashMap<>(); + } + + public Map<String, IndexRecord> getRecords() { + return records; + } + + public void setRecords(LinkedHashMap<String, IndexRecord> records) { + this.records = records; + } + + @JsonIgnore + public int size() { + LinkedHashMap<String, IndexRecord> records = this.records; + + return records != null ? records.size() : 0; + } + + @JsonIgnore + public void remove(IndexRecord record) { + LinkedHashMap<String, IndexRecord> records = this.records; + + if (records != null) { + records.remove(record.getId()); + } + } + + @JsonIgnore + public void add(IndexRecord record) { + LinkedHashMap<String, IndexRecord> records = this.records; + + if (records == null) { + records = new LinkedHashMap<>(); + + this.records = records; + } + + records.put(record.getId(), record); + } + + @JsonIgnore + public void delete(IndexRecord record) { + remove(record); + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java new file mode 100644 index 0000000..5d5ad8c --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import org.apache.commons.lang.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class FileLockedReadWrite extends FileOperation { + private RandomAccessFile raf; + private FileChannel channel; + private FileLock lock; + + public FileLockedReadWrite(String source) { + super(source); + } + + @Override + public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException { + this.raf = randomAccessFile; + this.channel = channel; + this.lock = channel.tryLock(); + + return lock; + } + + public DataInput getInput(File file) throws IOException { + return getRaf(file); + } + + public DataOutput getOutput(File file) throws IOException { + return getRaf(file); + } + + public void flush() throws IOException { + if (channel != null) { + channel.force(true); + } + } + + public void close() { + super.close(this.raf, this.channel, this.lock); + } + + private RandomAccessFile getRaf(File file) throws IOException { + RandomAccessFile raf = new RandomAccessFile(file, "rws"); + + run(raf, raf.getChannel(), StringUtils.EMPTY); + + return raf; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java new file mode 100644 index 0000000..bfc113d --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class FileOpAppend extends FileOperation { + + public FileOpAppend(String source) { + super(source); + } + + @Override + public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException { + FileLock lock = channel.tryLock(randomAccessFile.length(), json.length(), false); + + channel.position(randomAccessFile.length()); + + randomAccessFile.writeBytes(json); + + return lock; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java new file mode 100644 index 0000000..8240a80 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class FileOpCompaction extends FileOperation { + private final FileOpRead fileOpLoad; + + public FileOpCompaction(String source) { + super(source); + + this.fileOpLoad = new FileOpRead(source); + } + + @Override + public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException { + FileLock lock = file.getChannel().tryLock(); + + fileOpLoad.perform(getFile(), StringUtils.EMPTY); + + file.getChannel().truncate(0); + + String[] rawItems = fileOpLoad.getItems(); + + if (rawItems != null) { + for (String record : rawItems) { + if (StringUtils.isNotBlank(record)) { + file.writeBytes(record); + } + } + } + + return lock; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java new file mode 100644 index 0000000..19243a6 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class FileOpDelete extends FileOperation { + public FileOpDelete(String source) { + super(source); + } + + @Override + public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException { + final FileLock ret; + final long position = find(file, getId()); + + if (position < 0) { + ret = null; + } else { + ret = channel.tryLock(position, json.length(), false); + + channel.position(position); + + file.writeBytes(json); + } + + return ret; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java new file mode 100644 index 0000000..b228090 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import org.apache.atlas.notification.spool.SpoolUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class FileOpRead extends FileOperation { + private static final Logger LOG = LoggerFactory.getLogger(FileOpRead.class); + + private String[] items; + + public FileOpRead(String source) { + super(source); + } + + @Override + public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException { + items = null; + + byte[] bytes = new byte[(int) randomAccessFile.length()]; + + randomAccessFile.readFully(bytes); + + int rawRecords = 0; + String allRecords = new String(bytes); + + if (StringUtils.isNotEmpty(allRecords)) { + items = StringUtils.split(allRecords, SpoolUtils.getLineSeparator()); + + if (items != null) { + rawRecords = items.length; + } + } + + LOG.info("FileOpRead.run(source={}): loaded file {}, raw records={}", this.getSource(), this.getFile().getAbsolutePath(), rawRecords); + + return null; + } + + public String[] getItems() { + return items; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java new file mode 100644 index 0000000..cd58e86 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +public class FileOpUpdate extends FileOperation { + private String id; + private final FileOpAppend fileOpAppend; + + public FileOpUpdate(String source, FileOpAppend fileOpAppend) { + super(source); + + this.fileOpAppend = fileOpAppend; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException { + final FileLock ret; + final long position = find(file, getId()); + + if (position < 0) { + ret = fileOpAppend.run(file, channel, json); + } else { + ret = channel.tryLock(position, json.length(), false); + + channel.position(position); + + file.writeBytes(json); + } + + return ret; + } +} diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java new file mode 100644 index 0000000..e5bf9d2 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool.utils.local; + +import org.apache.atlas.notification.spool.SpoolUtils; +import org.apache.atlas.notification.spool.models.IndexRecord; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.RandomUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.util.concurrent.TimeUnit; + +public abstract class FileOperation { + private static final Logger LOG = LoggerFactory.getLogger(FileOperation.class); + + private static final int MAX_RETRY_ATTEMPTS = 5; + private static final String RANDOM_ACCESS_FILE_OPEN_MODE_RWS = "rws"; + private static final String RANDOM_ACCESS_FILE_OPEN_MODE_R = "r"; + + private final String source; + private File file; + private String id; + + public static RandomAccessFile createRandomAccessFileForRead(File file) throws FileNotFoundException { + return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_R); + } + + public static RandomAccessFile createRandomAccessFile(File file) throws FileNotFoundException { + return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_RWS); + } + + public static long find(RandomAccessFile raf, String id) throws IOException { + while (true) { + String line = raf.readLine(); + + if (StringUtils.isEmpty(line)) { + break; + } + + if (line.contains(id)) { + return raf.getChannel().position() - SpoolUtils.getLineSeparator().length() - IndexRecord.RECORD_SIZE; + } + } + + return -1; + } + + public FileOperation(String source) { + this(source, false); + } + + public FileOperation(String source, boolean notifyConcurrency) { + this.source = source; + } + + public String getSource() { + return source; + } + + public void setId(String id) { + this.id = id; + } + + public void perform(File file) { + perform(file, StringUtils.EMPTY); + } + + public void perform(File file, String json) { + setFile(file); + + performWithRetry(file, json); + } + + public void perform(File file, String id, String json) { + this.setId(id); + perform(file, json); + } + + public abstract FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException; + + + protected File getFile() { + return this.file; + } + + protected String getId() { + return this.id; + } + + protected void close(RandomAccessFile randomAccessFile, FileChannel channel, FileLock lock) { + try { + if (channel != null) { + channel.force(true); + } + + if (lock != null) { + lock.release(); + } + + if (channel != null) { + channel.close(); + } + + if (randomAccessFile != null) { + randomAccessFile.close(); + } + } catch (IOException exception) { + LOG.error("FileOperation(source={}).close(): failed", getSource(), exception); + } + } + + + private void setFile(File file) { + this.file = file; + } + + private void performWithRetry(File file, String json) { + for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) { + try { + performOperation(json); + return; + } catch (OverlappingFileLockException e) { + try { + int timeout = 1 + (50 * RandomUtils.nextInt(10)); + LOG.info("FileOperation.performWithRetry(source={}): {}: {}: Waiting: {} ms...", getSource(), getClass().getSimpleName(), file.getName(), timeout); + + TimeUnit.MILLISECONDS.sleep(timeout); + } catch (InterruptedException ex) { + LOG.error("FileOperation.performWithRetry(source={}): {}: Interrupted!", getSource(), file.getAbsolutePath(), ex); + } + + LOG.info("FileOperation.performWithRetry(source={}): {}: Re-trying: {}!", getSource(), file.getAbsolutePath(), i); + } + } + + LOG.info("FileOperation.performWithRetry(source={}): {}: appendRecord: Could not write.", getSource(), file.getAbsolutePath()); + } + + private boolean performOperation(String json) { + RandomAccessFile randomAccessFile = null; + FileChannel channel = null; + FileLock lock = null; + + try { + randomAccessFile = new RandomAccessFile(getFile(), "rws"); + channel = randomAccessFile.getChannel(); + lock = run(randomAccessFile, channel, json); + } catch (FileNotFoundException e) { + LOG.error("FileOperation.performOperation(source={}): file={}: file not found", getSource(), getFile().getAbsolutePath(), e); + } catch (IOException exception) { + LOG.error("FileOperation.performOperation(source={}): file={}: failed", getSource(), getFile().getAbsolutePath()); + } finally { + close(randomAccessFile, channel, lock); + } + + return true; + } +} diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index 94cb70d..d7e4959 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -112,7 +112,7 @@ public class AbstractNotificationTest { } @Override - protected void sendInternal(NotificationType notificationType, List<String> notificationMessages) + public void sendInternal(NotificationType notificationType, List<String> notificationMessages) throws NotificationException { type = notificationType; diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java new file mode 100644 index 0000000..167efbe --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationException; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.atlas.notification.NotificationInterface.NotificationType.HOOK; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class AtlasFileSpoolTest extends BaseTest { + private static int MAX_RECORDS = 50; + + private static class MessageHandlerSpy extends AbstractNotification { + + private List<String> publishedMessages = new ArrayList<>(); + + public List<String> getMessages() { + return publishedMessages; + } + + @Override + public void init(String source, Object failedMessagesLogger) { + } + + @Override + public void setCurrentUser(String user) { + + } + + @Override + public void sendInternal(NotificationType type, List<String> messages) throws NotificationException { + publishedMessages.addAll(messages); + + } + + @Override + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { + return null; + } + + @Override + public <T> void send(NotificationType type, T... messages) throws NotificationException { + } + + @Override + public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + } + + @Override + public void close() { + + } + } + + @Test + public void indexSetupMultipleTimes() throws IOException, AtlasException { + SpoolConfiguration cfg = getSpoolConfiguration(); + IndexManagement indexManagement = new IndexManagement(cfg); + + for (int i = 0; i < 2; i++) { + indexManagement.init(); + assertTrue(cfg.getSpoolDir().exists()); + assertTrue(cfg.getArchiveDir().exists()); + + File indexFile = indexManagement.getIndexFileManager().getIndexFile(); + File indexDoneFile = indexManagement.getIndexFileManager().getDoneFile(); + + assertTrue(indexFile.exists(), "File not created: " + indexFile.getAbsolutePath()); + assertTrue(indexDoneFile.exists(), "File not created: " + indexDoneFile.getAbsolutePath()); + } + } + + @Test + public void spoolerTest() throws IOException, AtlasException { + SpoolConfiguration cfg = getSpoolConfigurationTest(); + IndexManagement indexManagement = new IndexManagement(cfg); + + indexManagement.init(); + Spooler spooler = new Spooler(cfg, indexManagement); + for (int i = 0; i < MAX_RECORDS; i++) { + spooler.write(Collections.singletonList("message: " + i)); + } + + indexManagement.stop(); + } + + @Test(dependsOnMethods = "spoolerTest") + public void publisherTest() throws IOException, AtlasException, InterruptedException { + SpoolConfiguration cfg = getSpoolConfigurationTest(); + + IndexManagement indexManagement = new IndexManagement(cfg); + indexManagement.init(); + + MessageHandlerSpy messageHandler = new MessageHandlerSpy(); + Publisher publisher = new Publisher(cfg, indexManagement, messageHandler); + boolean ret = publisher.processAndDispatch(indexManagement.getIndexFileManager().getRecords().get(0)); + + publisher.setDrain(); + Assert.assertTrue(ret); + TimeUnit.SECONDS.sleep(5); + assertTrue(messageHandler.getMessages().size() >= 0); + } + + @Test + public void indexRecordsRead() throws IOException, AtlasException { + SpoolConfiguration spoolCfg = getSpoolConfigurationTest(); + IndexManagement indexManagement = new IndexManagement(spoolCfg); + indexManagement.init(); + + } + + @Test + public void concurrentWriteAndPublish() throws InterruptedException, IOException, AtlasException { + final int MAX_PROCESSES = 4; + SpoolConfiguration spoolCfg = getSpoolConfigurationTest(5); + + IndexManagement[] im1 = new IndexManagement[MAX_PROCESSES]; + MessageHandlerSpy[] messageHandlerSpy = new MessageHandlerSpy[MAX_PROCESSES]; + + for (int i = 0; i < MAX_PROCESSES; i++) { + messageHandlerSpy[i] = new MessageHandlerSpy(); + im1[i] = new IndexManagement(spoolCfg); + } + + for (int i = 0; i < MAX_PROCESSES; i++) { + im1[i].init(); + } + + IndexManagement imVerify = new IndexManagement(spoolCfg); + imVerify.init(); + Assert.assertTrue(imVerify.getIndexFileManager().getRecords().size() >= 0); + + Thread[] th1 = new Thread[MAX_PROCESSES]; + for (int i = 0; i < MAX_PROCESSES; i++) { + th1[i] = new Thread(new MessagePump(new Spooler(spoolCfg, im1[i]), new Publisher(spoolCfg, im1[i], messageHandlerSpy[i]))); + } + + for (int i = 0; i < MAX_PROCESSES; i++) { + th1[i].start(); + } + + for (int i = 0; i < MAX_PROCESSES; i++) { + th1[i].join(); + } + + imVerify = new IndexManagement(spoolCfg); + imVerify.init(); + Assert.assertEquals(imVerify.getIndexFileManager().getRecords().size(), 0); + for (int i = 0; i < MAX_PROCESSES; i++) { + Assert.assertTrue(messageHandlerSpy[i].getMessages().size() >= 0); + } + } + + private class MessagePump implements Runnable { + + private Spooler spooler; + private Publisher publisher; + private Thread publisherThread; + + public MessagePump(Spooler spooler, Publisher publisher) { + this.spooler = spooler; + this.publisher = publisher; + } + + @Override + public void run() { + publisherThread = new Thread(publisher); + publisherThread.start(); + + for (int i = 0; i < MAX_RECORDS; i++) { + try { + spooler.send(HOOK, String.format("%s-%s", "message", i)); + + Thread.sleep(RandomUtils.nextInt(10, 100)); + } catch (NotificationException exception) { + exception.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + try { + Thread.sleep(10000); + publisher.setDrain(); + publisherThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + + @AfterClass + public void tearDown() { + FileUtils.deleteQuietly(new File(spoolDirTest)); + } +} diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java new file mode 100644 index 0000000..7ca745f --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; + +public class BaseTest { + public static String spoolDir = System.getProperty("user.dir") + "/src/test/resources/spool"; + public static String spoolDirTest = spoolDir + "-test"; + protected final String SOURCE_TEST = "test-src"; + protected final String SOURCE_TEST_HANDLER = "1"; + + protected final String knownIndexFilePath = "index-test-src-1.json"; + protected final String knownIndexDoneFilePath = "index-test-src-1_closed.json"; + + protected File archiveDir = new File(spoolDir, "archive"); + protected File indexFile = new File(spoolDir, knownIndexFilePath); + protected File indexDoneFile = new File(spoolDir, knownIndexDoneFilePath); + + public SpoolConfiguration getSpoolConfiguration() { + return getSpoolConfiguration(spoolDir, SOURCE_TEST_HANDLER); + } + + public SpoolConfiguration getSpoolConfigurationTest() { + return getSpoolConfiguration(spoolDirTest, SOURCE_TEST_HANDLER); + } + public SpoolConfiguration getSpoolConfigurationTest(Integer testId) { + return getSpoolConfiguration(spoolDirTest, testId.toString()); + } + + public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) { + SpoolConfiguration cfg = new SpoolConfiguration(getConfiguration(spoolDir), handlerName); + cfg.setSource(SOURCE_TEST); + return cfg; + } + + public Configuration getConfiguration(String spoolDir) { + final int destinationRetry = 2000; + + PropertiesConfiguration props = new PropertiesConfiguration(); + props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, spoolDir); + props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_DEST_RETRY_MS, Integer.toString(destinationRetry)); + props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, Integer.toString(2)); + return props; + } + + protected File getNewIndexFile(char id) throws IOException { + File f = new File(spoolDirTest, knownIndexFilePath.replace('1', id)); + FileUtils.copyFile(indexFile, f); + return f; + } + + protected File getNewIndexDoneFile(char id) throws IOException { + File f = new File(spoolDirTest, knownIndexDoneFilePath.replace('1', id)); + FileUtils.copyFile(indexDoneFile, f); + return f; + } +} diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java new file mode 100644 index 0000000..f9d2a06 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.spool; + +import org.apache.atlas.notification.spool.models.IndexRecord; +import org.apache.atlas.notification.spool.models.IndexRecords; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class IndexManagementTest extends BaseTest { + @Test + public void fileNameGeneration() { + String handlerName = "someHandler"; + SpoolConfiguration cfg = getSpoolConfiguration(spoolDir, handlerName); + + IndexRecord record = new IndexRecord(StringUtils.EMPTY); + Assert.assertEquals(SpoolUtils.getIndexFileName(cfg.getSourceName(), cfg.getMessageHandlerName()), "index-test-src-someHandler.json"); + Assert.assertTrue(SpoolUtils.getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), record.getId()).startsWith("spool-test-src-someHandler-")); + } + + @Test + public void verifyLoad() throws IOException { + final int expectedRecords = 2; + SpoolConfiguration cfg = getSpoolConfiguration(); + + IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, cfg.getIndexFile(), cfg.getIndexDoneFile(), null, 2); + + Assert.assertEquals(indexFileManager.getRecords().size(), expectedRecords); + + Assert.assertEquals(indexFileManager.getRecords().get(0).getId(), "1"); + Assert.assertEquals(indexFileManager.getRecords().get(1).getId(), "2"); + } + + @Test + public void addAndRemove() throws IOException { + File newIndexFile = getNewIndexFile('3'); + File newIndexDoneFile = getNewIndexDoneFile('3'); + + IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2); + + int expectedCount = 2; + Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount); + + IndexRecord r3 = indexFileManager.add("3.log"); + IndexRecord r4 = indexFileManager.add("4.log"); + IndexRecord r5 = indexFileManager.add("5.log"); + + r4.updateFailedAttempt(); + indexFileManager.updateIndex(r4); + + r5.setLine(100); + indexFileManager.updateIndex(r5); + + IndexRecords records = indexFileManager.loadRecords(newIndexFile); + Assert.assertTrue(records.getRecords().containsKey(r3.getId())); + Assert.assertTrue(records.getRecords().containsKey(r4.getId())); + Assert.assertTrue(records.getRecords().containsKey(r5.getId())); + + Assert.assertEquals(records.getRecords().get(r3.getId()).getStatus(), r3.getStatus()); + Assert.assertEquals(records.getRecords().get(r4.getId()).getFailedAttempt(), r4.getFailedAttempt()); + Assert.assertEquals(records.getRecords().get(r5.getId()).getLine(), r5.getLine()); + + indexFileManager.remove(r3); + indexFileManager.remove(r4); + indexFileManager.remove(r5); + + Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount); + } + + @Test + public void verifyOperations() throws IOException { + SpoolConfiguration cfg = getSpoolConfigurationTest(); + + File newIndexFile = getNewIndexFile('2'); + File newIndexDoneFile = getNewIndexDoneFile('2'); + + File archiveDir = cfg.getArchiveDir(); + IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2); + + verifyAdding(indexFileManager); + verifySaveAndLoad(indexFileManager); + verifyRemove(indexFileManager); + verifyRecords(indexFileManager); + + checkDoneFile(newIndexDoneFile, archiveDir, 2, "5.log"); + + verifyArchiving(indexFileManager); + } + + private void verifyRecords(IndexManagement.IndexFileManager indexFileManager) { + List<IndexRecord> records = indexFileManager.getRecords(); + + Assert.assertEquals(records.size(), 5); + Assert.assertTrue(records.get(3).getPath().endsWith("3.log")); + Assert.assertEquals(records.get(3).getStatus(), IndexRecord.STATUS_WRITE_IN_PROGRESS); + Assert.assertEquals(records.get(2).getFailedAttempt(), 0); + Assert.assertEquals(records.get(1).getDoneCompleted(), 0); + Assert.assertEquals(records.get(0).getLine(), 0); + Assert.assertFalse(records.get(0).getLastSuccess() != 0); + } + + private void verifyAdding(IndexManagement.IndexFileManager indexFileManager) throws IOException { + addFile(indexFileManager, spoolDirTest, "2.log"); + addFile(indexFileManager, spoolDirTest, "3.log"); + addFile(indexFileManager, spoolDirTest, "4.log"); + addFile(indexFileManager, spoolDirTest, "5.log"); + } + + private void verifyArchiving(IndexManagement.IndexFileManager indexFileManager) { + indexFileManager.remove(indexFileManager.getRecords().get(1)); + indexFileManager.remove(indexFileManager.getRecords().get(1)); + indexFileManager.remove(indexFileManager.getRecords().get(1)); + indexFileManager.remove(indexFileManager.getRecords().get(1)); + + checkArchiveDir(archiveDir); + } + + private void verifyRemove(IndexManagement.IndexFileManager indexFileManager) throws IOException { + indexFileManager.remove(indexFileManager.getRecords().get(5)); + + boolean isPending = indexFileManager.getRecords().size() > 0; + Assert.assertTrue(isPending); + } + + private void verifySaveAndLoad(IndexManagement.IndexFileManager indexFileManager) throws IOException { + indexFileManager.getRecords().get(2).updateFailedAttempt(); + indexFileManager.getRecords().get(3).setDone(); + indexFileManager.getRecords().get(1).setDoneCompleted(333l); + indexFileManager.getRecords().get(0).setCurrentLine(999); + + Assert.assertEquals(indexFileManager.getRecords().size(), 6); + } + + private void checkArchiveDir(File archiveDir) { + Set<String> availableFiles = new HashSet<>(); + availableFiles.add(new File(archiveDir, "3.log").toString()); + availableFiles.add(new File(archiveDir, "4.log").toString()); + + if (!archiveDir.exists()) { + return; + } + + File[] files = archiveDir.listFiles(); + Assert.assertNotNull(files); + Assert.assertEquals(files.length, 1); + } + + private void addFile(IndexManagement.IndexFileManager indexFileManager, String dir, String fileName) throws IOException { + File file = new File(dir, fileName); + file.createNewFile(); + indexFileManager.add(file.toString()); + } + + private void checkDoneFile(File newIndexDoneFile, File archiveDir, int maxArchiveFiles, String expectedFilePath) throws IOException { + IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexDoneFile, newIndexDoneFile, null, maxArchiveFiles); + + Assert.assertEquals(indexFileManager.getRecords().size(), 2); + Assert.assertTrue(indexFileManager.getRecords().get(1).getPath().endsWith(expectedFilePath)); + } + + @AfterClass + public void tearDown() { + FileUtils.deleteQuietly(new File(spoolDirTest)); + } +} diff --git a/notification/src/test/resources/spool/archive/spool-1.json b/notification/src/test/resources/spool/archive/spool-1.json new file mode 100644 index 0000000..bcbcecf --- /dev/null +++ b/notification/src/test/resources/spool/archive/spool-1.json @@ -0,0 +1,3 @@ +message-0 +message-1 +message-2 \ No newline at end of file diff --git a/notification/src/test/resources/spool/index-test-src-1.json b/notification/src/test/resources/spool/index-test-src-1.json new file mode 100644 index 0000000..d4cee87 --- /dev/null +++ b/notification/src/test/resources/spool/index-test-src-1.json @@ -0,0 +1,2 @@ +{"id":"1","path":"0.log","line":0,"created":123,"writeCompleted":888,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"PENDING"} +{"id":"2","path":"1.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"WRITE_IN_PROGRESS"} diff --git a/notification/src/test/resources/spool/index-test-src-1_closed.json b/notification/src/test/resources/spool/index-test-src-1_closed.json new file mode 100644 index 0000000..1b8881e --- /dev/null +++ b/notification/src/test/resources/spool/index-test-src-1_closed.json @@ -0,0 +1 @@ +{"id":"x","path":"x.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"DONE"} diff --git a/pom.xml b/pom.xml index b924201..91fd593 100644 --- a/pom.xml +++ b/pom.xml @@ -697,6 +697,7 @@ <javax.servlet.version>3.1.0</javax.servlet.version> <guava.version>25.1-jre</guava.version> <antlr4.version>4.7</antlr4.version> + <log4j.version>2.8</log4j.version> <!-- Needed for hooks --> <aopalliance.version>1.0</aopalliance.version>