Commit c6466004 by Ashutosh Mestry

ATLAS-2869: Hdfs_path if requested are created and then proceeds with export.

parent b08b7b49
......@@ -81,14 +81,16 @@ public class ExportService {
private final EntityGraphRetriever entityGraphRetriever;
private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
private final HdfsPathEntityCreator hdfsPathEntityCreator;
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) {
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = atlasGraph;
this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.auditsWriter = auditsWriter;
this.auditsWriter = auditsWriter;
this.hdfsPathEntityCreator = hdfsPathEntityCreator;
}
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
......@@ -244,6 +246,10 @@ public class ExportService {
private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
List<String> ret = null;
if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) {
hdfsPathEntityCreator.getCreateEntity(item);
}
if (StringUtils.isNotEmpty(item.getGuid())) {
ret = Collections.singletonList(item.getGuid());
} else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.repository.impexp.AuditsWriter.getCurrentClusterName;
@Component
public class HdfsPathEntityCreator {
protected static final Logger LOG = LoggerFactory.getLogger(HdfsPathEntityCreator.class);
public static final String HDFS_PATH_TYPE = "hdfs_path";
public static final String HDFS_PATH_ATTRIBUTE_NAME_NAME = "name";
public static final String HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME = "clusterName";
public static final String HDFS_PATH_ATTRIBUTE_NAME_PATH = "path";
public static final String HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String QUALIFIED_NAME_FORMAT = "%s@%s";
private final String PATH_SEPARATOR = "/";
private AtlasTypeRegistry typeRegistry;
private AtlasEntityStoreV1 entityStore;
@Inject
public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) {
this.typeRegistry = typeRegistry;
this.entityStore = entityStore;
}
public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(AtlasObjectId item) throws AtlasBaseException {
if(!item.getUniqueAttributes().containsKey(HDFS_PATH_ATTRIBUTE_NAME_PATH)) {
return null;
}
return getCreateEntity((String) item.getUniqueAttributes().get(HDFS_PATH_ATTRIBUTE_NAME_PATH));
}
public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path) throws AtlasBaseException {
return getCreateEntity(path, getCurrentClusterName());
}
public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path, String clusterName) throws AtlasBaseException {
String pathWithTrailingSeparator = getPathWithTrailingSeparator(path);
AtlasEntityType hdfsPathEntityType = getHdfsPathEntityType();
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
if(entityWithExtInfo != null) {
return entityWithExtInfo;
}
AtlasEntity entity = createHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
AtlasEntityStream entityStream = new AtlasEntityStream(entity);
EntityMutationResponse entityMutationResponse = entityStore.createOrUpdate(entityStream, false);
if(entityMutationResponse.getCreatedEntities().size() == 0) {
return null;
}
return getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
}
private AtlasEntity createHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
AtlasEntity entity = hdfsPathEntityType.createDefaultValue();
entity.setAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_PATH, path);
entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME, path);
entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME, clusterName);
return entity;
}
private AtlasEntity.AtlasEntityWithExtInfo getHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
try {
return entityStore.getByUniqueAttributes(hdfsPathEntityType, getUniqueAttributes(path, clusterName));
} catch (AtlasBaseException e) {
return null;
}
}
private AtlasEntityType getHdfsPathEntityType() throws AtlasBaseException {
return (AtlasEntityType) typeRegistry.getType(HDFS_PATH_TYPE);
}
private Map<String,Object> getUniqueAttributes(String path, String clusterName) {
Map<String,Object> ret = new HashMap<String, Object>();
ret.put(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
return ret;
}
private String getPathWithTrailingSeparator(String path) {
if(path.endsWith(PATH_SEPARATOR)) {
return path;
}
return path + PATH_SEPARATOR;
}
public static String getQualifiedName(String path, String clusterName) {
return String.format(QUALIFIED_NAME_FORMAT, path, clusterName);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME;
import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_NAME;
import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.getQualifiedName;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@Guice(modules = TestModules.TestOnlyModule.class)
public class HdfsPathEntityCreatorTest extends ExportImportTestBase {
@Inject
AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private HdfsPathEntityCreator hdfsPathEntityCreator;
private static final String expectedPath = "hdfs://server-name/warehouse/hr";
private static final String expectedClusterName = "cl1";
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
loadFsModel(typeDefStore, typeRegistry);
}
@Test
public void verifyCreate() throws AtlasBaseException {
String expectedQualifiedName = getQualifiedName(expectedPath + "/", expectedClusterName);
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
assertNotNull(entityWithExtInfo);
AtlasEntity entity = entityWithExtInfo.getEntity();
assertEquals(entity.getAttribute(HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_PATH), expectedPath + "/");
assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME),expectedQualifiedName);
assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME), expectedPath + "/");
assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME), expectedClusterName);
}
@Test(dependsOnMethods = "verifyCreate")
public void verifyGet() throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
assertNotNull(entityWithExtInfo);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment