Commit e8661ecb by Ashutosh Mestry

ATLAS-3641: Import Service: Support zipDirect format of import.

parent 30a09955
......@@ -44,10 +44,13 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
public static final String OPTION_KEY_FORMAT = "format";
public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
private static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid";
private static final String FILE_NAME_KEY = "fileName";
private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
private static final String OPTION_KEY_STREAM_SIZE = "size";
private Map<String, String> options;
......@@ -108,7 +111,7 @@ public class AtlasImportRequest implements Serializable {
return null;
}
return (String) this.options.get(key);
return this.options.get(key);
}
@JsonIgnore
......@@ -127,4 +130,17 @@ public class AtlasImportRequest implements Serializable {
options = new HashMap<>();
}
options.put(key, value);
}}
}
/**
 * Records the expected number of entities in the import stream under the
 * "size" option (OPTION_KEY_STREAM_SIZE); used by zipDirect imports to
 * report the stream size without reading the whole archive first.
 *
 * @param size expected entity count
 */
public void setSizeOption(int size) {
setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size));
}
/**
 * Returns the expected number of entities in the import stream, as recorded
 * by {@code setSizeOption(int)} under the "size" option.
 *
 * @return the configured stream size, or 1 when the option is not set
 */
public int getSizeOption() {
    // Guard against a null options map: other accessors in this class treat
    // options == null as "no options set" rather than throwing an NPE.
    if (this.options == null || !this.options.containsKey(OPTION_KEY_STREAM_SIZE)) {
        return 1;
    }

    // parseInt returns the primitive directly; Integer.valueOf would box and
    // immediately unbox.
    return Integer.parseInt(this.options.get(OPTION_KEY_STREAM_SIZE));
}
}
......@@ -251,6 +251,10 @@ public class AtlasJson {
return ret;
}
/**
 * Exposes the class-level Jackson codec so callers can reuse its
 * factory/configuration (e.g. to create streaming parsers) instead of
 * building a separately-configured mapper.
 */
public static ObjectCodec getMapper() {
return mapper;
}
static class DateSerializer extends JsonSerializer<Date> {
@Override
public void serialize(Date value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
......
......@@ -92,7 +92,7 @@ public class ImportService {
request = new AtlasImportRequest();
}
EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
return run(source, request, userName, hostName, requestingIP);
}
......@@ -248,8 +248,13 @@ public class ImportService {
return (int) (endTime - startTime);
}
private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
try {
if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) {
return getZipDirectEntityImportStream(request, inputStream);
}
if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
return new ZipSource(inputStream);
}
......@@ -260,9 +265,15 @@ public class ImportService {
}
}
/**
 * Builds a streaming (zipDirect) import source that parses entities directly
 * off the request's input stream instead of extracting the archive first.
 * The entity count is taken from the request's "size" option and is only
 * used for progress reporting.
 */
