Commit 3e035c45 by Nikhil Bonte Committed by Ashutosh Mestry

ATLAS-3595, ATLAS-3603: Auto-start migration import for ZIP file-based migration.

parent 015577ec
...@@ -40,10 +40,12 @@ import static org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED; ...@@ -40,10 +40,12 @@ import static org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED;
public class Services { public class Services {
public static final Logger LOG = LoggerFactory.getLogger(Services.class); public static final Logger LOG = LoggerFactory.getLogger(Services.class);
private static final String DATA_MIGRATION_CLASS_NAME_DEFAULT = "DataMigrationService"; private static final String DATA_MIGRATION_CLASS_NAME_DEFAULT = "DataMigrationService";
private static final String FILE_EXTENSION_ZIP = ".zip";
private final List<Service> services; private final List<Service> services;
private final String dataMigrationClassName; private final String dataMigrationClassName;
private final boolean servicesEnabled; private final boolean servicesEnabled;
private final String migrationDirName;
private final boolean migrationEnabled; private final boolean migrationEnabled;
@Inject @Inject
...@@ -51,7 +53,8 @@ public class Services { ...@@ -51,7 +53,8 @@ public class Services {
this.services = services; this.services = services;
this.dataMigrationClassName = configuration.getString("atlas.migration.class.name", DATA_MIGRATION_CLASS_NAME_DEFAULT); this.dataMigrationClassName = configuration.getString("atlas.migration.class.name", DATA_MIGRATION_CLASS_NAME_DEFAULT);
this.servicesEnabled = configuration.getBoolean(ATLAS_SERVICES_ENABLED, true); this.servicesEnabled = configuration.getBoolean(ATLAS_SERVICES_ENABLED, true);
this.migrationEnabled = StringUtils.isNotEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME)); this.migrationDirName = configuration.getString(ATLAS_MIGRATION_MODE_FILENAME);
this.migrationEnabled = StringUtils.isNotEmpty(migrationDirName);
} }
@PostConstruct @PostConstruct
...@@ -92,11 +95,22 @@ public class Services { ...@@ -92,11 +95,22 @@ public class Services {
private boolean isServiceUsed(Service service) { private boolean isServiceUsed(Service service) {
if (isDataMigrationService(service)) { if (isDataMigrationService(service)) {
return migrationEnabled; return migrationEnabled;
} else if (isZipFileMigration()) {
return isNeededForZipFileMigration(service);
} else { } else {
return !migrationEnabled && servicesEnabled; return !migrationEnabled && servicesEnabled;
} }
} }
private boolean isZipFileMigration() {
return migrationEnabled && StringUtils.endsWithIgnoreCase(migrationDirName, FILE_EXTENSION_ZIP);
}
private boolean isNeededForZipFileMigration(Service svc) {
return svc.getClass().getSuperclass().getSimpleName().equals("AbstractStorageBasedAuditRepository") ||
svc.getClass().getSuperclass().getSimpleName().equals("AbstractNotification");
}
private boolean isDataMigrationService(Service svc) { private boolean isDataMigrationService(Service svc) {
return svc.getClass().getSimpleName().equals(dataMigrationClassName); return svc.getClass().getSimpleName().equals(dataMigrationClassName);
} }
......
...@@ -18,20 +18,21 @@ ...@@ -18,20 +18,21 @@
package org.apache.atlas.repository.migration; package org.apache.atlas.repository.migration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.GraphDBMigrator; import org.apache.atlas.repository.graphdb.GraphDBMigrator;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ImportTypeDefProcessor; import org.apache.atlas.repository.impexp.ImportTypeDefProcessor;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.service.Service;
import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.solr.common.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -48,6 +49,8 @@ import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME; ...@@ -48,6 +49,8 @@ import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
public class DataMigrationService implements Service { public class DataMigrationService implements Service {
private static final Logger LOG = LoggerFactory.getLogger(DataMigrationService.class); private static final Logger LOG = LoggerFactory.getLogger(DataMigrationService.class);
private static final String FILE_EXTENSION_ZIP = ".zip";
private static String ATLAS_MIGRATION_DATA_NAME = "atlas-migration-data.json"; private static String ATLAS_MIGRATION_DATA_NAME = "atlas-migration-data.json";
private static String ATLAS_MIGRATION_TYPESDEF_NAME = "atlas-migration-typesdef.json"; private static String ATLAS_MIGRATION_TYPESDEF_NAME = "atlas-migration-typesdef.json";
...@@ -57,9 +60,15 @@ public class DataMigrationService implements Service { ...@@ -57,9 +60,15 @@ public class DataMigrationService implements Service {
@Inject @Inject
public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration, public DataMigrationService(GraphDBMigrator migrator, AtlasTypeDefStore typeDefStore, Configuration configuration,
GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer, GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer,
AtlasTypeRegistry typeRegistry) { AtlasTypeRegistry typeRegistry, ImportService importService) {
this.configuration = configuration; this.configuration = configuration;
this.thread = new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
String fileName = getFileName();
boolean zipFileBasedMigrationImport = StringUtils.endsWithIgnoreCase(fileName, FILE_EXTENSION_ZIP);
this.thread = (zipFileBasedMigrationImport)
? new Thread(new ZipFileMigrationImporter(importService, fileName), "zipFileBasedMigrationImporter")
: new Thread(new FileImporter(migrator, typeDefStore, typeRegistry, storeInitializer, fileName, indexer));
} }
@Override @Override
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
public class FileWatcher {
private static final Logger LOG = LoggerFactory.getLogger(FileWatcher.class);
private final static int MAX_TIMES_PAUSE = 10;
private final static int PAUSE_INTERVAL = 5000; // 5 secs
private int checkIncrement;
private final File fileToWatch;
public FileWatcher(String filePath) {
this.checkIncrement = 1;
this.fileToWatch = new File(filePath);
}
public void start() throws IOException {
if (existsAndReadyCheck()) {
return;
}
WatchService watcher = FileSystems.getDefault().newWatchService();
Path pathToWatch = FileSystems.getDefault().getPath(fileToWatch.getParent());
register(watcher, pathToWatch);
try {
LOG.info(String.format("Migration File Watcher: Watching: %s", fileToWatch.toString()));
startWatching(watcher);
} catch (InterruptedException ex) {
LOG.error("Migration File Watcher: Interrupted!");
} finally {
watcher.close();
}
}
private void startWatching(WatchService watcher) throws InterruptedException {
while (true) {
WatchKey watchKey = watcher.take();
if (watchKey == null) {
continue;
}
for (WatchEvent event : watchKey.pollEvents()) {
if (checkIfFileAvailableAndReady(event)) {
return;
}
}
watchKey.reset();
}
}
private void register(WatchService watcher, Path path) throws IOException {
try {
path.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
} catch (IOException e) {
LOG.error("Migration File Watcher: Error while registering event {}!", path);
throw e;
}
}
private boolean checkIfFileAvailableAndReady(WatchEvent event) {
WatchEvent<Path> watchEvent = event;
Path path = watchEvent.context();
if (!path.toString().equals(fileToWatch.getName())) {
return false;
}
return existsAndReadyCheck();
}
private boolean existsAndReadyCheck() {
boolean ret = fileToWatch.exists() && fileToWatch.canRead();
if (ret) {
try {
return isReadyForUse(fileToWatch);
} catch (InterruptedException e) {
LOG.error("Migration File Watcher: Interrupted {}!", fileToWatch);
return false;
}
} else {
LOG.info(String.format("Migration File Watcher: File does not exist!: %s", fileToWatch.getAbsolutePath()));
}
return ret;
}
private boolean isReadyForUse(File file) throws InterruptedException {
Long fileSizeBefore = file.length();
Thread.sleep(getCheckInterval());
Long fileSizeAfter = file.length();
boolean ret = fileSizeBefore.equals(fileSizeAfter);
if (ret) {
LOG.info(String.format("Migration File Watcher: %s: File is ready for use!", file.getAbsolutePath()));
} else {
incrementCheckCounter();
LOG.info(
String.format("Migration File Watcher: File is being written: Pause: %,d secs: New size: %,d."
, getCheckInterval() / 1000
, fileSizeAfter));
}
return ret;
}
private int getCheckInterval() {
return (PAUSE_INTERVAL * (checkIncrement));
}
private int incrementCheckCounter() {
if (checkIncrement > MAX_TIMES_PAUSE) {
checkIncrement = 1;
}
return (PAUSE_INTERVAL * (checkIncrement++));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.repository.impexp.ImportService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
private static String ENV_USER_NAME = "user.name";
private final ImportService importService;
private final String fileToImport;
public ZipFileMigrationImporter(ImportService importService, String fileName) {
this.importService = importService;
this.fileToImport = fileName;
}
@Override
public void run() {
try {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
performImport(new FileInputStream(new File(fileToImport)));
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) {
LOG.error("Migration Import: Error!", e);
}
}
private void performImport(InputStream fs) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Starting...", fileToImport);
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
importService.run(fs, getImportRequest(),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
} catch (Exception ex) {
LOG.error("Error loading zip for migration", ex);
throw new AtlasBaseException(ex);
} finally {
LOG.info("Migration Import: {}: Done!", fileToImport);
}
}
private String getUserNameFromEnvironment() {
return System.getProperty(ENV_USER_NAME);
}
private AtlasImportRequest getImportRequest() throws AtlasException {
return new AtlasImportRequest();
}
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
return ApplicationProperties.get().getString(property, defaultValue);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment