Commit cec7aa85 authored by Nikhil Bonte, committed by Sarath Subramanian

ATLAS-3885: import-hive.sh: Hive entities with an Ozone location created by import-hive.sh get an hdfs_path entity instead of an ozone entity in Atlas

Signed-off-by: Sarath Subramanian <sarath@apache.org>
parent d9d0e8c2
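This change replaces HiveMetaStoreBridge's HDFS-only path-entity construction (toHdfsPathEntity, removed below) with the shared AtlasPathExtractorUtil, so a table whose data location is an Ozone or S3 URI is modeled with the matching Atlas path type instead of always hdfs_path. The sketch below shows the new flow in isolation; it is a minimal illustration assuming only the classes and signatures visible in this diff, and the Ozone URI, the "primary" namespace, and the "v2" model version are hypothetical placeholder values.

```java
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.PathExtractorContext;
import org.apache.hadoop.fs.Path;

public class PathEntitySketch {
    public static void main(String[] args) {
        // Hypothetical Ozone table location; the removed toHdfsPathEntity() would have
        // turned this into an hdfs_path entity regardless of the URI scheme.
        Path location = new Path("o3fs://bucket1.volume1.ozone1/warehouse/db1/table1");

        // Same context the bridge now builds before registering the table's create process:
        // metadata namespace, hdfs_path lower-casing flag, and the configured aws_s3 model version.
        PathExtractorContext pathExtractorCtx = new PathExtractorContext("primary", false, "v2");

        // The extractor derives the entity type from the location's scheme (Ozone, S3, HDFS, ...).
        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx);
        AtlasEntity            pathEntity        = entityWithExtInfo.getEntity();

        // "qualifiedName" is the standard Atlas attribute behind ATTRIBUTE_QUALIFIED_NAME.
        System.out.println(pathEntity.getTypeName() + ": " + pathEntity.getAttribute("qualifiedName"));
    }
}
```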
@@ -20,6 +20,7 @@ package org.apache.atlas.hive.bridge;
 import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
@@ -30,6 +31,7 @@ import org.apache.atlas.hook.AtlasHookException;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.HdfsNameServiceResolver;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -37,6 +39,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.utils.PathExtractorContext;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.collections.CollectionUtils;
@@ -87,6 +90,7 @@ public class HiveMetaStoreBridge {
     public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
     public static final String HIVE_METADATA_NAMESPACE = "atlas.metadata.namespace";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+    public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
     public static final String TEMP_TABLE_PREFIX = "_temp-";
     public static final String ATLAS_ENDPOINT = "atlas.rest.address";
@@ -94,6 +98,8 @@ public class HiveMetaStoreBridge {
     public static final String HDFS_PATH = "hdfs_path";
     public static final String DEFAULT_METASTORE_CATALOG = "hive";
+    public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
+
     private static final int EXIT_CODE_SUCCESS = 0;
     private static final int EXIT_CODE_FAILED = 1;
     private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
@@ -103,6 +109,8 @@ public class HiveMetaStoreBridge {
     private final AtlasClientV2 atlasClientV2;
     private final boolean convertHdfsPathToLowerCase;
+
+    private String awsS3AtlasModelVersion = null;

     public static void main(String[] args) {
         int exitCode = EXIT_CODE_FAILED;
@@ -216,6 +224,7 @@ public class HiveMetaStoreBridge {
         this.hiveClient = Hive.get(hiveConf);
         this.atlasClientV2 = atlasClientV2;
         this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+        this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
     }

     /**
@@ -355,12 +364,17 @@
         AtlasEntityWithExtInfo processEntity = findProcessEntity(processQualifiedName);

         if (processEntity == null) {
-            String tableLocation = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
-            String query = getCreateTableString(table, tableLocation);
-            AtlasEntity pathInst = toHdfsPathEntity(tableLocation);
-            AtlasEntity tableInst = tableEntity.getEntity();
-            AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
-            long now = System.currentTimeMillis();
+            String tableLocationString = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
+            Path location = table.getDataLocation();
+            String query = getCreateTableString(table, tableLocationString);
+
+            PathExtractorContext pathExtractorCtx = new PathExtractorContext(getMetadataNamespace(), isConvertHdfsPathToLowerCase(), awsS3AtlasModelVersion);
+            AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx);
+
+            AtlasEntity pathInst = entityWithExtInfo.getEntity();
+            AtlasEntity tableInst = tableEntity.getEntity();
+            AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
+            long now = System.currentTimeMillis();

             processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
             processInst.setAttribute(ATTRIBUTE_NAME, query);
@@ -379,7 +393,12 @@
             AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();

             createTableProcess.addEntity(processInst);
-            createTableProcess.addEntity(pathInst);
+
+            if (pathExtractorCtx.getKnownEntities() != null) {
+                pathExtractorCtx.getKnownEntities().values().forEach(entity -> createTableProcess.addEntity(entity));
+            } else {
+                createTableProcess.addEntity(pathInst);
+            }

             registerInstances(createTableProcess);
         } else {
@@ -725,35 +744,6 @@
         return ret;
     }

-    private AtlasEntity toHdfsPathEntity(String pathUri) {
-        AtlasEntity ret = new AtlasEntity(HDFS_PATH);
-        String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
-        Path path = new Path(pathUri);
-
-        ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString());
-        ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
-
-        if (StringUtils.isNotEmpty(nameServiceID)) {
-            // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by resolved name service
-            String updatedHdfsPath = HdfsNameServiceResolver.getPathWithNameServiceID(pathUri);
-
-            ret.setAttribute(ATTRIBUTE_PATH, updatedHdfsPath);
-            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(updatedHdfsPath));
-            ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
-        } else {
-            ret.setAttribute(ATTRIBUTE_PATH, pathUri);
-
-            // Only append metadataNamespace for the HDFS path
-            if (pathUri.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
-                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(pathUri));
-            } else {
-                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathUri);
-            }
-        }
-
-        return ret;
-    }
-
     /**
      * Gets the atlas entity for the database
      * @param databaseName database Name
...
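For completeness, the new knob that feeds PathExtractorContext can be read the same way the constructor change above reads it. This is a minimal sketch, assuming Atlas's standard ApplicationProperties accessor backed by commons-configuration; the property key is CONF_PREFIX + "aws_s3.atlas.model.version", and the expanded value of CONF_PREFIX is not shown in this diff.

```java
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.commons.configuration.Configuration;

public class ModelVersionSketch {
    public static void main(String[] args) throws Exception {
        // Loads atlas-application.properties via Atlas's ApplicationProperties helper.
        Configuration atlasProperties = ApplicationProperties.get();

        // Mirrors the constructor change: falls back to HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 ("v2")
        // when the property is not set.
        String awsS3AtlasModelVersion = atlasProperties.getString(
                HiveMetaStoreBridge.HOOK_AWS_S3_ATLAS_MODEL_VERSION,
                HiveMetaStoreBridge.HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);

        System.out.println("aws_s3 Atlas model version used for path extraction: " + awsS3AtlasModelVersion);
    }
}
```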