Commit 8c4a7fae by Shwetha GS

ATLAS-415 Hive import fails when importing a table that is already imported without StorageDescriptor information (yhemanth via shwethags)
parent 7b07a222
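
In short: the bridge previously built fresh Atlas entities on every import, so re-importing an already-registered table failed, notably when the earlier registration lacked StorageDescriptor information. The diff reworks HiveMetaStoreBridge around a create-or-update pattern: look up the entity by its qualified name, create and register it if absent, otherwise overwrite its attributes and issue an update. Below is a minimal sketch of driving the reworked bridge, assuming the caller sits in the org.apache.atlas.hive.bridge package (importHiveMetadata is now package-private) and that Atlas settings live in a commons-configuration object; the driver class itself is hypothetical:

    package org.apache.atlas.hive.bridge;

    import org.apache.commons.configuration.Configuration;
    import org.apache.commons.configuration.PropertiesConfiguration;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical driver, for illustration only.
    public class HiveImportExample {
        public static void main(String[] args) throws Exception {
            HiveConf hiveConf = new HiveConf();                      // reads hive-site.xml from the classpath
            Configuration atlasConf = new PropertiesConfiguration(); // normally loaded from Atlas application properties
            atlasConf.setProperty(HiveMetaStoreBridge.ATLAS_ENDPOINT, "http://localhost:21000/");

            HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(hiveConf, atlasConf);
            bridge.registerHiveDataModel(); // no-op if the Hive model is already defined in Atlas
            bridge.importHiveMetadata();    // now safe to re-run: existing entities are updated, not re-created
        }
    }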
@@ -127,6 +127,11 @@
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
             <scope>test</scope>
...
@@ -52,12 +52,16 @@ import java.util.List;
 /**
  * A Bridge Utility that imports metadata from the Hive Meta Store
- * and registers then in Atlas.
+ * and registers them in Atlas.
  */
 public class HiveMetaStoreBridge {
     private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
     public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
+    public static final String DESCRIPTION_ATTR = "description";
+    public static final String TABLE_TYPE_ATTR = "tableType";
+    public static final String SEARCH_ENTRY_GUID_ATTR = "__guid";
+    public static final String LAST_ACCESS_TIME_ATTR = "lastAccessTime";
     private final String clusterName;
     public static final String ATLAS_ENDPOINT = "atlas.rest.address";
@@ -67,6 +71,12 @@ public class HiveMetaStoreBridge {
     public final Hive hiveClient;
     private final AtlasClient atlasClient;

+    /**
+     * Construct a HiveMetaStoreBridge.
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     * @param atlasConf {@link Configuration} for Atlas component in the cluster
+     * @throws Exception
+     */
     public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf) throws Exception {
         this(hiveConf, atlasConf, null, null);
     }
@@ -77,21 +87,28 @@ public class HiveMetaStoreBridge {
     /**
      * Construct a HiveMetaStoreBridge.
-     * @param hiveConf hive conf
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     * @param doAsUser The user accessing Atlas service
+     * @param ugi {@link UserGroupInformation} representing the Atlas service
      */
     public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf, String doAsUser,
                                UserGroupInformation ugi) throws Exception {
-        clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-        hiveClient = Hive.get(hiveConf);
-        atlasClient = new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser);
+        this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
+                Hive.get(hiveConf),
+                new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser));
+    }
+
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
+        this.clusterName = clusterName;
+        this.hiveClient = hiveClient;
+        this.atlasClient = atlasClient;
     }

-    public AtlasClient getAtlasClient() {
+    private AtlasClient getAtlasClient() {
         return atlasClient;
     }

-    public void importHiveMetadata() throws Exception {
+    void importHiveMetadata() throws Exception {
         LOG.info("Importing hive metadata");
         importDatabases();
     }
@@ -106,27 +123,13 @@ public class HiveMetaStoreBridge {
     }

     /**
-     * Creates db entity
-     * @param hiveDB
-     * @return
+     * Create a Hive Database entity
+     * @param hiveDB The Hive {@link Database} object from which to map properties
+     * @return new Hive Database entity
      * @throws HiveException
      */
     public Referenceable createDBInstance(Database hiveDB) throws HiveException {
-        LOG.info("Importing objects from databaseName : " + hiveDB.getName());
-        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        String dbName = hiveDB.getName().toLowerCase();
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
-        dbRef.set(HiveDataModelGenerator.NAME, dbName);
-        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
-        dbRef.set("description", hiveDB.getDescription());
-        dbRef.set("locationUri", hiveDB.getLocationUri());
-        dbRef.set("parameters", hiveDB.getParameters());
-        dbRef.set("ownerName", hiveDB.getOwnerName());
-        if (hiveDB.getOwnerType() != null) {
-            dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
-        }
-        return dbRef;
+        return createOrUpdateDBInstance(hiveDB, null);
     }

     /**
@@ -137,12 +140,34 @@ public class HiveMetaStoreBridge {
      */
     private Referenceable registerDatabase(String databaseName) throws Exception {
         Referenceable dbRef = getDatabaseReference(clusterName, databaseName);
+        Database db = hiveClient.getDatabase(databaseName);
         if (dbRef == null) {
-            Database db = hiveClient.getDatabase(databaseName);
             dbRef = createDBInstance(db);
             dbRef = registerInstance(dbRef);
         } else {
-            LOG.info("Database {} is already registered with id {}", databaseName, dbRef.getId().id);
+            LOG.info("Database {} is already registered with id {}. Updating it.", databaseName, dbRef.getId().id);
+            dbRef = createOrUpdateDBInstance(db, dbRef);
+            updateInstance(dbRef);
+        }
+        return dbRef;
+    }
+
+    private Referenceable createOrUpdateDBInstance(Database hiveDB, Referenceable dbRef) {
+        LOG.info("Importing objects from databaseName : " + hiveDB.getName());
+        if (dbRef == null) {
+            dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+        }
+        String dbName = hiveDB.getName().toLowerCase();
+        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
+        dbRef.set(HiveDataModelGenerator.NAME, dbName);
+        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
+        dbRef.set("locationUri", hiveDB.getLocationUri());
+        dbRef.set("parameters", hiveDB.getParameters());
+        dbRef.set("ownerName", hiveDB.getOwnerName());
+        if (hiveDB.getOwnerType() != null) {
+            dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
+        }
         return dbRef;
     }
@@ -153,7 +178,7 @@ public class HiveMetaStoreBridge {
      * @return
      * @throws Exception
      */
-    public Referenceable registerInstance(Referenceable referenceable) throws Exception {
+    private Referenceable registerInstance(Referenceable referenceable) throws Exception {
         String typeName = referenceable.getTypeName();
         LOG.debug("creating instance of type " + typeName);
@@ -176,11 +201,15 @@ public class HiveMetaStoreBridge {
         LOG.debug("Getting reference for database {}", databaseName);

         String typeName = HiveDataTypes.HIVE_DB.getName();
-        String dslQuery = String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
-                databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        String dslQuery = getDatabaseDSLQuery(clusterName, databaseName, typeName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }

+    static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
+        return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
+                databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+    }
+
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
         AtlasClient dgiClient = getAtlasClient();
         JSONArray results = dgiClient.searchByDSL(dslQuery);
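
Assuming the Hive type and attribute names resolve to "hive_db", "name", and "clusterName" (assumptions; only the format string comes from this diff), the extracted helper makes the lookup query directly testable:

    // Hypothetical constant values, for illustration only.
    String dsl = HiveMetaStoreBridge.getDatabaseDSLQuery("primary", "Sales", "hive_db");
    // dsl == "hive_db where name = 'sales' and clusterName = 'primary'"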
@@ -198,6 +227,12 @@ public class HiveMetaStoreBridge {
         }
     }

+    /**
+     * Construct the qualified name used to uniquely identify a Database instance in Atlas.
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database
+     * @return Unique qualified name to identify the Database instance in Atlas.
+     */
     public static String getDBQualifiedName(String clusterName, String dbName) {
         return String.format("%s@%s", dbName.toLowerCase(), clusterName);
     }
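
From the "%s@%s" format just above, a database's unique name is the lower-cased database name qualified by cluster, e.g.:

    HiveMetaStoreBridge.getDBQualifiedName("primary", "Sales");   // -> "sales@primary"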
@@ -233,71 +268,109 @@ public class HiveMetaStoreBridge {
         LOG.debug("Getting reference for table {}.{}", dbName, tableName);

         String typeName = HiveDataTypes.HIVE_TABLE.getName();
-        String entityName = getTableQualifiedName(clusterName, dbName, tableName);
-        String dslQuery = String.format("%s as t where name = '%s'", typeName, entityName);
+        String dslQuery = getTableDSLQuery(getClusterName(), dbName, tableName, typeName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }

+    static String getTableDSLQuery(String clusterName, String dbName, String tableName, String typeName) {
+        String entityName = getTableQualifiedName(clusterName, dbName, tableName);
+        return String.format("%s as t where name = '%s'", typeName, entityName);
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database to which the Table belongs
+     * @param tableName Name of the Hive table
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
     public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
         return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName);
     }
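
Similarly, the "%s.%s@%s" format gives table names like "sales.customers@primary", and getTableDSLQuery matches on that single name attribute. A quick illustration (the "hive_table" type name is an assumption):

    HiveMetaStoreBridge.getTableQualifiedName("primary", "Sales", "Customers");
    // -> "sales.customers@primary"
    HiveMetaStoreBridge.getTableDSLQuery("primary", "Sales", "Customers", "hive_table");
    // -> "hive_table as t where name = 'sales.customers@primary'"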
+    /**
+     * Create a new table instance in Atlas
+     * @param dbReference reference to a created Hive database {@link Referenceable} to which this table belongs
+     * @param hiveTable reference to the Hive {@link Table} from which to map properties
+     * @return Newly created Hive reference
+     * @throws Exception
+     */
     public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable)
             throws Exception {
+        return createOrUpdateTableInstance(dbReference, null, hiveTable);
+    }
+
+    private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
+                                                      Table hiveTable) throws Exception {
         LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());

-        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        if (tableReference == null) {
+            tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        }
+
         String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable.getDbName(), hiveTable.getTableName());
-        tableRef.set(HiveDataModelGenerator.NAME, tableQualifiedName);
-        tableRef.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
-        tableRef.set("owner", hiveTable.getOwner());
+        tableReference.set(HiveDataModelGenerator.NAME, tableQualifiedName);
+        tableReference.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
+        tableReference.set("owner", hiveTable.getOwner());

-        tableRef.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME));
-        tableRef.set("lastAccessTime", hiveTable.getLastAccessTime());
-        tableRef.set("retention", hiveTable.getRetention());
+        tableReference.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME));
+        tableReference.set("lastAccessTime", hiveTable.getLastAccessTime());
+        tableReference.set("retention", hiveTable.getRetention());

-        tableRef.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT));
+        tableReference.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT));

         // add reference to the database
-        tableRef.set(HiveDataModelGenerator.DB, dbReference);
+        tableReference.set(HiveDataModelGenerator.DB, dbReference);

-        tableRef.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));
+        tableReference.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));

         // add reference to the StorageDescriptor
         Referenceable sdReferenceable = fillStorageDescStruct(hiveTable.getSd(), tableQualifiedName, tableQualifiedName);
-        tableRef.set("sd", sdReferenceable);
+        tableReference.set("sd", sdReferenceable);

         // add reference to the Partition Keys
         List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableQualifiedName);
-        tableRef.set("partitionKeys", partKeys);
+        tableReference.set("partitionKeys", partKeys);

-        tableRef.set("parameters", hiveTable.getParameters());
+        tableReference.set("parameters", hiveTable.getParameters());

         if (hiveTable.getViewOriginalText() != null) {
-            tableRef.set("viewOriginalText", hiveTable.getViewOriginalText());
+            tableReference.set("viewOriginalText", hiveTable.getViewOriginalText());
         }

         if (hiveTable.getViewExpandedText() != null) {
-            tableRef.set("viewExpandedText", hiveTable.getViewExpandedText());
+            tableReference.set("viewExpandedText", hiveTable.getViewExpandedText());
         }

-        tableRef.set("tableType", hiveTable.getTableType().name());
-        tableRef.set("temporary", hiveTable.isTemporary());
+        tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
+        tableReference.set("temporary", hiveTable.isTemporary());

-        return tableRef;
+        return tableReference;
     }
     private Referenceable registerTable(Referenceable dbReference, Table table) throws Exception {
         String dbName = table.getDbName();
         String tableName = table.getTableName();
         LOG.info("Attempting to register table [" + tableName + "]");
-        Referenceable tableRef = getTableReference(dbName, tableName);
-        if (tableRef == null) {
-            tableRef = createTableInstance(dbReference, table);
-            tableRef = registerInstance(tableRef);
+        Referenceable tableReference = getTableReference(dbName, tableName);
+        if (tableReference == null) {
+            tableReference = createTableInstance(dbReference, table);
+            tableReference = registerInstance(tableReference);
         } else {
-            LOG.info("Table {}.{} is already registered with id {}", dbName, tableName, tableRef.getId().id);
+            LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName,
+                    tableReference.getId().id);
+            tableReference = createOrUpdateTableInstance(dbReference, tableReference, table);
+            updateInstance(tableReference);
         }
-        return tableRef;
+        return tableReference;
+    }
+
+    private void updateInstance(Referenceable referenceable) throws AtlasServiceException {
+        String typeName = referenceable.getTypeName();
+        LOG.debug("updating instance of type " + typeName);
+
+        String entityJSON = InstanceSerialization.toJson(referenceable, true);
+        LOG.debug("Updating entity {} = {}", referenceable.getTypeName(), entityJSON);
+
+        atlasClient.updateEntity(referenceable.getId().id, referenceable);
     }
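
updateInstance is the update-side counterpart of registerInstance: it serializes the Referenceable for debug logging and then issues a single client call keyed by the guid captured during the earlier lookup. Reduced to its essentials (variable names as in the method above):

    String guid = referenceable.getId().id;          // guid attached when the existing entity was found
    atlasClient.updateEntity(guid, referenceable);   // overwrites the entity's attributes in Atlas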
@@ -308,14 +381,13 @@ public class HiveMetaStoreBridge {
         if (results.length() == 0) {
             return null;
         }
-        String guid = results.getJSONObject(0).getString("__guid");
+        String guid = results.getJSONObject(0).getString(SEARCH_ENTRY_GUID_ATTR);
         return new Referenceable(guid, typeName, null);
     }

     private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception {
-        String valuesStr = "['" + StringUtils.join(values, "', '") + "']";
+        String valuesStr = joinPartitionValues(values);
         LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
-        String typeName = HiveDataTypes.HIVE_PARTITION.getName();

         //todo replace gremlin with DSL
         // String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
@@ -323,14 +395,23 @@ public class HiveMetaStoreBridge {
         //     tableName,
         //     dbName, clusterName);

-        String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
         String tableEntityName = getTableQualifiedName(clusterName, dbName, tableName);

-        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
-                + "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
-                typeName, datasetType, tableEntityName);
+        String gremlinQuery = getPartitionGremlinQuery(valuesStr, tableEntityName);

-        return getEntityReferenceFromGremlin(typeName, gremlinQuery);
+        return getEntityReferenceFromGremlin(HiveDataTypes.HIVE_PARTITION.getName(), gremlinQuery);
+    }
+
+    static String joinPartitionValues(List<String> values) {
+        return "['" + StringUtils.join(values, "', '") + "']";
+    }
+
+    static String getPartitionGremlinQuery(String valuesStr, String tableEntityName) {
+        String typeName = HiveDataTypes.HIVE_PARTITION.getName();
+        String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
+        return String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+                + "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
+                typeName, datasetType, tableEntityName);
     }

     private Referenceable getSDForTable(String dbName, String tableName) throws Exception {
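
Extracting joinPartitionValues and getPartitionGremlinQuery makes the Gremlin lookup testable without a server. For hypothetical partition values ["2016", "01"]:

    import java.util.Arrays;   // for the one-liner below

    String valuesStr = HiveMetaStoreBridge.joinPartitionValues(Arrays.asList("2016", "01"));
    // valuesStr == "['2016', '01']", which is spliced into the g.V.has(...) query above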
@@ -369,15 +450,22 @@ public class HiveMetaStoreBridge {
             partRef = createPartitionReferenceable(tableReferenceable, sdReferenceable, hivePart);
             partRef = registerInstance(partRef);
         } else {
-            LOG.info("Partition {}.{} with values {} is already registered with id {}", dbName, tableName,
+            LOG.info("Partition {}.{} with values {} is already registered with id {}. Updating entity",
+                    dbName, tableName,
                     StringUtils.join(hivePart.getValues(), ","), partRef.getId().id);
+            partRef =
+                    createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, partRef);
+            updateInstance(partRef);
         }
         return partRef;
     }
-    public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
-                                                      Partition hivePart) {
-        Referenceable partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
+    private Referenceable createOrUpdatePartitionReferenceable(Referenceable tableReferenceable,
+                                                               Referenceable sdReferenceable,
+                                                               Partition hivePart, Referenceable partRef) {
+        if (partRef == null) {
+            partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
+        }
         partRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getPartitionQualifiedName(hivePart));
         partRef.set("values", hivePart.getValues());
@@ -385,7 +473,7 @@ public class HiveMetaStoreBridge {

         //todo fix
         partRef.set("createTime", hivePart.getLastAccessTime());
-        partRef.set("lastAccessTime", hivePart.getLastAccessTime());
+        partRef.set(LAST_ACCESS_TIME_ATTR, hivePart.getLastAccessTime());

         // sdStruct = fillStorageDescStruct(hivePart.getSd());
         // Instead of creating copies of the sdstruct for partitions we are reusing existing
@@ -396,6 +484,18 @@ public class HiveMetaStoreBridge {
         return partRef;
     }
+    /**
+     * Create a Hive partition instance in Atlas
+     * @param tableReferenceable The Hive Table {@link Referenceable} to which this partition belongs.
+     * @param sdReferenceable The Storage descriptor {@link Referenceable} for this table.
+     * @param hivePart The Hive {@link Partition} object being created
+     * @return Newly created Hive partition instance
+     */
+    public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
+                                                      Partition hivePart) {
+        return createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, null);
+    }
+
     private String getPartitionQualifiedName(Partition partition) {
         return String.format("%s.%s.%s@%s", partition.getTable().getDbName(),
                 partition.getTable().getTableName(), StringUtils.join(partition.getValues(), "-"), clusterName);
@@ -480,6 +580,13 @@ public class HiveMetaStoreBridge {
         return colList;
     }

+    /**
+     * Register the Hive DataModel in Atlas, if not already defined.
+     *
+     * The method checks for the presence of the type {@link HiveDataTypes#HIVE_PROCESS} with the Atlas server.
+     * If this type is defined, then we assume the Hive DataModel is registered.
+     * @throws Exception
+     */
     public synchronized void registerHiveDataModel() throws Exception {
         HiveDataModelGenerator dataModelGenerator = new HiveDataModelGenerator();
         AtlasClient dgiClient = getAtlasClient();
...
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.hive.bridge;

import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapred.TextInputFormat;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import scala.actors.threadpool.Arrays;

import java.util.List;

import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class HiveMetaStoreBridgeTest {

    private static final String TEST_DB_NAME = "default";
    public static final String CLUSTER_NAME = "primary";
    public static final String TEST_TABLE_NAME = "test_table";

    @Mock
    private Hive hiveClient;

    @Mock
    private AtlasClient atlasClient;

    @BeforeMethod
    public void initializeMocks() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void testImportThatUpdatesRegisteredDatabase() throws Exception {
        // setup database
        when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{TEST_DB_NAME}));
        String description = "This is a default database";
        when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(
                new Database(TEST_DB_NAME, description, "/user/hive/default", null));
        when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new String[]{}));

        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);

        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
        bridge.importHiveMetadata();

        // verify update is called
        verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
                (Referenceable) argThat(
                        new MatchesReferenceableProperty(HiveMetaStoreBridge.DESCRIPTION_ATTR, description)));
    }

    @Test
    public void testImportThatUpdatesRegisteredTable() throws Exception {
        setupDB(hiveClient, TEST_DB_NAME);
        setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);

        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);

        // return existing table
        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
                HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());

        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
        bridge.importHiveMetadata();

        // verify update is called on table
        verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
                (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.TABLE_TYPE_ATTR,
                        TableType.EXTERNAL_TABLE.name())));
    }

    private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName)
            throws AtlasServiceException, JSONException {
        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getDatabaseDSLQuery(clusterName, databaseName,
                HiveDataTypes.HIVE_DB.getName()))).thenReturn(
                getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
    }

    private Table setupTable(Hive hiveClient, String databaseName, String tableName) throws HiveException {
        when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(new String[]{tableName}));
        Table testTable = createTestTable(databaseName, tableName);
        when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
        return testTable;
    }

    private void setupDB(Hive hiveClient, String databaseName) throws HiveException {
        when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{databaseName}));
        when(hiveClient.getDatabase(databaseName)).thenReturn(
                new Database(databaseName, "Default database", "/user/hive/default", null));
    }

    @Test
    public void testImportThatUpdatesRegisteredPartition() throws Exception {
        setupDB(hiveClient, TEST_DB_NAME);
        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);

        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);

        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
                TEST_TABLE_NAME,
                HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());

        Partition partition = mock(Partition.class);
        when(partition.getTable()).thenReturn(hiveTable);
        List partitionValues = Arrays.asList(new String[]{"name", "location"});
        when(partition.getValues()).thenReturn(partitionValues);
        int lastAccessTime = 1234512345;
        when(partition.getLastAccessTime()).thenReturn(lastAccessTime);
        when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition}));

        when(atlasClient.searchByGremlin(
                HiveMetaStoreBridge.getPartitionGremlinQuery(
                        HiveMetaStoreBridge.joinPartitionValues(partitionValues),
                        HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))).
                thenReturn(getPartitionReference("9ae06b34-9151-3043-aa9d-b82103a50e99"));

        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
        bridge.importHiveMetadata();

        verify(atlasClient).updateEntity(eq("9ae06b34-9151-3043-aa9d-b82103a50e99"),
                (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.LAST_ACCESS_TIME_ATTR,
                        new Integer(lastAccessTime))));
    }

    private JSONArray getPartitionReference(String id) throws JSONException {
        JSONObject resultEntry = new JSONObject();
        resultEntry.put(HiveMetaStoreBridge.SEARCH_ENTRY_GUID_ATTR, id);
        JSONArray results = new JSONArray();
        results.put(resultEntry);
        return results;
    }

    private JSONArray getEntityReference(String id) throws JSONException {
        return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
    }

    private Referenceable createTableReference() {
        Referenceable tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
        Referenceable sdReference = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
        tableReference.set("sd", sdReference);
        return tableReference;
    }

    private Table createTestTable(String databaseName, String tableName) throws HiveException {
        Table table = new Table(databaseName, tableName);
        table.setInputFormatClass(TextInputFormat.class);
        table.setTableType(TableType.EXTERNAL_TABLE);
        return table;
    }

    private class MatchesReferenceableProperty extends ArgumentMatcher<Object> {
        private final String attrName;
        private final Object attrValue;

        public MatchesReferenceableProperty(String attrName, Object attrValue) {
            this.attrName = attrName;
            this.attrValue = attrValue;
        }

        @Override
        public boolean matches(Object o) {
            return attrValue.equals(((Referenceable) o).get(attrName));
        }
    }
}
@@ -1171,7 +1171,7 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
             <version>1.8.5</version>
-            <scope>provided</scope>
+            <scope>test</scope>
         </dependency>
         <dependency>
...
@@ -7,6 +7,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)

 ALL CHANGES:
+ATLAS-415 Hive import fails when importing a table that is already imported without StorageDescriptor information (yhemanth via shwethags)
 ATLAS-450 quick_start fails on cygwin (dkantor via shwethags)
 ATLAS-451 Doc: Fix few broken links due to Wiki words in Atlas documentation (ssainath via shwethags)
 ATLAS-439 Investigate apache build failure - EntityJerseyResourceIT.testEntityDeduping (shwethags)
...