Commit 76b05603 by Suma Shivaprasad

ATLAS-948 import-hive should allow an option to continue after failure (sumasai)

parent 377bc635
@@ -33,6 +33,10 @@ import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
@@ -111,17 +115,16 @@ public class HiveMetaStoreBridge {
return atlasClient;
}
-void importHiveMetadata() throws Exception {
+void importHiveMetadata(boolean failOnError) throws Exception {
LOG.info("Importing hive metadata");
-importDatabases();
+importDatabases(failOnError);
}
-private void importDatabases() throws Exception {
+private void importDatabases(boolean failOnError) throws Exception {
List<String> databases = hiveClient.getAllDatabases();
for (String databaseName : databases) {
Referenceable dbReference = registerDatabase(databaseName);
importTables(dbReference, databaseName);
importTables(dbReference, databaseName, failOnError);
}
}
@@ -254,20 +257,24 @@ public class HiveMetaStoreBridge {
/**
* Imports all tables for the given db
-* @param databaseName
+* @param databaseReferenceable
+* @param databaseName
+* @param failOnError
* @throws Exception
*/
-private void importTables(Referenceable databaseReferenceable, String databaseName) throws Exception {
+private int importTables(Referenceable databaseReferenceable, String databaseName, final boolean failOnError) throws Exception {
+int tablesImported = 0;
List<String> hiveTables = hiveClient.getAllTables(databaseName);
LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
for (String tableName : hiveTables) {
+try {
Table table = hiveClient.getTable(databaseName, tableName);
Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
-if (table.getTableType() == TableType.EXTERNAL_TABLE){
+tablesImported++;
+if (table.getTableType() == TableType.EXTERNAL_TABLE) {
String tableQualifiedName = getTableQualifiedName(clusterName, table);
Referenceable process = getProcessReference(tableQualifiedName);
-if (process == null){
+if (process == null) {
LOG.info("Attempting to register create table process for {}", tableQualifiedName);
Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
ArrayList<Referenceable> sourceList = new ArrayList<>();
@@ -293,15 +300,27 @@ public class HiveMetaStoreBridge {
lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
lineageProcess.set(AtlasClient.NAME, query);
registerInstance(lineageProcess);
-}
-else {
+} else {
LOG.info("Process {} is already registered", process.toString());
}
}
+} catch (Exception e) {
+LOG.error("Import failed for hive_table {}", tableName, e);
+if (failOnError) {
+throw e;
+}
+}
}
+if (tablesImported == hiveTables.size()) {
+LOG.info("Successfully imported all {} tables from {}", tablesImported, databaseName);
+} else {
+LOG.error("Imported only {} of {} tables from {}", tablesImported, hiveTables.size(), databaseName);
+}
+return tablesImported;
}
/**
* Gets reference for the table
*
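The substance of this hunk is the rewritten loop in importTables: each table import runs inside a try/catch, successes are counted, and the caught exception is rethrown only when failOnError is set. A minimal standalone sketch of the same pattern, with hypothetical names (importAll, register) standing in for the Atlas API:

    import java.util.Arrays;
    import java.util.List;

    public class ContinueOnError {
        // Hypothetical stand-in for registerTable(); fails for one item.
        static void register(String item) {
            if (item.startsWith("bad")) {
                throw new RuntimeException("registration failed for " + item);
            }
        }

        static int importAll(List<String> items, boolean failOnError) {
            int imported = 0;
            for (String item : items) {
                try {
                    register(item);
                    imported++;
                } catch (RuntimeException e) {
                    // Log and continue, unless the caller asked for fail-fast behaviour.
                    System.err.println("import failed for " + item + ": " + e.getMessage());
                    if (failOnError) {
                        throw e;
                    }
                }
            }
            return imported;
        }

        public static void main(String[] args) {
            // With failOnError=false the bad item is skipped: prints "2 of 3 imported".
            int n = importAll(Arrays.asList("t1", "bad_t", "t2"), false);
            System.out.println(n + " of 3 imported");
        }
    }

Returning the success count, as the new importTables does, is what lets the caller log the per-database summary instead of failing silently.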
@@ -618,7 +637,7 @@ public class HiveMetaStoreBridge {
}
}
-public static void main(String[] argv) throws Exception {
+public static void main(String[] args) throws Exception {
Configuration atlasConf = ApplicationProperties.get();
String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
@@ -632,8 +651,17 @@ public class HiveMetaStoreBridge {
atlasClient = new AtlasClient(ugi, ugi.getShortUserName(), atlasEndpoint);
}
+Options options = new Options();
+CommandLineParser parser = new BasicParser();
+CommandLine cmd = parser.parse(options, args);
+boolean failOnError = false;
+if (cmd.hasOption("failOnError")) {
+failOnError = true;
+}
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
-hiveMetaStoreBridge.importHiveMetadata();
+hiveMetaStoreBridge.importHiveMetadata(failOnError);
}
}
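One caveat in main() as shown: failOnError is never registered on the Options instance, and commons-cli reports only registered options through CommandLine.hasOption(), so the flag appears inert (BasicParser will in fact reject an unrecognized -failOnError argument). A minimal sketch of the registration the flag would need; the class name and description string are illustrative, not part of this commit:

    import org.apache.commons.cli.BasicParser;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.CommandLineParser;
    import org.apache.commons.cli.Options;

    public class ImportHiveArgs {
        public static void main(String[] args) throws Exception {
            Options options = new Options();
            // Register the flag (name, hasArg=false, description); without this,
            // cmd.hasOption("failOnError") can never return true.
            options.addOption("failOnError", false, "abort the import on the first error");

            CommandLineParser parser = new BasicParser();
            CommandLine cmd = parser.parse(options, args);

            boolean failOnError = cmd.hasOption("failOnError");
            System.out.println("failOnError = " + failOnError);
        }
    }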
@@ -41,6 +41,8 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import scala.actors.threadpool.Arrays;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
import static org.mockito.Mockito.argThat;
@@ -78,7 +80,7 @@ public class HiveMetaStoreBridgeTest {
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-bridge.importHiveMetadata();
+bridge.importHiveMetadata(true);
// verify update is called
verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
@@ -90,7 +92,7 @@ public class HiveMetaStoreBridgeTest {
public void testImportThatUpdatesRegisteredTable() throws Exception {
setupDB(hiveClient, TEST_DB_NAME);
-Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
@@ -99,12 +101,12 @@ public class HiveMetaStoreBridgeTest {
HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
-String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
+String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(0));
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-bridge.importHiveMetadata();
+bridge.importHiveMetadata(true);
// verify update is called on table
verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
@@ -119,11 +121,15 @@ public class HiveMetaStoreBridgeTest {
getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
}
-private Table setupTable(Hive hiveClient, String databaseName, String tableName) throws HiveException {
-when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(new String[]{tableName}));
+private List<Table> setupTables(Hive hiveClient, String databaseName, String... tableNames) throws HiveException {
+List<Table> tables = new ArrayList<>();
+when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(tableNames));
+for (String tableName : tableNames) {
Table testTable = createTestTable(databaseName, tableName);
when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
-return testTable;
+tables.add(testTable);
+}
+return tables;
}
private void setupDB(Hive hiveClient, String databaseName) throws HiveException {
@@ -135,7 +141,8 @@ public class HiveMetaStoreBridgeTest {
@Test
public void testImportWhenPartitionKeysAreNull() throws Exception {
setupDB(hiveClient, TEST_DB_NAME);
-Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+Table hiveTable = hiveTables.get(0);
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
@@ -157,12 +164,65 @@ public class HiveMetaStoreBridgeTest {
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
try {
-bridge.importHiveMetadata();
+bridge.importHiveMetadata(true);
} catch (Exception e) {
Assert.fail("Partition with null key caused import to fail with exception ", e);
}
}
+@Test
+public void testImportContinuesWhenTableRegistrationFails() throws Exception {
+setupDB(hiveClient, TEST_DB_NAME);
+final String table2Name = TEST_TABLE_NAME + "_1";
+List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
+returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
+when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+table2Name,
+HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+try {
+bridge.importHiveMetadata(false);
+} catch (Exception e) {
+Assert.fail("Table registration failed with exception", e);
+}
+}
+@Test
+public void testImportFailsWhenTableRegistrationFails() throws Exception {
+setupDB(hiveClient, TEST_DB_NAME);
+final String table2Name = TEST_TABLE_NAME + "_1";
+List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
+returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
+when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+table2Name,
+HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+try {
+bridge.importHiveMetadata(true);
+Assert.fail("Table registration is supposed to fail");
+} catch (Exception e) {
+//Expected
+}
+}
private JSONArray getEntityReference(String id) throws JSONException {
return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
}
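The two new tests duplicate their entire mock setup and differ only in the flag passed to importHiveMetadata() and the expected outcome. A sketch of folding them into a single data-driven TestNG test, reusing the file's existing helpers (simplified to a single table; the method names are illustrative, and org.testng.annotations.DataProvider is assumed imported):

    @DataProvider(name = "failOnErrorValues")
    public Object[][] failOnErrorValues() {
        return new Object[][]{{true}, {false}};
    }

    @Test(dataProvider = "failOnErrorValues")
    public void testImportWhenTableRegistrationFails(boolean failOnError) throws Exception {
        setupDB(hiveClient, TEST_DB_NAME);
        setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
        when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME))
                .thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));

        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
        try {
            bridge.importHiveMetadata(failOnError);
            Assert.assertFalse(failOnError, "Import was expected to fail when failOnError is true");
        } catch (Exception e) {
            Assert.assertTrue(failOnError, "Import should not propagate failures when failOnError is false");
        }
    }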
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES:
+ATLAS-948 import-hive should allow an option to continue after failure (sumasai)
ATLAS-954 Get hadoop classpath if command hadoop is in PATH (svimal2106 via sumasai)
ATLAS-919 UI : Deleted references should be shown in red or filtered out (kevalbhatt18 via sumasai)
ATLAS-927 aboutAtlas_tmpl.html has hard-coded project version (Kalyanikashikar via yhemanth)