Commit 3f51160f by Shwetha GS

ATLAS-1096 Modify HveMetaStoreBridge.import to use getEntity instead of DSL (sumasai via shwethags)

parent 38fd4f35
......@@ -54,8 +54,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -72,7 +70,6 @@ public class HiveMetaStoreBridge {
public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String DESCRIPTION_ATTR = "description";
public static final String SEARCH_ENTRY_GUID_ATTR = "__guid";
public static final String TEMP_TABLE_PREFIX = "_temp-";
......@@ -214,30 +211,7 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for database {}", databaseName);
String typeName = HiveDataTypes.HIVE_DB.getName();
String dslQuery = getDatabaseDSLQuery(clusterName, databaseName, typeName);
return getEntityReferenceFromDSL(typeName, dslQuery);
}
static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
return String.format("%s where %s = '%s' and %s = '%s'", typeName, AtlasClient.NAME,
databaseName.toLowerCase(), AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
}
private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
AtlasClient dgiClient = getAtlasClient();
JSONArray results = dgiClient.searchByDSL(dslQuery, 1, 0);
if (results.length() == 0) {
return null;
} else {
String guid;
JSONObject row = results.getJSONObject(0);
if (row.has("$id$")) {
guid = row.getJSONObject("$id$").getString("id");
} else {
guid = row.getJSONObject("_col_0").getString("id");
}
return new Referenceable(guid, typeName, null);
}
return getEntityReference(typeName, getDBQualifiedName(clusterName, databaseName));
}
/**
......@@ -253,11 +227,16 @@ public class HiveMetaStoreBridge {
private String getCreateTableString(Table table, String location){
String colString = "";
List<FieldSchema> colList = table.getAllCols();
for(FieldSchema col:colList){
colString += col.getName() + " " + col.getType() + ",";
if ( colList != null) {
for (FieldSchema col : colList) {
colString += col.getName() + " " + col.getType() + ",";
}
if (colList.size() > 0) {
colString = colString.substring(0, colString.length() - 1);
colString = "(" + colString + ")";
}
}
colString = colString.substring(0, colString.length() - 1);
String query = "create external table " + table.getTableName() + "(" + colString + ")" +
String query = "create external table " + table.getTableName() + colString +
" location '" + location + "'";
return query;
}
......@@ -293,7 +272,7 @@ public class HiveMetaStoreBridge {
Table table = hiveClient.getTable(databaseName, tableName);
Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
if (table.getTableType() == TableType.EXTERNAL_TABLE) {
String tableQualifiedName = getTableQualifiedName(clusterName, table);
String tableQualifiedName = getTableProcessQualifiedName(clusterName, table);
Referenceable process = getProcessReference(tableQualifiedName);
if (process == null) {
LOG.info("Attempting to register create table process for {}", tableQualifiedName);
......@@ -347,25 +326,26 @@ public class HiveMetaStoreBridge {
LOG.debug("Getting reference for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
String typeName = HiveDataTypes.HIVE_TABLE.getName();
String dslQuery = getTableDSLQuery(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName(), typeName, hiveTable.isTemporary());
return getEntityReferenceFromDSL(typeName, dslQuery);
String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName());
return getEntityReference(typeName, tblQualifiedName);
}
private Referenceable getEntityReference(final String typeName, final String tblQualifiedName) throws AtlasServiceException {
AtlasClient dgiClient = getAtlasClient();
try {
return dgiClient.getEntity(typeName, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName);
} catch (AtlasServiceException e) {
if(e.getStatus() == ClientResponse.Status.NOT_FOUND) {
return null;
}
throw e;
}
}
private Referenceable getProcessReference(String qualifiedName) throws Exception{
LOG.debug("Getting reference for process {}", qualifiedName);
String typeName = HiveDataTypes.HIVE_PROCESS.getName();
String dslQuery = getProcessDSLQuery(typeName, qualifiedName);
return getEntityReferenceFromDSL(typeName, dslQuery);
}
static String getProcessDSLQuery(String typeName, String qualifiedName) throws Exception{
String dslQuery = String.format("%s as t where qualifiedName = '%s'", typeName, qualifiedName);
return dslQuery;
}
static String getTableDSLQuery(String clusterName, String dbName, String tableName, String typeName, boolean isTemporary) {
String entityName = getTableQualifiedName(clusterName, dbName, tableName, isTemporary);
return String.format("%s as t where qualifiedName = '%s'", typeName, entityName);
return getEntityReference(typeName, qualifiedName);
}
/**
......
......@@ -23,8 +23,10 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
......@@ -95,13 +97,13 @@ public class HiveMetaStoreBridgeTest {
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
// return existing table
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))
.thenReturn(getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(0));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(0));
when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
bridge.importHiveMetadata(true);
......@@ -114,9 +116,10 @@ public class HiveMetaStoreBridgeTest {
private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName)
throws AtlasServiceException, JSONException {
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getDatabaseDSLQuery(clusterName, databaseName,
HiveDataTypes.HIVE_DB.getName()), 1, 0)).thenReturn(
getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
when(atlasClient.getEntity(
HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, databaseName))).thenReturn(
getEntityReference(HiveDataTypes.HIVE_DB.getName(), "72e06b34-9151-4023-aa9d-b82103a50e76"));
}
private List<Table> setupTables(Hive hiveClient, String databaseName, String... tableNames) throws HiveException {
......@@ -144,12 +147,12 @@ public class HiveMetaStoreBridgeTest {
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
TEST_TABLE_NAME, HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))).thenReturn(
getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTable);
when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
Partition partition = mock(Partition.class);
......@@ -176,13 +179,13 @@ public class HiveMetaStoreBridgeTest {
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
table2Name, HiveDataTypes.HIVE_TABLE.getName(), false), 1, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME,
table2Name))).thenReturn(
getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName), 1, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
try {
......@@ -201,13 +204,13 @@ public class HiveMetaStoreBridgeTest {
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
table2Name, HiveDataTypes.HIVE_TABLE.getName(), false), 10, 0)).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME,
table2Name))).thenReturn(
getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName), 10, 0)).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77"));;
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
try {
......@@ -218,8 +221,8 @@ public class HiveMetaStoreBridgeTest {
}
}
private JSONArray getEntityReference(String id) throws JSONException {
return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
private Referenceable getEntityReference(String typeName, String id) throws JSONException {
return new Referenceable(id, typeName, null);
}
private Referenceable createTableReference() {
......@@ -232,7 +235,12 @@ public class HiveMetaStoreBridgeTest {
private Table createTestTable(String databaseName, String tableName) throws HiveException {
Table table = new Table(databaseName, tableName);
table.setInputFormatClass(TextInputFormat.class);
table.setFields(new ArrayList<FieldSchema>() {{
add(new FieldSchema("col1", "string", "comment1"));
}
});
table.setTableType(TableType.EXTERNAL_TABLE);
table.setDataLocation(new Path("somehdfspath"));
return table;
}
......
......@@ -52,6 +52,8 @@ public class HiveMetastoreBridgeIT extends HiveITBase {
assertEquals(outputs.size(), 1);
assertEquals(outputs.get(0).getId()._getId(), tableId);
int tableCount = atlasClient.listEntities(HiveDataTypes.HIVE_TABLE.getName()).size();
//Now import using import tool - should be no-op
hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true);
String tableId2 = assertTableIsRegistered(DEFAULT_DB, tableName);
......@@ -61,6 +63,10 @@ public class HiveMetastoreBridgeIT extends HiveITBase {
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
assertEquals(processId2, processId);
//assert that table is de-duped and no new entity is created
int newTableCount = atlasClient.listEntities(HiveDataTypes.HIVE_TABLE.getName()).size();
assertEquals(newTableCount, tableCount);
}
@Test
......
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES:
ATLAS-1096 Modify HveMetaStoreBridge.import to use getEntity instead of DSL (sumasai via shwethags)
ATLAS-1091 Improvement in DSL search functionality. (kevalbhatt)
ATLAS-1080 Regression - UI - hive_storagedesc is shown as "undefined" in UI.(kevalbhatt)
ATLAS-1089 Storm hook should handle cyclic references in topology object (mneethiraj via sumasai)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment