Commit de87bc50 by Ashutosh Mestry

ATLAS-3427: Atlas Hook Enhancements for improved resiliency.

parent 3f1cf185
@@ -164,6 +164,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public HiveHook() {
}
+public HiveHook(String name) {
+super(name);
+}
@Override
public void run(HookContext hookContext) throws Exception {
if (LOG.isDebugEnabled()) {
...
@@ -45,7 +45,7 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener {
public HiveMetastoreHookImpl(Configuration config) {
super(config);
-this.hiveHook = new HiveHook();
+this.hiveHook = new HiveHook(this.getClass().getSimpleName());
this.hook = new HiveMetastoreHook();
}
...
@@ -56,6 +56,18 @@
</dependency>
<dependency>
+<groupId>org.apache.logging.log4j</groupId>
+<artifactId>log4j-core</artifactId>
+<version>${log4j.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.logging.log4j</groupId>
+<artifactId>log4j-api</artifactId>
+<version>${log4j.version}</version>
+</dependency>
+<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
</dependency>
...
@@ -126,6 +126,7 @@ public abstract class AtlasHook {
try {
LOG.info("==> Shutdown of Atlas Hook");
+notificationInterface.close();
executor.shutdown();
executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
executor = null;
@@ -141,6 +142,15 @@
LOG.info("Created Atlas Hook");
}
+public AtlasHook() {
+notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger);
+}
+public AtlasHook(String name) {
+LOG.info("AtlasHook: Spool name passed from caller: {}", name);
+notificationInterface.init(name, failedMessagesLogger);
+}
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
...
@@ -19,16 +19,14 @@
package org.apache.atlas.hook;
-import org.apache.log4j.Appender;
+import org.apache.atlas.notification.LogConfigUtils;
import org.apache.log4j.DailyRollingFileAppender;
-import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import java.io.File;
import java.io.IOException;
-import java.util.Enumeration;
/**
* A logger wrapper that can be used to write messages that failed to be sent to a log file.
@@ -46,7 +44,7 @@ public class FailedMessagesLogger {
}
void init() {
-String rootLoggerDirectory = getRootLoggerDirectory();
+String rootLoggerDirectory = LogConfigUtils.getRootDir();
if (rootLoggerDirectory == null) {
return;
}
@@ -62,38 +60,7 @@ public class FailedMessagesLogger {
}
}
-/**
- * Get the root logger file location under which the failed log messages will be written.
- *
- * Since this class is used in Hooks which run within JVMs of other components like Hive,
- * we want to write the failed messages file under the same location as where logs from
- * the host component are saved. This method attempts to get such a location from the
- * root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender}
- *
- * @return directory under which host component's logs are stored.
- */
-private String getRootLoggerDirectory() {
-String rootLoggerDirectory = null;
-Logger rootLogger = Logger.getRootLogger();
-Enumeration allAppenders = rootLogger.getAllAppenders();
-if (allAppenders != null) {
-while (allAppenders.hasMoreElements()) {
-Appender appender = (Appender) allAppenders.nextElement();
-if (appender instanceof FileAppender) {
-FileAppender fileAppender = (FileAppender) appender;
-String rootLoggerFile = fileAppender.getFile();
-rootLoggerDirectory = rootLoggerFile != null ? new File(rootLoggerFile).getParent() : null;
-break;
-}
-}
-}
-return rootLoggerDirectory;
-}
-void log(String message) {
+public void log(String message) {
logger.error(message);
}
}
@@ -19,23 +19,59 @@ package org.apache.atlas.kafka;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.LogConfigUtils;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.spool.AtlasFileSpool;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
/**
* Provider class for Notification interfaces
*/
public class NotificationProvider {
-private static KafkaNotification kafka;
+private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
+private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled";
+private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir";
+private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false;
+private static NotificationInterface notificationProvider;
-public static KafkaNotification get() {
-if (kafka == null) {
+public static NotificationInterface get() {
+if (notificationProvider == null) {
try {
-Configuration applicationProperties = ApplicationProperties.get();
-kafka = new KafkaNotification(applicationProperties);
+Configuration conf = ApplicationProperties.get();
+KafkaNotification kafka = new KafkaNotification(conf);
+String spoolDir = getSpoolDir(conf);
+if (isSpoolingEnabled(conf) && StringUtils.isNotEmpty(spoolDir)) {
+LOG.info("Notification spooling is enabled: spool directory={}", spoolDir);
+conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir);
+notificationProvider = new AtlasFileSpool(conf, kafka);
+} else {
+LOG.info("Notification spooling is not enabled");
+notificationProvider = kafka;
+}
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
-return kafka;
+return notificationProvider;
}
+private static boolean isSpoolingEnabled(Configuration configuration) {
+return configuration.getBoolean(CONF_ATLAS_HOOK_SPOOL_ENABLED, CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT);
+}
+private static String getSpoolDir(Configuration configuration) {
+return configuration.getString(CONF_ATLAS_HOOK_SPOOL_DIR);
+}
}
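
For orientation, a minimal hook-side wiring sketch of the behaviour above, assuming spooling has been enabled via atlas.hook.spool.enabled and atlas.hook.spool.dir in the application properties. The class name, source name, and sample message are illustrative, not part of this change:

import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.notification.NotificationInterface;

import java.util.Collections;

public class SpoolWiringSketch {
    public static void main(String[] args) throws Exception {
        // When spooling is enabled and a spool directory is set, get() returns an
        // AtlasFileSpool wrapping KafkaNotification; otherwise KafkaNotification directly.
        NotificationInterface notifier = NotificationProvider.get();

        // The source name drives the spool/index file names; the second argument may be
        // a FailedMessagesLogger (AtlasFileSpool checks the type) or null.
        notifier.init("SampleHook", null);

        // Messages go to Kafka while the destination is reachable and are spooled to
        // disk for later replay when it is not.
        notifier.send(NotificationInterface.NotificationType.HOOK,
                Collections.singletonList("{\"sample\":\"message\"}"));
    }
}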
@@ -76,6 +76,10 @@ public abstract class AbstractNotification implements NotificationInterface {
protected AbstractNotification() {
}
+@Override
+public void init(String source, Object failedMessagesLogger) {
+}
// ----- NotificationInterface -------------------------------------------
@Override
@@ -108,7 +112,7 @@
*
* @throws NotificationException if an error occurs while sending
*/
-protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
+public abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
// ----- utility methods -------------------------------------------------
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.FileAppender;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.appender.RollingRandomAccessFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Enumeration;
public class LogConfigUtils {
private static final Logger LOG = LoggerFactory.getLogger(LogConfigUtils.class);
public static String getRootDir() {
String ret = getFileAppenderPath();
if (StringUtils.isEmpty(ret)) {
ret = getFileAppenderPathApproach2();
}
if (StringUtils.isNotEmpty(ret)) {
ret = StringUtils.substringBeforeLast(ret, File.separator);
} else {
ret = null;
}
LOG.info("getRootDir(): ret={}", ret);
return ret;
}
private static String getFileAppenderPath() {
String ret = StringUtils.EMPTY;
LoggerContext loggerContext = (LoggerContext) LogManager.getContext();
Configuration configuration = loggerContext.getConfiguration();
for (Appender appender : configuration.getAppenders().values()) {
if (appender instanceof RollingRandomAccessFileAppender) {
ret = ((RollingRandomAccessFileAppender) appender).getFileName();
break;
} else if (appender instanceof RollingFileAppender) {
ret = ((RollingFileAppender) appender).getFileName();
break;
} else if (appender instanceof FileAppender) {
ret = ((FileAppender) appender).getFileName();
break;
} else {
LOG.info("Could not infer log path from this appender: {}", appender.getClass().getName());
}
}
LOG.info("getFileAppenderPath(): ret={}", ret);
return ret;
}
private static String getFileAppenderPathApproach2() {
String ret = null;
try {
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
Enumeration allAppenders = rootLogger.getAllAppenders();
if (allAppenders != null) {
while (allAppenders.hasMoreElements()) {
Object appender = allAppenders.nextElement();
if (appender instanceof org.apache.log4j.FileAppender) {
org.apache.log4j.FileAppender fileAppender = (org.apache.log4j.FileAppender) appender;
ret = fileAppender.getFile();
break;
}
}
}
} catch (Exception e) {
LOG.error("getFileAppenderPathApproach2(): failed to get appender path", e);
}
LOG.info("getFileAppenderPathApproach2(): ret={}", ret);
return ret;
}
}
@@ -31,6 +31,10 @@ public class NotificationException extends AtlasException {
super(e);
}
+public NotificationException(Exception e, String errorMsg) {
+super(errorMsg, e);
+}
public NotificationException(Exception e, List<String> failedMessages) {
super(e);
this.failedMessages = failedMessages;
...
@@ -61,6 +61,14 @@ public interface NotificationInterface {
/**
*
+* @param source: Name of the source
+* @param failedMessagesLogger: Logger for failed messages
+*/
+void init(String source, Object failedMessagesLogger);
+/**
+*
* @param user Name of the user under which the processes is running
*/
void setCurrentUser(String user);
...
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
public class Archiver {
private static final Logger LOG = LoggerFactory.getLogger(Archiver.class);
private final String source;
private final File indexDoneFile;
private final File archiveFolder;
private final int maxArchiveFiles;
public Archiver(String source, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
this.source = source;
this.indexDoneFile = indexDoneFile;
this.archiveFolder = archiveFolder;
this.maxArchiveFiles = maxArchiveFiles;
}
public void archive(IndexRecord indexRecord) {
moveToArchiveDir(indexRecord);
removeOldFiles();
}
private void moveToArchiveDir(IndexRecord indexRecord) {
File spoolFile = null;
File archiveFile = null;
try {
spoolFile = new File(indexRecord.getPath());
archiveFile = new File(archiveFolder, spoolFile.getName());
LOG.info("{}: moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile);
FileUtils.moveFile(spoolFile, archiveFile);
} catch (FileNotFoundException excp) {
LOG.warn("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp);
} catch (IOException excp) {
LOG.error("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp);
}
}
private void removeOldFiles() {
try {
File[] logFiles = archiveFolder == null ? null : archiveFolder.listFiles(pathname -> StringUtils.endsWithIgnoreCase(pathname.getName(), SpoolUtils.FILE_EXT_LOG));
int filesToDelete = logFiles == null ? 0 : logFiles.length - maxArchiveFiles;
if (filesToDelete > 0) {
try (BufferedReader br = new BufferedReader(new FileReader(indexDoneFile))) {
int filesDeletedCount = 0;
for (String line = br.readLine(); line != null; line = br.readLine()) {
line = line.trim();
if (StringUtils.isEmpty(line)) {
continue;
}
try {
IndexRecord record = AtlasType.fromJson(line, IndexRecord.class);
File logFile = new File(record.getPath());
String fileName = logFile.getName();
File archiveFile = new File(archiveFolder, fileName);
if (!archiveFile.exists()) {
LOG.warn("archive file does not exist: {}", archiveFile);
continue;
}
LOG.info("Deleting archive file: {}", archiveFile);
boolean ret = archiveFile.delete();
if (!ret) {
LOG.error("{}: Error deleting archive file. File: {}", source, archiveFile);
} else {
filesDeletedCount++;
}
if (filesDeletedCount >= filesToDelete) {
break;
}
} catch (Exception excp) {
LOG.error("{}: Error deleting older archive file in index-record: {}", source, line, excp);
}
}
LOG.info("{}: Deleted: {} archived files", source, filesDeletedCount);
}
}
} catch (Exception exception) {
LOG.error("{}: Error deleting older files from archive folder. Folder: {}", source, archiveFolder, exception);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.atlas.AtlasException;
import org.apache.atlas.hook.FailedMessagesLogger;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
public class AtlasFileSpool implements NotificationInterface {
private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);
private final AbstractNotification notificationHandler;
private final SpoolConfiguration config;
private final IndexManagement indexManagement;
private final Spooler spooler;
private final Publisher publisher;
private Thread publisherThread;
private Boolean initDone = null;
public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) {
this.notificationHandler = notificationHandler;
this.config = new SpoolConfiguration(configuration, notificationHandler.getClass().getSimpleName());
this.indexManagement = new IndexManagement(config);
this.spooler = new Spooler(config, indexManagement);
this.publisher = new Publisher(config, indexManagement, notificationHandler);
}
@Override
public void init(String source, Object failedMessagesLogger) {
LOG.info("==> AtlasFileSpool.init(source={})", source);
if (!isInitDone()) {
try {
config.setSource(source);
LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
indexManagement.init();
if (failedMessagesLogger instanceof FailedMessagesLogger) {
this.spooler.setFailedMessagesLogger((FailedMessagesLogger) failedMessagesLogger);
}
startPublisher();
initDone = true;
} catch (AtlasException exception) {
LOG.error("AtlasFileSpool(source={}): initialization failed", this.config.getSourceName(), exception);
initDone = false;
} catch (Throwable t) {
LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
}
} else {
LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
}
LOG.info("<== AtlasFileSpool.init(source={})", source);
}
@Override
public void setCurrentUser(String user) {
this.notificationHandler.setCurrentUser(user);
}
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
LOG.warn("AtlasFileSpool.createConsumers(): not implemented");
return null;
}
@Override
public <T> void send(NotificationType type, T... messages) throws NotificationException {
send(type, Arrays.asList(messages));
}
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
if (LOG.isDebugEnabled()) {
LOG.debug("AtlasFileSpool.send(): sending to spooler");
}
spooler.send(type, messages);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
}
try {
notificationHandler.send(type, messages);
} catch (Exception e) {
if (isInitDone()) {
LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
publisher.setDestinationDown();
spooler.send(type, messages);
} else {
LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e);
throw e;
}
}
}
}
@Override
public void close() {
try {
spooler.setDrain();
publisher.setDrain();
indexManagement.stop();
if (publisherThread != null) {
publisherThread.join();
}
} catch (InterruptedException e) {
LOG.error("Interrupted! source={}", this.config.getSourceName(), e);
}
}
private void startPublisher() {
publisherThread = new Thread(publisher);
publisherThread.setDaemon(true);
publisherThread.setContextClassLoader(this.getClass().getClassLoader());
publisherThread.start();
LOG.info("{}: publisher started!", this.config.getSourceName());
}
private boolean isInitDone() {
return this.initDone != null;
}
private boolean hasInitSucceeded() {
return this.initDone != null && this.initDone == true;
}
}
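
AtlasFileSpool delegates to any AbstractNotification, not only KafkaNotification. A minimal sketch with a hypothetical ConsoleNotification handler; the handler, its main method, and the source name are illustrative, not part of this change:

import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.spool.AtlasFileSpool;

import java.util.List;

public class ConsoleNotification extends AbstractNotification {
    @Override
    public void sendInternal(NotificationType type, List<String> messages) {
        messages.forEach(System.out::println); // stand-in for a real destination
    }

    @Override
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType type, int numConsumers) {
        return null; // hook-side handlers do not consume
    }

    @Override
    public void close() {
    }

    public static void main(String[] args) throws Exception {
        // Wrap the hypothetical handler with the file spool, same as NotificationProvider does for Kafka.
        NotificationInterface spooled = new AtlasFileSpool(ApplicationProperties.get(), new ConsoleNotification());
        spooled.init("SampleSource", null);
    }
}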
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.atlas.notification.spool.utils.local.FileOpAppend;
import org.apache.atlas.notification.spool.utils.local.FileOpCompaction;
import org.apache.atlas.notification.spool.utils.local.FileOpDelete;
import org.apache.atlas.notification.spool.utils.local.FileOpRead;
import org.apache.atlas.notification.spool.utils.local.FileOpUpdate;
import java.io.File;
public class FileOperations {
private final String emptyRecordJson;
private final FileOpAppend fileOpAppend;
private final FileOpRead fileOpLoad;
private final FileOpUpdate fileOpUpdate;
private final FileOpCompaction fileOpCompaction;
private final FileOpDelete fileOpDelete;
public FileOperations(String emptyRecordJson, String source) {
this.emptyRecordJson = emptyRecordJson;
this.fileOpAppend = new FileOpAppend(source);
this.fileOpLoad = new FileOpRead(source);
this.fileOpUpdate = new FileOpUpdate(source, fileOpAppend);
this.fileOpCompaction = new FileOpCompaction(source);
this.fileOpDelete = new FileOpDelete(source);
}
public String[] load(File file) {
fileOpLoad.perform(file);
return fileOpLoad.getItems();
}
public void delete(File file, String id) {
fileOpDelete.perform(file, id, emptyRecordJson);
}
public void append(File file, String json) {
fileOpAppend.perform(file, json);
}
public void compact(File file) {
fileOpCompaction.perform(file);
}
public void update(File file, String id, String json) {
fileOpUpdate.setId(id);
fileOpUpdate.perform(file, json);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.models.IndexRecords;
import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class IndexManagement {
private static final Logger LOG = LoggerFactory.getLogger(IndexManagement.class);
private static final int MAX_RETRY_ATTEMPTS = 3;
private final SpoolConfiguration config;
private IndexFileManager indexFileManager;
private IndexReader indexReader;
private IndexWriter indexWriter;
public IndexManagement(SpoolConfiguration config) {
this.config = config;
}
public void init() throws IOException, AtlasException {
String sourceName = config.getSourceName();
File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir());
if (spoolDir == null) {
throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, config.getSpoolDir()));
}
File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir());
if (archiveDir == null) {
throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, config.getArchiveDir()));
}
File indexFile = SpoolUtils.getCreateFile(config.getIndexFile(), sourceName);
if (indexFile == null) {
throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, config.getIndexFile()));
}
File indexDoneFile = SpoolUtils.getCreateFile(config.getIndexDoneFile(), sourceName);
if (indexDoneFile == null) {
throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, config.getIndexDoneFile()));
}
performInit(indexFile.getAbsolutePath(), sourceName);
}
@VisibleForTesting
void performInit(String indexFilePath, String source) {
try {
File spoolDir = config.getSpoolDir();
File archiveDir = config.getArchiveDir();
File indexFile = config.getIndexFile();
File indexDoneFile = config.getIndexDoneFile();
indexFileManager = new IndexFileManager(source, indexFile, indexDoneFile, archiveDir, config.getMaxArchiveFiles());
indexReader = new IndexReader(source, indexFileManager, config.getRetryDestinationMS());
indexWriter = new IndexWriter(source, config, indexFileManager, indexReader, spoolDir, archiveDir, config.getFileRolloverSec());
} catch (Exception e) {
LOG.error("{}: init: Failed! Error loading records from index file: {}", config.getSourceName(), indexFilePath);
}
}
public boolean isPending() {
return !indexReader.isEmpty() ||
(indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0);
}
public synchronized DataOutput getSpoolWriter() throws IOException {
return indexWriter.getCreateWriter();
}
public void setSpoolWriteInProgress() {
this.indexWriter.setFileWriteInProgress(true);
}
public void resetSpoolWriteInProgress() {
this.indexWriter.setFileWriteInProgress(false);
}
public void updateFailedAttempt() {
this.indexReader.updateFailedAttempt();
}
public IndexRecord next() throws InterruptedException {
return indexReader.next();
}
public int getQueueSize() {
return indexReader.size();
}
public void removeAsDone(IndexRecord indexRecord) {
this.indexReader.removeAsDone(indexRecord);
this.indexWriter.rolloverIfNeeded();
}
public void stop() {
indexWriter.stop();
}
public void rolloverSpoolFileIfNeeded() {
this.indexWriter.rolloverIfNeeded();
}
@VisibleForTesting
IndexFileManager getIndexFileManager() {
return this.indexFileManager;
}
public void update(IndexRecord record) {
this.indexFileManager.updateIndex(record);
}
public void flushSpoolWriter() throws IOException {
this.indexWriter.flushCurrent();
}
static class IndexWriter {
private final String source;
private final SpoolConfiguration config;
private final File spoolFolder;
private final File archiveFolder;
private final int rollOverTimeout;
private final IndexFileManager indexFileManager;
private final IndexReader indexReader;
private final FileLockedReadWrite fileLockedReadWrite;
private IndexRecord currentIndexRecord;
private DataOutput currentWriter;
private boolean fileWriteInProgress;
public IndexWriter(String source, SpoolConfiguration config, IndexFileManager indexFileManager,
IndexReader indexReader,
File spoolFolder, File archiveFolder, int rollOverTimeout) {
this.source = source;
this.config = config;
this.indexFileManager = indexFileManager;
this.indexReader = indexReader;
this.spoolFolder = spoolFolder;
this.archiveFolder = archiveFolder;
this.rollOverTimeout = rollOverTimeout;
this.fileLockedReadWrite = new FileLockedReadWrite(source);
setCurrent(indexFileManager.getFirstWriteInProgressRecord());
}
public void setCurrent(IndexRecord indexRecord) {
this.currentIndexRecord = indexRecord;
}
public IndexRecord getCurrent() {
return this.currentIndexRecord;
}
private void setCurrentWriter(File file) throws IOException {
this.currentWriter = fileLockedReadWrite.getOutput(file);
}
public synchronized DataOutput getWriter() {
return this.currentWriter;
}
public synchronized DataOutput getCreateWriter() throws IOException {
rolloverIfNeeded();
if (getCurrent() == null) {
IndexRecord record = new IndexRecord(StringUtils.EMPTY);
String filePath = SpoolUtils.getSpoolFilePath(config, spoolFolder.toString(), archiveFolder.toString(), record.getId());
record.setPath(filePath);
indexFileManager.appendToIndexFile(record);
setCurrent(record);
LOG.info("IndexWriter.getCreateWriter(source={}): Creating new spool file. File: {}", this.source, filePath);
setCurrentWriter(new File(filePath));
} else {
if (this.currentWriter == null) {
LOG.info("IndexWriter.getCreateWriter(source={}): Opening existing file for append: File: {}", this.source, currentIndexRecord.getPath());
setCurrentWriter(new File(currentIndexRecord.getPath()));
}
}
return currentWriter;
}
public synchronized void rolloverIfNeeded() {
if (currentWriter != null && shouldRolloverSpoolFile()) {
LOG.info("IndexWriter.rolloverIfNeeded(source={}): Rolling over. Closing File: {}", this.config.getSourceName(), currentIndexRecord.getPath());
fileLockedReadWrite.close();
currentWriter = null;
currentIndexRecord.setStatusPending();
indexFileManager.updateIndex(currentIndexRecord);
LOG.info("IndexWriter.rolloverIfNeeded(source={}): Adding file to queue. File: {}", this.config.getSourceName(), currentIndexRecord.getPath());
indexReader.addToPublishQueue(currentIndexRecord);
currentIndexRecord = null;
}
}
private boolean shouldRolloverSpoolFile() {
return currentIndexRecord != null &&
(System.currentTimeMillis() - currentIndexRecord.getCreated() > this.rollOverTimeout);
}
void flushCurrent() throws IOException {
DataOutput pw = getWriter();
if (pw != null) {
fileLockedReadWrite.flush();
}
}
public void setFileWriteInProgress(boolean val) {
this.fileWriteInProgress = val;
}
public boolean isWriteInProgress() {
return this.fileWriteInProgress;
}
public void stop() {
LOG.info("==> IndexWriter.stop(source={})", this.config.getSourceName());
try {
DataOutput out = getWriter();
if (out != null) {
flushCurrent();
for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
if (isWriteInProgress()) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
LOG.error("IndexWriter.stop(source={}): Interrupted!", this.config.getSourceName(), e);
break;
}
continue;
}
LOG.info("IndexWriter.stop(source={}): Closing open file.", this.config.getSourceName());
fileLockedReadWrite.close();
currentIndexRecord.setStatusPending();
indexFileManager.updateIndex(currentIndexRecord);
break;
}
}
} catch (FileNotFoundException e) {
LOG.error("IndexWriter.stop(source={}): File not found! {}", this.config.getSourceName(), getCurrent().getPath(), e);
} catch (IOException exception) {
LOG.error("IndexWriter.stop(source={}): Error accessing file: {}", this.config.getSourceName(), getCurrent().getPath(), exception);
} catch (Exception exception) {
LOG.error("IndexWriter.stop(source={}): Error closing spool file.", this.config.getSourceName(), exception);
}
LOG.info("<== IndexWriter.stop(source={})", this.config.getSourceName());
}
}
static class IndexReader {
private final String source;
private final BlockingQueue<IndexRecord> blockingQueue;
private final IndexFileManager indexFileManager;
private final long retryDestinationMS;
private IndexRecord currentIndexRecord;
public IndexReader(String source, IndexFileManager indexFileManager, long retryDestinationMS) {
this.source = source;
this.blockingQueue = new LinkedBlockingQueue<>();
this.retryDestinationMS = retryDestinationMS;
this.indexFileManager = indexFileManager;
List<IndexRecord> records = indexFileManager.getRecords();
records.stream().forEach(x -> addIfStatus(x, IndexRecord.STATUS_READ_IN_PROGRESS));
records.stream().forEach(x -> addIfStatus(x, IndexRecord.STATUS_PENDING));
}
private void addIfStatus(IndexRecord record, String status) {
if (record != null && record.getStatus().equals(status)) {
if (!SpoolUtils.fileExists(record)) {
LOG.error("IndexReader.addIfStatus(source={}): file {} not found!", this.source, record.getPath());
} else {
addToPublishQueue(record);
}
}
}
public void addToPublishQueue(IndexRecord record) {
try {
if (!blockingQueue.contains(record)) {
blockingQueue.add(record);
}
} catch (OverlappingFileLockException lockException) {
LOG.warn("{}: {}: Someone else has locked the file.", source, record.getPath());
}
}
public IndexRecord next() throws InterruptedException {
this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
return this.currentIndexRecord;
}
public int size() {
return blockingQueue.size();
}
public boolean isEmpty() {
return blockingQueue.isEmpty();
}
public void updateFailedAttempt() {
if (currentIndexRecord != null) {
currentIndexRecord.updateFailedAttempt();
indexFileManager.updateIndex(currentIndexRecord);
}
}
public void removeAsDone(IndexRecord indexRecord) {
indexRecord.setDone();
indexFileManager.remove(indexRecord);
}
}
static class IndexFileManager {
private final String source;
private final File indexDoneFile;
private final File indexFile;
private final Archiver archiver;
private final FileOperations fileOperations;
public IndexFileManager(String source, File indexFile, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
this.source = source;
this.indexFile = indexFile;
this.indexDoneFile = indexDoneFile;
this.archiver = new Archiver(source, indexDoneFile, archiveFolder, maxArchiveFiles);
this.fileOperations = new FileOperations(SpoolUtils.getEmptyRecordForWriting(), source);
}
public List<IndexRecord> getRecords() {
return new ArrayList<>(loadRecords(indexFile).getRecords().values());
}
public synchronized void delete(File file, String id) {
fileOperations.delete(file, id);
}
public synchronized IndexRecord getFirstWriteInProgressRecord() {
IndexRecord ret = null;
IndexRecords records = loadRecords(indexFile);
if (records != null) {
for (IndexRecord record : records.getRecords().values()) {
if (record.isStatusWriteInProgress()) {
LOG.info("IndexFileManager.getFirstWriteInProgressRecord(source={}): current file={}", this.source, record.getPath());
ret = record;
break;
}
}
}
return ret;
}
public synchronized void remove(IndexRecord record) {
delete(indexFile, record.getId());
appendToDoneFile(record);
IndexRecords records = loadRecords(indexFile);
if (records.size() == 0) {
LOG.info("IndexFileManager.remove(source={}): All done!", this.source);
compactFile(indexFile);
}
}
public void appendToIndexFile(IndexRecord record) {
fileOperations.append(indexFile, SpoolUtils.getRecordForWriting(record));
}
public void updateIndex(IndexRecord record) {
fileOperations.update(indexFile, record.getId(), SpoolUtils.getRecordForWriting(record));
}
private void compactFile(File file) {
LOG.info("IndexFileManager.compactFile(source={}): compacting file {}", source, file.getAbsolutePath());
try {
fileOperations.compact(file);
} finally {
LOG.info("IndexFileManager.compactFile(source={}): done compacting file {}", source, file.getAbsolutePath());
}
}
private void appendToDoneFile(IndexRecord indexRecord) {
String json = SpoolUtils.getRecordForWriting(indexRecord);
fileOperations.append(indexDoneFile, json);
archiver.archive(indexRecord);
}
@VisibleForTesting
IndexRecords loadRecords(File file) {
String[] items = fileOperations.load(file);
return SpoolUtils.createRecords(items);
}
@VisibleForTesting
File getDoneFile() {
return this.indexDoneFile;
}
@VisibleForTesting
File getIndexFile() {
return this.indexFile;
}
@VisibleForTesting
IndexRecord add(String path) {
IndexRecord record = new IndexRecord(path);
appendToIndexFile(record);
return record;
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.List;
public class Publisher implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
private final SpoolConfiguration configuration;
private final IndexManagement indexManagement;
private final AbstractNotification notificationHandler;
private final String notificationHandlerName;
private final int retryDestinationMS;
private final int messageBatchSize;
private String source;
private boolean isDrain;
private boolean isDestDown;
public Publisher(SpoolConfiguration configuration, IndexManagement indexManagement, AbstractNotification notificationHandler) {
this.configuration = configuration;
this.indexManagement = indexManagement;
this.notificationHandler = notificationHandler;
this.notificationHandlerName = notificationHandler.getClass().getSimpleName();
this.retryDestinationMS = configuration.getRetryDestinationMS();
this.messageBatchSize = configuration.getMessageBatchSize();
}
public void run() {
this.source = configuration.getSourceName();
LOG.info("Publisher.run(source={}): starting publisher {}", this.source, notificationHandlerName);
try {
IndexRecord record = null;
while (true) {
waitIfDestinationDown();
if (this.isDrain) {
break;
}
record = fetchNext(record);
if (record != null && processAndDispatch(record)) {
indexManagement.removeAsDone(record);
record = null;
} else {
indexManagement.rolloverSpoolFileIfNeeded();
}
}
} catch (InterruptedException e) {
LOG.error("Publisher.run(source={}): {}: Publisher: Shutdown might be in progress!", this.source, notificationHandlerName);
} catch (Exception e) {
LOG.error("Publisher.run(source={}): {}: Publisher: Exception in destination writing!", this.source, notificationHandlerName, e);
}
LOG.info("Publisher.run(source={}): publisher {} exited!", this.source, notificationHandlerName);
}
public void setDestinationDown() {
this.isDestDown = true;
this.indexManagement.updateFailedAttempt();
}
public void setDrain() {
this.isDrain = true;
}
public boolean isDestinationDown() {
return isDestDown;
}
private void waitIfDestinationDown() throws InterruptedException {
if (isDestDown) {
LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items",
this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
Thread.sleep(retryDestinationMS);
}
}
private IndexRecord fetchNext(IndexRecord record) {
if (record == null) {
try {
record = indexManagement.next();
} catch (Exception e) {
LOG.error("Publisher.fetchNext(source={}): failed!. publisher={}", this.source, notificationHandlerName, e);
}
}
return record;
}
@VisibleForTesting
boolean processAndDispatch(IndexRecord record) throws IOException {
boolean ret = true;
if (SpoolUtils.fileExists(record)) {
FileLockedReadWrite fileLockedRead = new FileLockedReadWrite(source);
try {
DataInput dataInput = fileLockedRead.getInput(new File(record.getPath()));
int lineInSpoolFile = 0;
List<String> messages = new ArrayList<>();
for (String message = dataInput.readLine(); message != null; message = dataInput.readLine()) {
lineInSpoolFile++;
if (lineInSpoolFile < record.getLine()) {
continue;
}
messages.add(message);
if (messages.size() == messageBatchSize) {
dispatch(record, lineInSpoolFile, messages);
}
}
dispatch(record, lineInSpoolFile, messages);
LOG.info("Publisher.processAndDispatch(source={}): consumer={}: done reading file {}", this.source, notificationHandlerName, record.getPath());
ret = true;
} catch (OverlappingFileLockException ex) {
LOG.error("Publisher.processAndDispatch(source={}): consumer={}: some other process has locked this file {}", this.source, notificationHandlerName, record.getPath(), ex);
ret = false;
} catch (FileNotFoundException ex) {
LOG.error("Publisher.processAndDispatch(source={}): consumer={}: file not found {}", this.source, notificationHandlerName, record.getPath(), ex);
ret = true;
} catch (Exception ex) {
LOG.error("Publisher.processAndDispatch(source={}): consumer={}: failed for file {}", this.source, notificationHandlerName, record.getPath(), ex);
ret = false;
} finally {
fileLockedRead.close();
}
} else {
LOG.error("Publisher.processAndDispatch(source={}): publisher={}: file '{}' not found!", this.source, notificationHandlerName, record.getPath());
ret = true;
}
return ret;
}
private void dispatch(IndexRecord record, int lineInSpoolFile, List<String> messages) throws Exception {
if (notificationHandler == null || messages == null || messages.size() == 0) {
LOG.error("Publisher.dispatch(source={}): consumer={}: error sending logs", this.source, notificationHandlerName);
} else {
dispatch(record.getPath(), messages);
record.setCurrentLine(lineInSpoolFile);
indexManagement.update(record);
isDestDown = false;
}
}
private void dispatch(String filePath, List<String> messages) throws Exception {
try {
notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);
if (isDestDown) {
LOG.info("Publisher.dispatch(source={}): consumer={}: destination is now up. file={}", this.source, notificationHandlerName, filePath);
}
} catch (Exception exception) {
setDestinationDown();
LOG.error("Publisher.dispatch(source={}): consumer={}: error while sending logs to consumer", this.source, notificationHandlerName, exception);
throw new NotificationException(exception, String.format("%s: %s: Publisher: Destination down!", this.source, notificationHandlerName));
} finally {
messages.clear();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.commons.configuration.Configuration;
import java.io.File;
public class SpoolConfiguration {
private static final int PROP_RETRY_DESTINATION_MS_DEFAULT = 30000; // Default 30 seconds
private static final int PROP_FILE_ROLLOVER_SEC_DEFAULT = 60; // 60 secs
private static final int PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100;
private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT = "archive";
private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT = "/tmp/spool";
private static final int PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT = 100;
private static final String PROPERTY_PREFIX_SPOOL = "atlas.hook.spool.";
public static final String PROP_FILE_SPOOL_LOCAL_DIR = PROPERTY_PREFIX_SPOOL + "dir";
private static final String PROP_FILE_SPOOL_ARCHIVE_DIR = PROPERTY_PREFIX_SPOOL + "archive.dir";
private static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = PROPERTY_PREFIX_SPOOL + "archive.max.files";
public static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC = PROPERTY_PREFIX_SPOOL + "file.rollover.sec";
public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = PROPERTY_PREFIX_SPOOL + "destination.retry.ms";
private static final String PROP_MESSAGE_BATCH_SIZE = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize";
private final String messageHandlerName;
private final int maxArchivedFilesCount;
private final int messageBatchSize;
private final int retryDestinationMS;
private final int fileRollOverSec;
private final int fileSpoolMaxFilesCount;
private final String spoolDirPath;
private final String archiveDir;
private String sourceName;
public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
this.messageHandlerName = messageHandlerName;
this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
this.retryDestinationMS = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT);
this.fileRollOverSec = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000;
this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
this.spoolDirPath = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT);
this.archiveDir = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
}
public void setSource(String val) {
this.sourceName = val;
}
public String getSourceName() {
return this.sourceName;
}
public int getMaxArchiveFiles() {
return maxArchivedFilesCount;
}
public int getRetryDestinationMS() {
return retryDestinationMS;
}
public int getFileRolloverSec() {
return this.fileRollOverSec;
}
public int getFileSpoolMaxFilesCount() {
return fileSpoolMaxFilesCount;
}
public String getSpoolDirPath() {
return spoolDirPath;
}
public File getSpoolDir() {
return new File(getSpoolDirPath());
}
public File getArchiveDir() {
return new File(this.archiveDir);
}
public String getMessageHandlerName() {
return this.messageHandlerName;
}
public int getMessageBatchSize() {
return messageBatchSize;
}
public File getIndexFile() {
String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
return new File(getSpoolDir(), fileName);
}
public File getIndexDoneFile() {
String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
String fileDoneName = SpoolUtils.getIndexDoneFile(fileName);
return new File(getSpoolDir(), fileDoneName);
}
public File getIndexPublishFile() {
String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
String fileDoneName = SpoolUtils.getIndexPublishFile(fileName);
return new File(getSpoolDir(), fileDoneName);
}
}
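
For reference, a sketch that reads the same spool settings and defaults defined above; the property names and default values are taken from this class and from NotificationProvider, while the sketch class itself is illustrative:

import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;

public class SpoolSettingsSketch {
    public static void main(String[] args) throws Exception {
        Configuration cfg = ApplicationProperties.get();

        // Values shown are the defaults used by SpoolConfiguration.
        String spoolDir    = cfg.getString("atlas.hook.spool.dir", "/tmp/spool");
        int    rolloverSec = cfg.getInt("atlas.hook.spool.file.rollover.sec", 60);
        int    retryMs     = cfg.getInt("atlas.hook.spool.destination.retry.ms", 30000);
        int    batchSize   = cfg.getInt("atlas.hook.spool.destination.message.batchsize", 100);
        int    maxArchived = cfg.getInt("atlas.hook.spool.archive.max.files", 100);

        // The enable flag itself is read by NotificationProvider, not by SpoolConfiguration.
        boolean enabled = cfg.getBoolean("atlas.hook.spool.enabled", false);

        System.out.printf("enabled=%b spoolDir=%s rolloverSec=%d retryMs=%d batchSize=%d maxArchived=%d%n",
                enabled, spoolDir, rolloverSec, retryMs, batchSize, maxArchived);
    }
}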
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.models.IndexRecords;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
public class SpoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class);
public static final String DEFAULT_CHAR_SET = "UTF-8";
private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
private static final String FILE_EXT_JSON = ".json";
public static final String FILE_EXT_LOG = ".log";
private static final String SPOOL_FILE_NAME_FORMAT_PREFIX = "%s.%s%s";
private static final String INDEX_FILE_CLOSED_SUFFIX = "_closed.json";
private static final String INDEX_FILE_PUBLISH_SUFFIX = "_publish.json";
private static final String INDEX_FILE_NAME_FORMAT = "index-%s-%s" + FILE_EXT_JSON;
private static final String SPOOL_FILE_NAME_FORMAT = "spool-%s-%s-%s" + FILE_EXT_LOG;
private static final String RECORD_EMPTY = StringUtils.leftPad(StringUtils.EMPTY, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator();
public static File getCreateFile(File file, String source) throws IOException {
if (createFileIfNotExists(file, source)) {
LOG.info("SpoolUtils.getCreateFile(source={}): file={}", source, file.getAbsolutePath());
}
return file;
}
public static boolean createFileIfNotExists(File file, String source) throws IOException {
boolean ret = file.exists();
if (!ret) {
ret = file.createNewFile();
if (!ret) {
LOG.error("SpoolUtils.createFileIfNotExists(source={}): error creating file {}", source, file.getPath());
ret = false;
}
}
return ret;
}
public static File getCreateDirectory(File file) {
File ret = file;
if (!file.isDirectory()) {
boolean result = file.mkdirs();
if (!file.isDirectory() || !result) {
LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString());
ret = null;
}
}
return ret;
}
public static PrintWriter createAppendPrintWriter(File filePath) throws UnsupportedEncodingException, FileNotFoundException {
return new PrintWriter(
new BufferedWriter(
new OutputStreamWriter(
new FileOutputStream(filePath, true), DEFAULT_CHAR_SET)));
}
public static String getIndexFileName(String source, String handlerName) {
return String.format(SpoolUtils.INDEX_FILE_NAME_FORMAT, source, handlerName);
}
public static String getIndexDoneFile(String filePath) {
return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_CLOSED_SUFFIX;
}
public static String getIndexPublishFile(String filePath) {
return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_PUBLISH_SUFFIX;
}
public static boolean fileExists(IndexRecord record) {
return record != null && new File(record.getPath()).exists();
}
static String getSpoolFileName(String source, String handlerName, String guid) {
return String.format(SPOOL_FILE_NAME_FORMAT, source, handlerName, guid);
}
public static String getSpoolFilePath(SpoolConfiguration cfg, String spoolDir, String archiveFolder, String suffix) {
File ret = null;
String fileName = getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), suffix);
int lastDot = StringUtils.lastIndexOf(fileName, '.');
String baseName = fileName.substring(0, lastDot);
String extension = fileName.substring(lastDot);
for (int sequence = 1; true; sequence++) {
ret = new File(spoolDir, fileName);
File archiveLogFile = new File(archiveFolder, fileName);
if (!ret.exists() && !archiveLogFile.exists()) {
break;
}
fileName = String.format(SPOOL_FILE_NAME_FORMAT_PREFIX, baseName, sequence, extension);
}
return ret.getPath();
}
public static String getLineSeparator() {
return DEFAULT_LINE_SEPARATOR;
}
public static String getRecordForWriting(IndexRecord record) {
String json = AtlasType.toJson(record);
return StringUtils.rightPad(json, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator();
}
public static String getEmptyRecordForWriting() {
return RECORD_EMPTY;
}
public static IndexRecords createRecords(String[] items) {
IndexRecords records = new IndexRecords();
if (items != null && items.length > 0) {
try {
for (String item : items) {
if (StringUtils.isNotBlank(item)) {
IndexRecord record = AtlasType.fromJson(item, IndexRecord.class);
records.getRecords().put(record.getId(), record);
}
}
} catch (Exception ex) {
LOG.error("SpoolUtils.createRecords(): error loading records.", ex);
}
}
return records;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.hook.FailedMessagesLogger;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutput;
import java.io.PrintWriter;
import java.util.List;
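/**
 * AbstractNotification implementation that writes messages to the local spool file
 * managed by IndexManagement. Messages that cannot be written, or that arrive after
 * setDrain() has been called, are routed to the optional FailedMessagesLogger.
 */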
public class Spooler extends AbstractNotification {
private static final Logger LOG = LoggerFactory.getLogger(Spooler.class);
private final SpoolConfiguration configuration;
private final IndexManagement indexManagement;
private FailedMessagesLogger failedMessagesLogger;
private boolean isDrain;
public Spooler(SpoolConfiguration configuration, IndexManagement indexManagement) {
this.configuration = configuration;
this.indexManagement = indexManagement;
}
public void setFailedMessagesLogger(FailedMessagesLogger failedMessagesLogger) {
this.failedMessagesLogger = failedMessagesLogger;
}
public void setDrain() {
this.isDrain = true;
}
@Override
public <T> List<NotificationConsumer<T>> createConsumers(org.apache.atlas.notification.NotificationInterface.NotificationType notificationType, int numConsumers) {
return null;
}
@Override
public void sendInternal(NotificationType type, List<String> messages) {
boolean ret = write(messages);
if (failedMessagesLogger != null && !ret) {
writeToFailedMessages(messages);
}
}
@Override
public void close() {
}
@VisibleForTesting
boolean write(List<String> messages) {
final boolean ret;
try {
if (!getDrain()) {
indexManagement.setSpoolWriteInProgress();
ret = writeInternal(messages);
} else {
LOG.error("Spooler.write(source={}): called after stop is called! Write will not be performed!", configuration.getSourceName(), messages);
ret = false;
}
} finally {
indexManagement.resetSpoolWriteInProgress();
}
return ret;
}
private void writeToFailedMessages(List<String> messages) {
if (failedMessagesLogger != null) {
for (String message : messages) {
failedMessagesLogger.log(message);
}
}
}
private boolean writeInternal(List<String> messages) {
boolean ret = false;
try {
byte[] lineSeparatorBytes = SpoolUtils.getLineSeparator().getBytes(SpoolUtils.DEFAULT_CHAR_SET);
DataOutput pw = indexManagement.getSpoolWriter();
for (String message : messages) {
pw.write(message.getBytes(SpoolUtils.DEFAULT_CHAR_SET));
pw.write(lineSeparatorBytes);
}
indexManagement.flushSpoolWriter();
ret = true;
} catch (Exception exception) {
LOG.error("Spooler.writeInternal(source={}): error writing to file. messages={}", configuration.getSourceName(), messages, exception);
ret = false;
}
return ret;
}
private boolean getDrain() {
return this.isDrain;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.models;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.UUID;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
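/**
 * Tracks the state of a single spool file: its path, the last line read, its status
 * (WRITE_IN_PROGRESS, PENDING, READ_IN_PROGRESS, DONE), timestamps and the failed-attempt
 * count. Persisted as fixed-size JSON entries (see RECORD_SIZE) in the index file.
 */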
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class IndexRecord implements Serializable {
public static final int RECORD_SIZE = 500;
public static final String STATUS_PENDING = "PENDING";
public static final String STATUS_WRITE_IN_PROGRESS = "WRITE_IN_PROGRESS";
public static final String STATUS_READ_IN_PROGRESS = "READ_IN_PROGRESS";
public static final String STATUS_DONE = "DONE";
private String id;
private String path;
private int line;
private long created;
private long writeCompleted;
private long doneCompleted;
private long lastSuccess;
private long lastFailed;
private boolean lastAttempt;
private int failedAttempt;
private String status;
public IndexRecord() {
this.status = STATUS_WRITE_IN_PROGRESS;
this.lastAttempt = false;
}
public IndexRecord(String path) {
this.id = UUID.randomUUID().toString();
this.path = path;
this.failedAttempt = 0;
this.status = STATUS_WRITE_IN_PROGRESS;
this.created = System.currentTimeMillis();
setLastAttempt(false);
}
@Override
public String toString() {
return "IndexRecord [id=" + id + ", filePath=" + path
+ ", linePosition=" + line + ", status=" + status
+ ", fileCreateTime=" + created
+ ", writeCompleteTime=" + writeCompleted
+ ", doneCompleteTime=" + doneCompleted
+ ", lastSuccessTime=" + lastSuccess
+ ", lastFailedTime=" + lastFailed
+ ", failedAttemptCount=" + failedAttempt
+ ", lastAttempt=" + lastAttempt + "]";
}
public void setId(String id) {
this.id = id;
}
public String getId() {
return this.id;
}
public void setPath(String path) {
this.path = path;
}
public String getPath() {
return this.path;
}
public void setLine(int line) {
this.line = line;
}
public int getLine() {
return line;
}
public void setCreated(long fileCreateTime) {
this.created = fileCreateTime;
}
public long getCreated() {
return this.created;
}
public void setWriteCompleted(long writeCompleted) {
this.writeCompleted = writeCompleted;
}
public long getWriteCompleted() {
return this.writeCompleted;
}
public void setDoneCompleted(long doneCompleted) {
this.doneCompleted = doneCompleted;
}
public long getDoneCompleted() {
return doneCompleted;
}
public void setLastSuccess(long lastSuccess) {
this.lastSuccess = lastSuccess;
}
public long getLastSuccess() {
return lastSuccess;
}
public void setLastFailed(long lastFailed) {
this.lastFailed = lastFailed;
}
public void setStatus(String status) {
this.status = status;
}
public String getStatus() {
return this.status;
}
public void setLastAttempt(boolean lastAttempt) {
this.lastAttempt = lastAttempt;
}
public void setFailedAttempt(int failedAttempt) {
this.failedAttempt = failedAttempt;
}
public int getFailedAttempt() {
return failedAttempt;
}
@JsonIgnore
public void setDone() {
setStatus(IndexRecord.STATUS_DONE);
setDoneCompleted(System.currentTimeMillis());
setLastAttempt(true);
}
@JsonIgnore
public void setStatusPending() {
setStatus(IndexRecord.STATUS_PENDING);
setWriteCompleted(System.currentTimeMillis());
setLastAttempt(true);
}
@JsonIgnore
public void updateFailedAttempt() {
setLastFailed(System.currentTimeMillis());
incrementFailedAttemptCount();
setLastAttempt(false);
}
@JsonIgnore
public boolean equals(IndexRecord record) {
return this.id.equals(record.getId());
}
@JsonIgnore
public void setCurrentLine(int line) {
setLine(line);
setStatus(STATUS_READ_IN_PROGRESS);
setLastSuccess(System.currentTimeMillis());
setLastAttempt(true);
}
@JsonIgnore
public boolean isStatusDone() {
return this.status.equals(STATUS_DONE);
}
@JsonIgnore
public boolean isStatusWriteInProgress() {
return this.status.equals(STATUS_WRITE_IN_PROGRESS);
}
@JsonIgnore
public boolean isStatusReadInProgress() {
return status.equals(IndexRecord.STATUS_READ_IN_PROGRESS);
}
@JsonIgnore
public void incrementFailedAttemptCount() {
this.failedAttempt++;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.models;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
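/**
 * Collection of IndexRecord entries, keyed by record id and kept in insertion order.
 */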
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class IndexRecords implements Serializable {
private LinkedHashMap<String, IndexRecord> records;
public IndexRecords() {
this.records = new LinkedHashMap<>();
}
public Map<String, IndexRecord> getRecords() {
return records;
}
public void setRecords(LinkedHashMap<String, IndexRecord> records) {
this.records = records;
}
@JsonIgnore
public int size() {
LinkedHashMap<String, IndexRecord> records = this.records;
return records != null ? records.size() : 0;
}
@JsonIgnore
public void remove(IndexRecord record) {
LinkedHashMap<String, IndexRecord> records = this.records;
if (records != null) {
records.remove(record.getId());
}
}
@JsonIgnore
public void add(IndexRecord record) {
LinkedHashMap<String, IndexRecord> records = this.records;
if (records == null) {
records = new LinkedHashMap<>();
this.records = records;
}
records.put(record.getId(), record);
}
@JsonIgnore
public void delete(IndexRecord record) {
remove(record);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import org.apache.commons.lang.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
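/**
 * FileOperation that acquires an exclusive lock on the whole file and keeps the
 * RandomAccessFile, channel and lock open, so the caller can stream reads and writes
 * through getInput()/getOutput() until close() is invoked.
 */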
public class FileLockedReadWrite extends FileOperation {
private RandomAccessFile raf;
private FileChannel channel;
private FileLock lock;
public FileLockedReadWrite(String source) {
super(source);
}
@Override
public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
this.raf = randomAccessFile;
this.channel = channel;
this.lock = channel.tryLock();
return lock;
}
public DataInput getInput(File file) throws IOException {
return getRaf(file);
}
public DataOutput getOutput(File file) throws IOException {
return getRaf(file);
}
public void flush() throws IOException {
if (channel != null) {
channel.force(true);
}
}
public void close() {
super.close(this.raf, this.channel, this.lock);
}
private RandomAccessFile getRaf(File file) throws IOException {
RandomAccessFile raf = new RandomAccessFile(file, "rws");
run(raf, raf.getChannel(), StringUtils.EMPTY);
return raf;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
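/**
 * FileOperation that appends a JSON record at the end of the file, locking only the
 * region being written.
 */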
public class FileOpAppend extends FileOperation {
public FileOpAppend(String source) {
super(source);
}
@Override
public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
FileLock lock = channel.tryLock(randomAccessFile.length(), json.length(), false);
channel.position(randomAccessFile.length());
randomAccessFile.writeBytes(json);
return lock;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import org.apache.atlas.notification.spool.SpoolUtils;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
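/**
 * FileOperation that compacts the index file: loads all records, truncates the file
 * and writes back only the non-blank entries, dropping gaps left by removed records.
 */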
public class FileOpCompaction extends FileOperation {
private final FileOpRead fileOpLoad;
public FileOpCompaction(String source) {
super(source);
this.fileOpLoad = new FileOpRead(source);
}
@Override
public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
FileLock lock = channel.tryLock();
fileOpLoad.perform(getFile(), StringUtils.EMPTY);
channel.truncate(0);
String[] rawItems = fileOpLoad.getItems();
if (rawItems != null) {
for (String record : rawItems) {
if (StringUtils.isNotBlank(record)) {
file.writeBytes(record + SpoolUtils.getLineSeparator());
}
}
}
return lock;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
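/**
 * FileOperation that locates the record with the given id and overwrites it in place
 * with the supplied JSON; does nothing if the id is not found.
 */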
public class FileOpDelete extends FileOperation {
public FileOpDelete(String source) {
super(source);
}
@Override
public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
final FileLock ret;
final long position = find(file, getId());
if (position < 0) {
ret = null;
} else {
ret = channel.tryLock(position, json.length(), false);
channel.position(position);
file.writeBytes(json);
}
return ret;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import org.apache.atlas.notification.spool.SpoolUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
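/**
 * FileOperation that reads the entire file into memory and splits it into individual
 * records on the line separator; results are exposed via getItems().
 */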
public class FileOpRead extends FileOperation {
private static final Logger LOG = LoggerFactory.getLogger(FileOpRead.class);
private String[] items;
public FileOpRead(String source) {
super(source);
}
@Override
public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
items = null;
byte[] bytes = new byte[(int) randomAccessFile.length()];
randomAccessFile.readFully(bytes);
int rawRecords = 0;
String allRecords = new String(bytes, SpoolUtils.DEFAULT_CHAR_SET);
if (StringUtils.isNotEmpty(allRecords)) {
items = StringUtils.split(allRecords, SpoolUtils.getLineSeparator());
if (items != null) {
rawRecords = items.length;
}
}
LOG.info("FileOpRead.run(source={}): loaded file {}, raw records={}", this.getSource(), this.getFile().getAbsolutePath(), rawRecords);
return null;
}
public String[] getItems() {
return items;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
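/**
 * FileOperation that rewrites the record with the given id in place; if the id is not
 * found, the record is appended via the supplied FileOpAppend.
 */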
public class FileOpUpdate extends FileOperation {
private String id;
private final FileOpAppend fileOpAppend;
public FileOpUpdate(String source, FileOpAppend fileOpAppend) {
super(source);
this.fileOpAppend = fileOpAppend;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Override
public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
final FileLock ret;
final long position = find(file, getId());
if (position < 0) {
ret = fileOpAppend.run(file, channel, json);
} else {
ret = channel.tryLock(position, json.length(), false);
channel.position(position);
file.writeBytes(json);
}
return ret;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool.utils.local;
import org.apache.atlas.notification.spool.SpoolUtils;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.concurrent.TimeUnit;
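/**
 * Base class for locked operations on spool and index files. perform() opens the file,
 * invokes run() under a file lock and retries with a randomized back-off when another
 * writer holds an overlapping lock; find() locates a fixed-size record by its id.
 */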
public abstract class FileOperation {
private static final Logger LOG = LoggerFactory.getLogger(FileOperation.class);
private static final int MAX_RETRY_ATTEMPTS = 5;
private static final String RANDOM_ACCESS_FILE_OPEN_MODE_RWS = "rws";
private static final String RANDOM_ACCESS_FILE_OPEN_MODE_R = "r";
private final String source;
private File file;
private String id;
public static RandomAccessFile createRandomAccessFileForRead(File file) throws FileNotFoundException {
return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_R);
}
public static RandomAccessFile createRandomAccessFile(File file) throws FileNotFoundException {
return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_RWS);
}
public static long find(RandomAccessFile raf, String id) throws IOException {
while (true) {
String line = raf.readLine();
if (StringUtils.isEmpty(line)) {
break;
}
if (line.contains(id)) {
return raf.getChannel().position() - SpoolUtils.getLineSeparator().length() - IndexRecord.RECORD_SIZE;
}
}
return -1;
}
public FileOperation(String source) {
this(source, false);
}
public FileOperation(String source, boolean notifyConcurrency) {
this.source = source;
}
public String getSource() {
return source;
}
public void setId(String id) {
this.id = id;
}
public void perform(File file) {
perform(file, StringUtils.EMPTY);
}
public void perform(File file, String json) {
setFile(file);
performWithRetry(file, json);
}
public void perform(File file, String id, String json) {
this.setId(id);
perform(file, json);
}
public abstract FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException;
protected File getFile() {
return this.file;
}
protected String getId() {
return this.id;
}
protected void close(RandomAccessFile randomAccessFile, FileChannel channel, FileLock lock) {
try {
if (channel != null) {
channel.force(true);
}
if (lock != null) {
lock.release();
}
if (channel != null) {
channel.close();
}
if (randomAccessFile != null) {
randomAccessFile.close();
}
} catch (IOException exception) {
LOG.error("FileOperation(source={}).close(): failed", getSource(), exception);
}
}
private void setFile(File file) {
this.file = file;
}
private void performWithRetry(File file, String json) {
for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
try {
performOperation(json);
return;
} catch (OverlappingFileLockException e) {
try {
int timeout = 1 + (50 * RandomUtils.nextInt(10));
LOG.info("FileOperation.performWithRetry(source={}): {}: {}: Waiting: {} ms...", getSource(), getClass().getSimpleName(), file.getName(), timeout);
TimeUnit.MILLISECONDS.sleep(timeout);
} catch (InterruptedException ex) {
LOG.error("FileOperation.performWithRetry(source={}): {}: Interrupted!", getSource(), file.getAbsolutePath(), ex);
Thread.currentThread().interrupt();
}
LOG.info("FileOperation.performWithRetry(source={}): {}: Re-trying: {}!", getSource(), file.getAbsolutePath(), i);
}
}
LOG.info("FileOperation.performWithRetry(source={}): {}: appendRecord: Could not write.", getSource(), file.getAbsolutePath());
}
private boolean performOperation(String json) {
RandomAccessFile randomAccessFile = null;
FileChannel channel = null;
FileLock lock = null;
try {
randomAccessFile = new RandomAccessFile(getFile(), "rws");
channel = randomAccessFile.getChannel();
lock = run(randomAccessFile, channel, json);
} catch (FileNotFoundException e) {
LOG.error("FileOperation.performOperation(source={}): file={}: file not found", getSource(), getFile().getAbsolutePath(), e);
} catch (IOException exception) {
LOG.error("FileOperation.performOperation(source={}): file={}: failed", getSource(), getFile().getAbsolutePath());
} finally {
close(randomAccessFile, channel, lock);
}
return true;
}
}
...@@ -112,7 +112,7 @@ public class AbstractNotificationTest { ...@@ -112,7 +112,7 @@ public class AbstractNotificationTest {
} }
@Override @Override
protected void sendInternal(NotificationType notificationType, List<String> notificationMessages) public void sendInternal(NotificationType notificationType, List<String> notificationMessages)
throws NotificationException { throws NotificationException {
type = notificationType; type = notificationType;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.atlas.notification.NotificationInterface.NotificationType.HOOK;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
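/**
 * Exercises the spooler and publisher against a temporary spool directory, including
 * multiple concurrent Spooler/Publisher pairs sharing one spool configuration.
 */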
public class AtlasFileSpoolTest extends BaseTest {
private static int MAX_RECORDS = 50;
private static class MessageHandlerSpy extends AbstractNotification {
private List<String> publishedMessages = new ArrayList<>();
public List<String> getMessages() {
return publishedMessages;
}
@Override
public void init(String source, Object failedMessagesLogger) {
}
@Override
public void setCurrentUser(String user) {
}
@Override
public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
publishedMessages.addAll(messages);
}
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
return null;
}
@Override
public <T> void send(NotificationType type, T... messages) throws NotificationException {
}
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
}
@Override
public void close() {
}
}
@Test
public void indexSetupMultipleTimes() throws IOException, AtlasException {
SpoolConfiguration cfg = getSpoolConfiguration();
IndexManagement indexManagement = new IndexManagement(cfg);
for (int i = 0; i < 2; i++) {
indexManagement.init();
assertTrue(cfg.getSpoolDir().exists());
assertTrue(cfg.getArchiveDir().exists());
File indexFile = indexManagement.getIndexFileManager().getIndexFile();
File indexDoneFile = indexManagement.getIndexFileManager().getDoneFile();
assertTrue(indexFile.exists(), "File not created: " + indexFile.getAbsolutePath());
assertTrue(indexDoneFile.exists(), "File not created: " + indexDoneFile.getAbsolutePath());
}
}
@Test
public void spoolerTest() throws IOException, AtlasException {
SpoolConfiguration cfg = getSpoolConfigurationTest();
IndexManagement indexManagement = new IndexManagement(cfg);
indexManagement.init();
Spooler spooler = new Spooler(cfg, indexManagement);
for (int i = 0; i < MAX_RECORDS; i++) {
spooler.write(Collections.singletonList("message: " + i));
}
indexManagement.stop();
}
@Test(dependsOnMethods = "spoolerTest")
public void publisherTest() throws IOException, AtlasException, InterruptedException {
SpoolConfiguration cfg = getSpoolConfigurationTest();
IndexManagement indexManagement = new IndexManagement(cfg);
indexManagement.init();
MessageHandlerSpy messageHandler = new MessageHandlerSpy();
Publisher publisher = new Publisher(cfg, indexManagement, messageHandler);
boolean ret = publisher.processAndDispatch(indexManagement.getIndexFileManager().getRecords().get(0));
publisher.setDrain();
Assert.assertTrue(ret);
TimeUnit.SECONDS.sleep(5);
assertTrue(messageHandler.getMessages().size() >= 0);
}
@Test
public void indexRecordsRead() throws IOException, AtlasException {
SpoolConfiguration spoolCfg = getSpoolConfigurationTest();
IndexManagement indexManagement = new IndexManagement(spoolCfg);
indexManagement.init();
}
@Test
public void concurrentWriteAndPublish() throws InterruptedException, IOException, AtlasException {
final int MAX_PROCESSES = 4;
SpoolConfiguration spoolCfg = getSpoolConfigurationTest(5);
IndexManagement[] im1 = new IndexManagement[MAX_PROCESSES];
MessageHandlerSpy[] messageHandlerSpy = new MessageHandlerSpy[MAX_PROCESSES];
for (int i = 0; i < MAX_PROCESSES; i++) {
messageHandlerSpy[i] = new MessageHandlerSpy();
im1[i] = new IndexManagement(spoolCfg);
}
for (int i = 0; i < MAX_PROCESSES; i++) {
im1[i].init();
}
IndexManagement imVerify = new IndexManagement(spoolCfg);
imVerify.init();
Assert.assertTrue(imVerify.getIndexFileManager().getRecords().size() >= 0);
Thread[] th1 = new Thread[MAX_PROCESSES];
for (int i = 0; i < MAX_PROCESSES; i++) {
th1[i] = new Thread(new MessagePump(new Spooler(spoolCfg, im1[i]), new Publisher(spoolCfg, im1[i], messageHandlerSpy[i])));
}
for (int i = 0; i < MAX_PROCESSES; i++) {
th1[i].start();
}
for (int i = 0; i < MAX_PROCESSES; i++) {
th1[i].join();
}
imVerify = new IndexManagement(spoolCfg);
imVerify.init();
Assert.assertEquals(imVerify.getIndexFileManager().getRecords().size(), 0);
for (int i = 0; i < MAX_PROCESSES; i++) {
Assert.assertTrue(messageHandlerSpy[i].getMessages().size() >= 0);
}
}
private class MessagePump implements Runnable {
private Spooler spooler;
private Publisher publisher;
private Thread publisherThread;
public MessagePump(Spooler spooler, Publisher publisher) {
this.spooler = spooler;
this.publisher = publisher;
}
@Override
public void run() {
publisherThread = new Thread(publisher);
publisherThread.start();
for (int i = 0; i < MAX_RECORDS; i++) {
try {
spooler.send(HOOK, String.format("%s-%s", "message", i));
Thread.sleep(RandomUtils.nextInt(10, 100));
} catch (NotificationException exception) {
exception.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(10000);
publisher.setDrain();
publisherThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(new File(spoolDirTest));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
public class BaseTest {
public static String spoolDir = System.getProperty("user.dir") + "/src/test/resources/spool";
public static String spoolDirTest = spoolDir + "-test";
protected final String SOURCE_TEST = "test-src";
protected final String SOURCE_TEST_HANDLER = "1";
protected final String knownIndexFilePath = "index-test-src-1.json";
protected final String knownIndexDoneFilePath = "index-test-src-1_closed.json";
protected File archiveDir = new File(spoolDir, "archive");
protected File indexFile = new File(spoolDir, knownIndexFilePath);
protected File indexDoneFile = new File(spoolDir, knownIndexDoneFilePath);
public SpoolConfiguration getSpoolConfiguration() {
return getSpoolConfiguration(spoolDir, SOURCE_TEST_HANDLER);
}
public SpoolConfiguration getSpoolConfigurationTest() {
return getSpoolConfiguration(spoolDirTest, SOURCE_TEST_HANDLER);
}
public SpoolConfiguration getSpoolConfigurationTest(Integer testId) {
return getSpoolConfiguration(spoolDirTest, testId.toString());
}
public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) {
SpoolConfiguration cfg = new SpoolConfiguration(getConfiguration(spoolDir), handlerName);
cfg.setSource(SOURCE_TEST);
return cfg;
}
public Configuration getConfiguration(String spoolDir) {
final int destinationRetry = 2000;
PropertiesConfiguration props = new PropertiesConfiguration();
props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, spoolDir);
props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_DEST_RETRY_MS, Integer.toString(destinationRetry));
props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, Integer.toString(2));
return props;
}
protected File getNewIndexFile(char id) throws IOException {
File f = new File(spoolDirTest, knownIndexFilePath.replace('1', id));
FileUtils.copyFile(indexFile, f);
return f;
}
protected File getNewIndexDoneFile(char id) throws IOException {
File f = new File(spoolDirTest, knownIndexDoneFilePath.replace('1', id));
FileUtils.copyFile(indexDoneFile, f);
return f;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.spool;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.models.IndexRecords;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class IndexManagementTest extends BaseTest {
@Test
public void fileNameGeneration() {
String handlerName = "someHandler";
SpoolConfiguration cfg = getSpoolConfiguration(spoolDir, handlerName);
IndexRecord record = new IndexRecord(StringUtils.EMPTY);
Assert.assertEquals(SpoolUtils.getIndexFileName(cfg.getSourceName(), cfg.getMessageHandlerName()), "index-test-src-someHandler.json");
Assert.assertTrue(SpoolUtils.getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), record.getId()).startsWith("spool-test-src-someHandler-"));
}
@Test
public void verifyLoad() throws IOException {
final int expectedRecords = 2;
SpoolConfiguration cfg = getSpoolConfiguration();
IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, cfg.getIndexFile(), cfg.getIndexDoneFile(), null, 2);
Assert.assertEquals(indexFileManager.getRecords().size(), expectedRecords);
Assert.assertEquals(indexFileManager.getRecords().get(0).getId(), "1");
Assert.assertEquals(indexFileManager.getRecords().get(1).getId(), "2");
}
@Test
public void addAndRemove() throws IOException {
File newIndexFile = getNewIndexFile('3');
File newIndexDoneFile = getNewIndexDoneFile('3');
IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2);
int expectedCount = 2;
Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount);
IndexRecord r3 = indexFileManager.add("3.log");
IndexRecord r4 = indexFileManager.add("4.log");
IndexRecord r5 = indexFileManager.add("5.log");
r4.updateFailedAttempt();
indexFileManager.updateIndex(r4);
r5.setLine(100);
indexFileManager.updateIndex(r5);
IndexRecords records = indexFileManager.loadRecords(newIndexFile);
Assert.assertTrue(records.getRecords().containsKey(r3.getId()));
Assert.assertTrue(records.getRecords().containsKey(r4.getId()));
Assert.assertTrue(records.getRecords().containsKey(r5.getId()));
Assert.assertEquals(records.getRecords().get(r3.getId()).getStatus(), r3.getStatus());
Assert.assertEquals(records.getRecords().get(r4.getId()).getFailedAttempt(), r4.getFailedAttempt());
Assert.assertEquals(records.getRecords().get(r5.getId()).getLine(), r5.getLine());
indexFileManager.remove(r3);
indexFileManager.remove(r4);
indexFileManager.remove(r5);
Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount);
}
@Test
public void verifyOperations() throws IOException {
SpoolConfiguration cfg = getSpoolConfigurationTest();
File newIndexFile = getNewIndexFile('2');
File newIndexDoneFile = getNewIndexDoneFile('2');
File archiveDir = cfg.getArchiveDir();
IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2);
verifyAdding(indexFileManager);
verifySaveAndLoad(indexFileManager);
verifyRemove(indexFileManager);
verifyRecords(indexFileManager);
checkDoneFile(newIndexDoneFile, archiveDir, 2, "5.log");
verifyArchiving(indexFileManager);
}
private void verifyRecords(IndexManagement.IndexFileManager indexFileManager) {
List<IndexRecord> records = indexFileManager.getRecords();
Assert.assertEquals(records.size(), 5);
Assert.assertTrue(records.get(3).getPath().endsWith("3.log"));
Assert.assertEquals(records.get(3).getStatus(), IndexRecord.STATUS_WRITE_IN_PROGRESS);
Assert.assertEquals(records.get(2).getFailedAttempt(), 0);
Assert.assertEquals(records.get(1).getDoneCompleted(), 0);
Assert.assertEquals(records.get(0).getLine(), 0);
Assert.assertFalse(records.get(0).getLastSuccess() != 0);
}
private void verifyAdding(IndexManagement.IndexFileManager indexFileManager) throws IOException {
addFile(indexFileManager, spoolDirTest, "2.log");
addFile(indexFileManager, spoolDirTest, "3.log");
addFile(indexFileManager, spoolDirTest, "4.log");
addFile(indexFileManager, spoolDirTest, "5.log");
}
private void verifyArchiving(IndexManagement.IndexFileManager indexFileManager) {
indexFileManager.remove(indexFileManager.getRecords().get(1));
indexFileManager.remove(indexFileManager.getRecords().get(1));
indexFileManager.remove(indexFileManager.getRecords().get(1));
indexFileManager.remove(indexFileManager.getRecords().get(1));
checkArchiveDir(archiveDir);
}
private void verifyRemove(IndexManagement.IndexFileManager indexFileManager) throws IOException {
indexFileManager.remove(indexFileManager.getRecords().get(5));
boolean isPending = indexFileManager.getRecords().size() > 0;
Assert.assertTrue(isPending);
}
private void verifySaveAndLoad(IndexManagement.IndexFileManager indexFileManager) throws IOException {
indexFileManager.getRecords().get(2).updateFailedAttempt();
indexFileManager.getRecords().get(3).setDone();
indexFileManager.getRecords().get(1).setDoneCompleted(333L);
indexFileManager.getRecords().get(0).setCurrentLine(999);
Assert.assertEquals(indexFileManager.getRecords().size(), 6);
}
private void checkArchiveDir(File archiveDir) {
Set<String> availableFiles = new HashSet<>();
availableFiles.add(new File(archiveDir, "3.log").toString());
availableFiles.add(new File(archiveDir, "4.log").toString());
if (!archiveDir.exists()) {
return;
}
File[] files = archiveDir.listFiles();
Assert.assertNotNull(files);
Assert.assertEquals(files.length, 1);
}
private void addFile(IndexManagement.IndexFileManager indexFileManager, String dir, String fileName) throws IOException {
File file = new File(dir, fileName);
file.createNewFile();
indexFileManager.add(file.toString());
}
private void checkDoneFile(File newIndexDoneFile, File archiveDir, int maxArchiveFiles, String expectedFilePath) throws IOException {
IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexDoneFile, newIndexDoneFile, null, maxArchiveFiles);
Assert.assertEquals(indexFileManager.getRecords().size(), 2);
Assert.assertTrue(indexFileManager.getRecords().get(1).getPath().endsWith(expectedFilePath));
}
@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(new File(spoolDirTest));
}
}
message-0
message-1
message-2
\ No newline at end of file
{"id":"1","path":"0.log","line":0,"created":123,"writeCompleted":888,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"PENDING"}
{"id":"2","path":"1.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"WRITE_IN_PROGRESS"}
{"id":"x","path":"x.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"DONE"}
...@@ -697,6 +697,7 @@ ...@@ -697,6 +697,7 @@
<javax.servlet.version>3.1.0</javax.servlet.version> <javax.servlet.version>3.1.0</javax.servlet.version>
<guava.version>25.1-jre</guava.version> <guava.version>25.1-jre</guava.version>
<antlr4.version>4.7</antlr4.version> <antlr4.version>4.7</antlr4.version>
<log4j.version>2.8</log4j.version>
<!-- Needed for hooks --> <!-- Needed for hooks -->
<aopalliance.version>1.0</aopalliance.version> <aopalliance.version>1.0</aopalliance.version>
......