private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException {
ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, request.getSizeOption());
LOG.info("Using ZipSourceDirect: Size: {} entities", zipSourceDirect.size());
return zipSourceDirect;
}
@VisibleForTesting
boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
return false;
}
......
......@@ -31,4 +31,8 @@ public enum ZipExportFileNames {
/** Returns the bare entry name (without any file extension). */
public String toString() {
return this.name;
}
/** File name of this export artifact as stored inside the ZIP archive. */
public String toEntryFileName() {
    return String.format("%s.json", this.name);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasJson;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
/**
 * An {@link EntityImportStream} that reads entities straight off a ZIP input
 * stream (the "zipDirect" import format) without writing the archive to disk
 * or loading it fully into memory.
 *
 * Expected archive layout: an optional typesDef entry (see
 * {@link ZipExportFileNames#toEntryFileName()}) followed by an
 * "entities.json" entry containing a JSON array of entity objects, terminated
 * by an empty object ({}).
 */
public class ZipSourceDirect implements EntityImportStream {
    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class);

    private static final String ZIP_ENTRY_ENTITIES = "entities.json";

    private final ZipInputStream zipInputStream;
    private int currentPosition;           // count of entities handed out so far
    private ImportTransforms importTransform;
    private List<BaseEntityHandler> entityHandlers;
    private AtlasTypesDef typesDef;
    private int streamSize = 1;            // caller-supplied expected entity count (see size())
    EntitiesArrayParser entitiesArrayParser;

    /**
     * @param inputStream the raw ZIP stream to import from
     * @param streamSize  expected number of entities; reported via {@link #size()} only
     * @throws AtlasBaseException when the archive is empty or has no entities.json entry
     */
    public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException {
        this.zipInputStream = new ZipInputStream(inputStream);
        this.streamSize     = streamSize;

        prepareStreamForFetch();
    }

    @Override
    public ImportTransforms getImportTransform() {
        return this.importTransform;
    }

    @Override
    public void setImportTransform(ImportTransforms importTransform) {
        this.importTransform = importTransform;
    }

    @Override
    public List<BaseEntityHandler> getEntityHandlers() {
        return entityHandlers;
    }

    @Override
    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
        this.entityHandlers = entityHandlers;
    }

    @Override
    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
        // Populated in prepareStreamForFetch() when the archive carries a typesDef entry;
        // null otherwise.
        return this.typesDef;
    }

    @Override
    public AtlasExportResult getExportResult() throws AtlasBaseException {
        // A direct stream carries no export metadata; return an empty result.
        return new AtlasExportResult();
    }

    @Override
    public List<String> getCreationOrder() {
        // Creation order is not available when streaming; entities are processed
        // in archive order.
        return new ArrayList<>();
    }

    @Override
    public int getPosition() {
        return currentPosition;
    }

    /**
     * Deserializes one entity (with referred entities) from JSON and applies the
     * configured import transforms and entity handlers, in that order.
     *
     * @param json JSON payload of a single AtlasEntityWithExtInfo; may be null/empty
     * @return the (possibly transformed) entity, or null for empty input
     */
    @Override
    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException {
        if (StringUtils.isEmpty(json)) {
            return null;
        }

        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);

        if (importTransform != null) {
            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
        }

        if (entityHandlers != null) {
            applyTransformers(entityWithExtInfo);
        }

        return entityWithExtInfo;
    }

    @Override
    public boolean hasNext() {
        return (this.entitiesArrayParser != null && entitiesArrayParser.hasNext());
    }

    @Override
    public AtlasEntity next() {
        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();

        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
    }

    @Override
    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
        try {
            if (hasNext()) {
                String json = moveNext();

                return getEntityWithExtInfo(json);
            }
        } catch (AtlasBaseException e) {
            LOG.error("getNextEntityWithExtInfo", e);
        }

        return null;
    }

    @Override
    public void reset() {
        // NOTE(review): only the position counter is reset; the underlying stream
        // cannot be rewound — confirm callers rely solely on getPosition().
        currentPosition = 0;
    }

    @Override
    public AtlasEntity getByGuid(String guid) {
        try {
            return getEntity(guid);
        } catch (AtlasBaseException e) {
            LOG.error("getByGuid: {} failed!", guid, e);

            return null;
        }
    }

    @Override
    public void onImportComplete(String guid) {
        // no-op: a direct stream keeps no per-entity bookkeeping
    }

    @Override
    public void setPosition(int index) {
        // NOTE(review): moveNextEntry() only advances the position counter; the
        // parser itself is not skipped forward — confirm intended behavior.
        try {
            for (int i = 0; i < index; i++) {
                moveNextEntry();
            }
        } catch (IOException e) {
            LOG.error("Error setting position: {}. Position may be beyond the stream size.", index);
        }
    }

    @Override
    public void setPositionUsingEntityGuid(String guid) {
        // no-op: random access by GUID is not possible on a forward-only stream
    }

    @Override
    public void close() {
        if (this.entitiesArrayParser != null) {
            this.entitiesArrayParser.close();
        }
    }

    // Applies the configured entity handlers to the entity and all referred entities.
    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        if (entityWithExtInfo == null) {
            return;
        }

        transform(entityWithExtInfo.getEntity());

        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
                transform(e);
            }
        }
    }

    private void transform(AtlasEntity e) {
        for (BaseEntityHandler handler : entityHandlers) {
            handler.transform(e);
        }
    }

    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
        try {
            return AtlasType.fromJson(jsonData, clazz);
        } catch (Exception e) {
            throw new AtlasBaseException("Error converting file to JSON.", e);
        }
    }

    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
        // NOTE(review): this forwards the GUID to getEntityWithExtInfo(String json),
        // which expects a JSON payload, not a GUID — confirm this path is reachable.
        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);

        return (extInfo != null) ? extInfo.getEntity() : null;
    }

    /** Expected entity count as supplied by the caller; not derived from the stream. */
    public int size() {
        return this.streamSize;
    }

    // Advances the position counter and returns the next raw entity JSON, or null on error.
    private String moveNext() {
        try {
            moveNextEntry();

            return entitiesArrayParser.next();
        } catch (IOException e) {
            LOG.error("moveNext failed!", e);
        }

        return null;
    }

    private void moveNextEntry() throws IOException {
        this.currentPosition++;
    }

    /**
     * Positions the ZIP stream at the entities.json entry, consuming the typesDef
     * entry first when present.
     *
     * FIX: the original advanced to the second entry unconditionally — failing when
     * the archive starts directly with entities.json — and dereferenced the second
     * entry without a null check, throwing an NPE for single-entry archives. The
     * entry pointer now advances only after the typesDef entry is consumed, and a
     * missing entities entry raises the proper AtlasBaseException.
     */
    private void prepareStreamForFetch() throws AtlasBaseException, IOException {
        ZipEntry zipEntryNext = zipInputStream.getNextEntry();

        if (zipEntryNext == null) {
            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
        }

        if (zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
            String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);

            this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);

            zipEntryNext = zipInputStream.getNextEntry(); // move past the consumed typesDef entry
        }

        if (zipEntryNext != null && zipEntryNext.getName().equals(ZIP_ENTRY_ENTITIES)) {
            this.entitiesArrayParser = new EntitiesArrayParser(zipInputStream);
        } else {
            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP. " + ZIP_ENTRY_ENTITIES + " could not be found!");
        }
    }

    // Reads the current ZIP entry's bytes into a String; returns null on I/O failure.
    // NOTE(review): bos.toString() uses the platform default charset — exported JSON
    // is presumably UTF-8; confirm and pass StandardCharsets.UTF_8 explicitly.
    private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        try {
            IOUtils.copy(zipInputStream, bos);
        } catch (IOException e) {
            LOG.error("Streaming copying failed!", e);

            return null;
        }

        return bos.toString();
    }

    /**
     * Incremental parser over a JSON array of entity objects. Each call to
     * {@link #next()} returns the JSON text of one array element; an empty
     * object ({}) acts as the end-of-data sentinel and closes the parser.
     */
    static class EntitiesArrayParser {
        private static final String EMPTY_OBJECT = "{}";

        private final JsonFactory factory;
        private final JsonParser  parser;
        private boolean           hasNext;

        public EntitiesArrayParser(InputStream inputStream) throws IOException {
            this.factory = AtlasJson.getMapper().getFactory();
            this.parser  = factory.createParser(inputStream);

            parseNext(); // consume the opening array token and prime hasNext
        }

        /** @return the JSON text of the next array element, or null when exhausted */
        public String next() throws IOException {
            JsonToken jsonToken = parseNext();

            if (!hasNext) {
                return null;
            }

            if (jsonToken != null && jsonToken == JsonToken.START_OBJECT) {
                JsonNode node = parser.readValueAsTree();

                return validate(node.toString());
            }

            return null;
        }

        private JsonToken parseNext() throws IOException {
            JsonToken jsonToken = this.parser.nextToken();

            hasNext = (jsonToken != null) && (jsonToken != JsonToken.END_ARRAY);

            return jsonToken;
        }

        // The empty-object sentinel marks end of data: stop iteration and release the parser.
        private String validate(String payload) {
            if (payload.equals(EMPTY_OBJECT)) {
                hasNext = false;

                close();

                return null;
            }

            return payload;
        }

        public boolean hasNext() {
            return hasNext;
        }

        public void close() {
            try {
                this.parser.close();
            } catch (IOException e) {
                LOG.error("Error closing parser!", e);
            }
        }
    }
}
......@@ -84,10 +84,8 @@ public class ZipFileMigrationImporter implements Runnable {
}
private AtlasImportRequest getImportRequest() throws AtlasException {
return new AtlasImportRequest();
}
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
return ApplicationProperties.get().getString(property, defaultValue);
AtlasImportRequest request = new AtlasImportRequest();
request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
return request;
}
}
......@@ -186,6 +186,11 @@ public class ImportServiceTest extends ExportImportTestBase {
return getZipSource("salesNewTypeAttrs-next.zip");
}
// Supplies the zipDirect sample archive used by zipDirectSample().
@DataProvider(name = "zip-direct-3")
public static Object[][] getZipDirect3(ITestContext context) throws IOException, AtlasBaseException {
return getZipSource("zip-direct-3.zip");
}
@Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = "importDB4")
public void importDB5(InputStream inputStream) throws AtlasBaseException, IOException {
final String newEnumDefName = "database_action";
......@@ -346,6 +351,16 @@ public class ImportServiceTest extends ExportImportTestBase {
}
}
// Imports a zipDirect-format archive; the sample is expected to fail with
// AtlasBaseException (see expectedExceptions).
@Test(dataProvider = "zip-direct-3", expectedExceptions = AtlasBaseException.class)
public void zipDirectSample(InputStream inputStream) throws IOException, AtlasBaseException {
loadBaseModel();
loadFsModel();
// Selecting the zipDirect format routes the import through ZipSourceDirect.
AtlasImportRequest request = new AtlasImportRequest();
request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
runImportWithParameters(importService, request, inputStream);
}
@DataProvider(name = "relationshipLineage")
public static Object[][] getImportWithRelationships(ITestContext context) throws IOException, AtlasBaseException {
return getZipSource("rel-lineage.zip");
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
/** Tests for {@link ZipSourceDirect} and its nested EntitiesArrayParser. */
public class ZipDirectTest {
    // An archive lacking the expected entries must be rejected at construction time.
    @Test(expectedExceptions = AtlasBaseException.class)
    public void loadFileEmpty() throws IOException, AtlasBaseException {
        InputStream emptyArchive = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");

        new ZipSourceDirect(emptyArchive, 1);
    }

    // A well-formed archive yields its typesDef plus every entity, in order.
    @Test
    public void loadFile() throws IOException, AtlasBaseException {
        final int EXPECTED_ENTITY_COUNT = 3;

        InputStream archive = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
        ZipSourceDirect source  = new ZipSourceDirect(archive, EXPECTED_ENTITY_COUNT);

        assertNotNull(source);
        assertNotNull(source.getTypesDef());
        assertTrue(source.getTypesDef().getEntityDefs().size() > 0);
        assertNotNull(source.getExportResult());

        int entitiesSeen = 0;
        for (AtlasEntity.AtlasEntityWithExtInfo extInfo = source.getNextEntityWithExtInfo();
             extInfo != null;
             extInfo = source.getNextEntityWithExtInfo()) {
            assertNotNull(extInfo);
            entitiesSeen++;
        }

        assertEquals(entitiesSeen, EXPECTED_ENTITY_COUNT);
    }

    // The parser returns each array element verbatim and treats {} as end-of-data.
    @Test
    public void entitiesParserTest() throws IOException {
        String firstObject  = "{\"type\":\"hdfs_path\"}";
        String secondObject = "{\"type\":\"hive_db\"}";
        String jsonArray    = "[" + firstObject + "," + secondObject + ",{}]";

        InputStream payloadStream = new ByteArrayInputStream(jsonArray.getBytes());
        ZipSourceDirect.EntitiesArrayParser parser = new ZipSourceDirect.EntitiesArrayParser(payloadStream);

        Object element = parser.next();
        assertNotNull(element);
        assertEquals(element, firstObject);

        element = parser.next();
        assertEquals(element, secondObject);

        // The trailing {} sentinel terminates iteration.
        element = parser.next();
        assertNull(element);
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment