Commit bd39a509 authored by rmani, committed by Madhan Neethiraj

ATLAS-2511: updated import-hive utility to add options to selectively import given database/tables

parent 31eb3664
@@ -31,6 +31,8 @@ done
 BASEDIR=`dirname ${PRG}`
 BASEDIR=`cd ${BASEDIR}/..;pwd`
+
+allargs=$@
 if test -z "${JAVA_HOME}"
 then
     JAVA_BIN=`which java`
@@ -128,8 +130,8 @@ done
 echo "Log file for import is $LOGFILE"

-"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $allargs

 RETVAL=$?
-[ $RETVAL -eq 0 ] && echo Hive Data Model imported successfully!!!
-[ $RETVAL -ne 0 ] && echo Failed to import Hive Data Model!!!
+[ $RETVAL -eq 0 ] && echo Hive Meta Data imported successfully!!!
+[ $RETVAL -ne 0 ] && echo Failed to import Hive Meta Data!!!
@@ -36,11 +36,11 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
@@ -86,7 +86,9 @@ public class HiveMetaStoreBridge {
     public static final String SEP       = ":".intern();
     public static final String HDFS_PATH = "hdfs_path";

-    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+    private static final int    EXIT_CODE_SUCCESS = 0;
+    private static final int    EXIT_CODE_FAILED  = 1;
+    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";

     private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
     private final String clusterName;
@@ -95,16 +97,27 @@ public class HiveMetaStoreBridge {
     private final boolean convertHdfsPathToLowerCase;

-    public static void main(String[] args) throws AtlasHookException {
-        try {
-            Configuration atlasConf = ApplicationProperties.get();
-            String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
+    public static void main(String[] args) {
+        int exitCode = EXIT_CODE_FAILED;

-            if (atlasEndpoint == null || atlasEndpoint.length == 0){
+        try {
+            Options options = new Options();
+
+            options.addOption("d", "database", true, "Database name");
+            options.addOption("t", "table", true, "Table name");
+            options.addOption("failOnError", false, "failOnError");
+
+            CommandLine cmd = new BasicParser().parse(options, args);
+            boolean failOnError = cmd.hasOption("failOnError");
+            String databaseToImport = cmd.getOptionValue("d");
+            String tableToImport = cmd.getOptionValue("t");
+            Configuration atlasConf = ApplicationProperties.get();
+            String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
+
+            if (atlasEndpoint == null || atlasEndpoint.length == 0) {
                 atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
             }

-            AtlasClientV2 atlasClientV2;
+            final AtlasClientV2 atlasClientV2;

             if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                 String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
@@ -116,17 +129,35 @@ public class HiveMetaStoreBridge {
                 atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
             }

-            Options options = new Options();
-            CommandLineParser parser = new BasicParser();
-            CommandLine cmd = parser.parse(options, args);
-            boolean failOnError = cmd.hasOption("failOnError");
-
             HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);

-            hiveMetaStoreBridge.importHiveMetadata(failOnError);
+            hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
+
+            exitCode = EXIT_CODE_SUCCESS;
+        } catch(ParseException e) {
+            LOG.error("Failed to parse arguments. Error: {}", e.getMessage());
+            printUsage();
         } catch(Exception e) {
-            throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+            LOG.error("Import failed", e);
         }
+
+        System.exit(exitCode);
+    }
+
+    private static void printUsage() {
+        System.out.println();
+        System.out.println();
+        System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>]");
+        System.out.println("    Imports specified database and its tables ...");
+        System.out.println();
+        System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+        System.out.println("    Imports specified table within that database ...");
+        System.out.println();
+        System.out.println("Usage 3: import-hive.sh");
+        System.out.println("    Imports all databases and tables...");
+        System.out.println();
+        System.out.println();
     }

 /**
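The new entry point replaces the old throw-on-failure behaviour with an exit code and a usage message, and the selective import is driven entirely by three commons-cli options. As a standalone reference, the sketch below (not part of the patch; the class name and sample argument values are made up) shows how those options resolve when parsed with the same commons-cli 1.x calls used in main() above:

    import org.apache.commons.cli.BasicParser;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class ImportHiveOptionsDemo {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();

            options.addOption("d", "database", true, "Database name"); // -d/--database <value>
            options.addOption("t", "table", true, "Table name");       // -t/--table <value>
            options.addOption("failOnError", false, "failOnError");    // flag, no value

            // Hypothetical command line: import-hive.sh -d sales_db -t orders -failOnError
            String[] sampleArgs = { "-d", "sales_db", "-t", "orders", "-failOnError" };

            CommandLine cmd = new BasicParser().parse(options, sampleArgs);

            System.out.println("database    = " + cmd.getOptionValue("d"));      // sales_db
            System.out.println("table       = " + cmd.getOptionValue("t"));      // orders
            System.out.println("failOnError = " + cmd.hasOption("failOnError")); // true
        }
    }

A missing -d or -t simply yields null from getOptionValue, which is what lets the bridge fall back to importing everything.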
@@ -174,23 +205,33 @@ public class HiveMetaStoreBridge {
     @VisibleForTesting
-    public void importHiveMetadata(boolean failOnError) throws Exception {
+    public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception {
         LOG.info("Importing Hive metadata");

-        importDatabases(failOnError);
+        importDatabases(failOnError, databaseToImport, tableToImport);
     }

-    private void importDatabases(boolean failOnError) throws Exception {
-        List<String> databases = hiveClient.getAllDatabases();
+    private void importDatabases(boolean failOnError, String databaseToImport, String tableToImport) throws Exception {
+        final List<String> databaseNames;

-        LOG.info("Found {} databases", databases.size());
+        if (StringUtils.isEmpty(databaseToImport)) {
+            databaseNames = hiveClient.getAllDatabases();
+        } else {
+            databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
+        }
+
+        if (!CollectionUtils.isEmpty(databaseNames)) {
+            LOG.info("Found {} databases", databaseNames.size());

-        for (String databaseName : databases) {
-            AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
+            for (String databaseName : databaseNames) {
+                AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);

-            if (dbEntity != null) {
-                importTables(dbEntity.getEntity(), databaseName, failOnError);
+                if (dbEntity != null) {
+                    importTables(dbEntity.getEntity(), databaseName, tableToImport, failOnError);
+                }
             }
+        } else {
+            LOG.info("No database found");
         }
     }
@@ -201,25 +242,35 @@
      * @param failOnError
      * @throws Exception
      */
-    private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception {
-        List<String> hiveTables = hiveClient.getAllTables(databaseName);
+    private int importTables(AtlasEntity dbEntity, String databaseName, String tblName, final boolean failOnError) throws Exception {
+        int tablesImported = 0;

-        LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName);
+        final List<String> tableNames;

-        int tablesImported = 0;
+        if (StringUtils.isEmpty(tblName)) {
+            tableNames = hiveClient.getAllTables(databaseName);
+        } else {
+            tableNames = hiveClient.getTablesByPattern(databaseName, tblName);
+        }

-        try {
-            for (String tableName : hiveTables) {
-                int imported = importTable(dbEntity, databaseName, tableName, failOnError);
-                tablesImported += imported;
-            }
-        } finally {
-            if (tablesImported == hiveTables.size()) {
-                LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName);
-            } else {
-                LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName);
+        if (!CollectionUtils.isEmpty(tableNames)) {
+            LOG.info("Found {} tables to import in database {}", tableNames.size(), databaseName);
+
+            try {
+                for (String tableName : tableNames) {
+                    int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+
+                    tablesImported += imported;
+                }
+            } finally {
+                if (tablesImported == tableNames.size()) {
+                    LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
+                } else {
+                    LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, tableNames.size(), databaseName);
+                }
             }
+        } else {
+            LOG.info("No tables to import in database {}", databaseName);
         }

         return tablesImported;

...
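Both import methods now share the same selection shape: an empty filter falls back to listing everything from the Hive metastore, a non-empty filter goes through the pattern based lookups (getDatabasesByPattern / getTablesByPattern), and an empty result short-circuits with a log message instead of running the import loop. A small standalone sketch of that shape, with the metastore client stubbed out by a supplier and a function (class, database and pattern names here are hypothetical):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Function;
    import java.util.function.Supplier;

    public class SelectiveImportShapeDemo {
        // Mirrors importDatabases()/importTables(): empty filter means "all", otherwise resolve by pattern.
        static List<String> select(String filter, Supplier<List<String>> listAll, Function<String, List<String>> byPattern) {
            return (filter == null || filter.isEmpty()) ? listAll.get() : byPattern.apply(filter);
        }

        public static void main(String[] args) {
            Supplier<List<String>> allDatabases = () -> Arrays.asList("default", "sales_db", "hr_db"); // stubbed metastore
            Function<String, List<String>> byPattern =
                    pattern -> "sales_db".equals(pattern) ? Collections.singletonList("sales_db")
                                                          : Collections.emptyList();

            System.out.println(select(null, allDatabases, byPattern));       // [default, sales_db, hr_db] -> import all
            System.out.println(select("sales_db", allDatabases, byPattern)); // [sales_db]                 -> import one
            System.out.println(select("missing", allDatabases, byPattern));  // []  -> "No database found" branch
        }
    }

In the patched bridge the same decision is made against hiveClient, and since the lookups are pattern based, a Hive style wildcard such as sales* should also be usable as the -d or -t value.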
@@ -82,77 +82,81 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
         }

+        try {
             HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
             AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);

             BaseHiveEvent event = null;

             switch (oper) {
                 case CREATEDATABASE:
                     event = new CreateDatabase(context);
                     break;

                 case DROPDATABASE:
                     event = new DropDatabase(context);
                     break;

                 case ALTERDATABASE:
                 case ALTERDATABASE_OWNER:
                     event = new AlterDatabase(context);
                     break;

                 case CREATETABLE:
                     event = new CreateTable(context, true);
                     break;

                 case DROPTABLE:
                 case DROPVIEW:
                     event = new DropTable(context);
                     break;

                 case CREATETABLE_AS_SELECT:
                 case CREATEVIEW:
                 case ALTERVIEW_AS:
                 case LOAD:
                 case EXPORT:
                 case IMPORT:
                 case QUERY:
                 case TRUNCATETABLE:
                     event = new CreateHiveProcess(context);
                     break;

                 case ALTERTABLE_FILEFORMAT:
                 case ALTERTABLE_CLUSTER_SORT:
                 case ALTERTABLE_BUCKETNUM:
                 case ALTERTABLE_PROPERTIES:
                 case ALTERVIEW_PROPERTIES:
                 case ALTERTABLE_SERDEPROPERTIES:
                 case ALTERTABLE_SERIALIZER:
                 case ALTERTABLE_ADDCOLS:
                 case ALTERTABLE_REPLACECOLS:
                 case ALTERTABLE_PARTCOLTYPE:
                 case ALTERTABLE_LOCATION:
                     event = new AlterTable(context);
                     break;

                 case ALTERTABLE_RENAME:
                 case ALTERVIEW_RENAME:
                     event = new AlterTableRename(context);
                     break;

                 case ALTERTABLE_RENAMECOL:
                     event = new AlterTableRenameCol(context);
                     break;

                 default:
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
                     }
                     break;
             }

             if (event != null) {
                 super.notifyEntities(event.getNotificationMessages());
             }
+        } catch (Throwable t) {
+            LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
+        }

         if (LOG.isDebugEnabled()) {

...
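The HiveHook change is purely defensive: the whole dispatch body (unchanged apart from re-indentation) is wrapped in try/catch(Throwable) so that any failure while mapping the operation or sending notifications is logged and swallowed rather than thrown back into Hive's query execution. A minimal illustration of that pattern, with plain stand-ins for the hook and logger (nothing in this sketch is an Atlas API):

    public class DefensiveHookDemo {
        interface Hook {
            void run(String operationName);
        }

        // Same shape as HiveHook.run() after the patch: never let a hook failure escape to the caller.
        static final Hook SAFE_HOOK = operationName -> {
            try {
                if ("CREATETABLE".equals(operationName)) {
                    // Simulate a failure while building or sending the notification.
                    throw new IllegalStateException("simulated notification failure");
                }
                System.out.println("notified Atlas for " + operationName);
            } catch (Throwable t) {
                // Mirrors: LOG.error("HiveHook.run(): failed to process operation {}", operationName, t);
                System.err.println("failed to process operation " + operationName + ": " + t);
            }
        };

        public static void main(String[] args) {
            SAFE_HOOK.run("CREATEDATABASE"); // succeeds
            SAFE_HOOK.run("CREATETABLE");    // failure is contained
            System.out.println("query execution continues either way");
        }
    }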
@@ -97,7 +97,7 @@ public class HiveMetaStoreBridgeTest {
                 getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity());

         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
-        bridge.importHiveMetadata(true);
+        bridge.importHiveMetadata(null, null, true);

         // verify update is called
         verify(atlasClientV2).updateEntity(anyObject());

@@ -126,7 +126,8 @@
         when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
             .thenReturn(createTableReference());

-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(0));
+        Table testTable = hiveTables.get(0);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);

         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
             Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,

@@ -136,7 +137,7 @@
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
-        bridge.importHiveMetadata(true);
+        bridge.importHiveMetadata(null, null, true);

         // verify update is called on table
         verify(atlasClientV2, times(2)).updateEntity(anyObject());

@@ -207,7 +208,7 @@
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
         try {
-            bridge.importHiveMetadata(true);
+            bridge.importHiveMetadata(null, null, true);
         } catch (Exception e) {
             Assert.fail("Partition with null key caused import to fail with exception ", e);
         }

@@ -231,7 +232,8 @@
         when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
             .thenReturn(createTableReference());

-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        Table testTable = hiveTables.get(1);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);

         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
             Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,

@@ -241,7 +243,7 @@
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
         try {
-            bridge.importHiveMetadata(false);
+            bridge.importHiveMetadata(null, null, false);
         } catch (Exception e) {
             Assert.fail("Table registration failed with exception", e);
         }

@@ -267,7 +269,8 @@
         when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
             .thenReturn(createTableReference());

-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        Table testTable = hiveTables.get(1);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);

         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
             Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,

@@ -277,7 +280,7 @@
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
         try {
-            bridge.importHiveMetadata(true);
+            bridge.importHiveMetadata(null, null, true);
             Assert.fail("Table registration is supposed to fail");
         } catch (Exception e) {
             //Expected

...
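The updated tests keep exercising the import-everything path by passing (null, null, ...) for the new parameters; the selective path is not covered by this change. A hypothetical additional test, sketched in the same Mockito style as the class above, might look like the following (the test name, database name, and the extra static imports such as never() and verifyZeroInteractions() are assumptions, not part of the patch):

    @Test
    public void importWithDatabaseFilterSkipsFullListing() throws Exception {
        // Hypothetical: the pattern matches nothing, so the bridge should take the
        // "No database found" branch without listing all databases or touching Atlas.
        when(hiveClient.getDatabasesByPattern("db_that_does_not_exist"))
                .thenReturn(Collections.<String>emptyList());

        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);

        bridge.importHiveMetadata("db_that_does_not_exist", null, true);

        verify(hiveClient, never()).getAllDatabases();
        verifyZeroInteractions(atlasClientV2);
    }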