Commit b4e4f604 by Pinal, committed by nixonrodrigues

ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities

parent d0de3897
......@@ -139,6 +139,7 @@ do
--database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;;
--table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;;
--filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;;
-deleteNonExisting) IMPORT_ARGS="$IMPORT_ARGS -deleteNonExisting";;
"") break;;
*) JVM_ARGS="$JVM_ARGS $option"
esac
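The new case branch simply appends the flag to IMPORT_ARGS for the Java importer; note the single dash (matching the Commons CLI option registered in HiveMetaStoreBridge below) and that, unlike --database or --table, it takes no value. A typical invocation might look like this (the install location of import-hive.sh is an assumption):

    hook-bin/import-hive.sh -deleteNonExisting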
......
......@@ -28,6 +28,8 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
......@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
......@@ -97,12 +100,15 @@ public class HiveMetaStoreBridge {
public static final String SEP = ":".intern();
public static final String HDFS_PATH = "hdfs_path";
public static final String DEFAULT_METASTORE_CATALOG = "hive";
public static final String HIVE_TABLE_DB_EDGE_LABEL = "__hive_table.db";
public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1;
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private static int pageLimit = 10000;
private final String metadataNamespace;
private final Hive hiveClient;
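HOOK_HIVE_PAGE_LIMIT resolves to CONF_PREFIX + "page.limit" and backs the new pageLimit field, which defaults to 10000. Assuming the standard hook prefix atlas.hook.hive., the page size used by the paginated lookups below could be tuned in atlas-application.properties, for example:

    # assumed full property name (CONF_PREFIX + "page.limit"); default is 10000
    atlas.hook.hive.page.limit=5000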
......@@ -122,9 +128,13 @@ public class HiveMetaStoreBridge {
options.addOption("t", "table", true, "Table name");
options.addOption("f", "filename", true, "Filename");
options.addOption("failOnError", false, "failOnError");
options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive");
CommandLine cmd = new BasicParser().parse(options, args);
boolean failOnError = cmd.hasOption("failOnError");
boolean deleteNonExisting = cmd.hasOption("deleteNonExisting");
LOG.info("delete non existing flag : {} ", deleteNonExisting);
String databaseToImport = cmd.getOptionValue("d");
String tableToImport = cmd.getOptionValue("t");
String fileToImport = cmd.getOptionValue("f");
......@@ -148,7 +158,9 @@ public class HiveMetaStoreBridge {
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
if (deleteNonExisting) {
hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
} else if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
if (f.exists() && f.canRead()) {
......@@ -212,6 +224,8 @@ public class HiveMetaStoreBridge {
System.out.println(" database1:tbl1");
System.out.println(" database1:tbl2");
System.out.println(" database2:tbl2");
System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] " );
System.out.println(" Deletes databases and tables which are not in Hive ...");
System.out.println();
}
......@@ -225,6 +239,9 @@ public class HiveMetaStoreBridge {
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
if (atlasProperties != null) {
pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, 10000);
}
}
/**
......@@ -959,4 +976,214 @@ public class HiveMetaStoreBridge {
}
return ret;
}
private List<AtlasEntityHeader> getAllDatabaseInCluster() throws AtlasServiceException {
List<AtlasEntityHeader> entities = new ArrayList<>();
final int pageSize = pageLimit;
SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
fc.setAttributeName(ATTRIBUTE_CLUSTER_NAME);
fc.setAttributeValue(metadataNamespace);
fc.setOperator(SearchParameters.Operator.EQ);
for (int i = 0; ; i++) {
int offset = pageSize * i;
LOG.info("Retrieving databases: offset={}, pageSize={}", offset, pageSize);
AtlasSearchResult searchResult = atlasClientV2.basicSearch(HIVE_TYPE_DB, fc, null, null, true, pageSize, offset);
List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
int dbCount = entityHeaders == null ? 0 : entityHeaders.size();
LOG.info("Retrieved {} databases of {} cluster", dbCount, metadataNamespace);
if (dbCount > 0) {
entities.addAll(entityHeaders);
}
if (dbCount < pageSize) { // last page
break;
}
}
return entities;
}
private List<AtlasEntityHeader> getAllTablesInDb(String databaseGuid) throws AtlasServiceException {
List<AtlasEntityHeader> entities = new ArrayList<>();
final int pageSize = pageLimit;
for (int i = 0; ; i++) {
int offset = pageSize * i;
LOG.info("Retrieving tables: offset={}, pageSize={}", offset, pageSize);
AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(databaseGuid, HIVE_TABLE_DB_EDGE_LABEL, null, null, true, pageSize, offset);
List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
int tableCount = entityHeaders == null ? 0 : entityHeaders.size();
LOG.info("Retrieved {} tables of {} database", tableCount, databaseGuid);
if (tableCount > 0) {
entities.addAll(entityHeaders);
}
if (tableCount < pageSize) { // last page
break;
}
}
return entities;
}
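getAllTablesInDb pages through the hive_table entities reachable from a database over the __hive_table.db relationship edge. The same client call in isolation would look like this (the guid is a placeholder; argument order follows the call above):

    AtlasSearchResult page = atlasClientV2.relationshipSearch(
            "<db-guid>",              // guid of the hive_db entity (placeholder)
            HIVE_TABLE_DB_EDGE_LABEL, // "__hive_table.db"
            null, null,               // no sort attribute / sort order
            true,                     // excludeDeletedEntities
            pageSize, 0);             // first page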
public String getHiveDatabaseName(String qualifiedName) {
if (StringUtils.isNotEmpty(qualifiedName)) {
String[] split = qualifiedName.split("@");
if (split.length > 0) {
return split[0];
}
}
return null;
}
public String getHiveTableName(String qualifiedName, boolean isTemporary) {
if (StringUtils.isNotEmpty(qualifiedName)) {
String tableName = StringUtils.substringBetween(qualifiedName, ".", "@");
if (!isTemporary) {
return tableName;
} else {
if (StringUtils.isNotEmpty(tableName)) {
String[] splitTemp = tableName.split(TEMP_TABLE_PREFIX);
if (splitTemp.length > 0) {
return splitTemp[0];
}
}
}
}
return null;
}
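Both helpers parse Atlas qualified names: hive_db entities use <db>@<cluster>, hive_table entities use <db>.<table>@<cluster>. A minimal sketch of the expected results (the _temp- suffix format is an assumption about TEMP_TABLE_PREFIX):

    bridge.getHiveDatabaseName("sales@cm");                     // -> "sales"
    bridge.getHiveTableName("sales.orders@cm", false);          // -> "orders"
    bridge.getHiveTableName("sales.orders_temp-123@cm", true);  // -> "orders", TEMP_TABLE_PREFIX suffix stripped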
private void deleteByGuid(List<String> guidTodelete) throws AtlasServiceException {
if (CollectionUtils.isNotEmpty(guidTodelete)) {
for (String guid : guidTodelete) {
EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid);
if (response.getDeletedEntities().size() < 1) {
LOG.info("Entity with guid : {} is not deleted", guid);
} else {
LOG.info("Entity with guid : {} is deleted", guid);
}
}
} else {
LOG.info("No Entity to delete from Atlas");
}
}
public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError) throws Exception {
//fetch databases from Atlas
List<AtlasEntityHeader> dbs = null;
try {
dbs = getAllDatabaseInCluster();
LOG.info("Total Databases in cluster {} : {} ", metadataNamespace, dbs.size());
} catch (AtlasServiceException e) {
LOG.error("Failed to retrieve database entities for cluster {} from Atlas", metadataNamespace, e);
if (failOnError) {
throw e;
}
}
if (CollectionUtils.isNotEmpty(dbs)) {
//iterate all dbs to check if exists in hive
for (AtlasEntityHeader db : dbs) {
String dbGuid = db.getGuid();
String hiveDbName = getHiveDatabaseName((String) db.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
if (StringUtils.isEmpty(hiveDbName)) {
LOG.error("Failed to get database from qualifiedName: {}, guid: {} ", db.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbGuid);
continue;
}
List<AtlasEntityHeader> tables;
try {
tables = getAllTablesInDb(dbGuid);
LOG.info("Total Tables in database {} : {} ", hiveDbName, tables.size());
} catch (AtlasServiceException e) {
LOG.error("Failed to retrieve table entities for database {} from Atlas", hiveDbName, e);
if (failOnError) {
throw e;
}
continue;
}
List<String> guidsToDelete = new ArrayList<>();
if (!hiveClient.databaseExists(hiveDbName)) {
//table guids
if (CollectionUtils.isNotEmpty(tables)) {
for (AtlasEntityHeader table : tables) {
guidsToDelete.add(table.getGuid());
}
}
//db guid
guidsToDelete.add(db.getGuid());
LOG.info("Added database {}.{} and its {} tables to delete", metadataNamespace, hiveDbName, tables.size());
} else {
//iterate all table of db to check if it exists
if (CollectionUtils.isNotEmpty(tables)) {
for (AtlasEntityHeader table : tables) {
String hiveTableName = getHiveTableName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), true);
if (StringUtils.isEmpty(hiveTableName)) {
LOG.error("Failed to get table from qualifiedName: {}, guid: {} ", table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), table.getGuid());
continue;
}
try {
hiveClient.getTable(hiveDbName, hiveTableName, true);
} catch (InvalidTableException e) { // table doesn't exist in Hive
LOG.info("Added table {}.{} to delete", hiveDbName, hiveTableName);
guidsToDelete.add(table.getGuid());
} catch (HiveException e) {
LOG.error("Failed to get table {}.{} from Hive", hiveDbName, hiveTableName, e);
if (failOnError) {
throw e;
}
}
}
}
}
//delete entities
if (CollectionUtils.isNotEmpty(guidsToDelete)) {
try {
deleteByGuid(guidsToDelete);
} catch (AtlasServiceException e) {
LOG.error("Failed to delete Atlas entities for database {}", hiveDbName, e);
if (failOnError) {
throw e;
}
}
}
}
} else {
LOG.info("No database found in service.");
}
}
}
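End to end, the cleanup walks every hive_db entity of the configured metadataNamespace, compares it and its tables against the live metastore, and deletes whatever Hive no longer knows about. A minimal driver sketch, mirroring the wiring in main() above (atlasConf and atlasClientV2 assumed to be initialised as in the importer):

    HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
    bridge.deleteEntitiesForNonExistingHiveMetadata(true); // failOnError=true: rethrow instead of skip-and-continue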
......@@ -123,6 +123,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String RELATIONSHIPS_URI = BASE_URI + "v2/relationship/";
private static final String BULK_HEADERS = "bulk/headers";
private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications";
private static final String RELATIONSHIP_URI = DISCOVERY_URI + "/relationship";
//Glossary APIs
private static final String GLOSSARY_URI = BASE_URI + "v2/glossary";
......@@ -664,16 +666,22 @@ public class AtlasClientV2 extends AtlasBaseClient {
}
public AtlasSearchResult basicSearch(String typeName, String classification, String query, boolean excludeDeletedEntities, int limit, int offset) throws AtlasServiceException {
return this.basicSearch(typeName, null, classification, query, excludeDeletedEntities, limit, offset);
}
public AtlasSearchResult basicSearch(String typeName, SearchParameters.FilterCriteria entityFilters, String classification, String query, boolean excludeDeletedEntities, int limit, int offset) throws AtlasServiceException {
SearchParameters parameters = new SearchParameters();
parameters.setTypeName(typeName);
parameters.setClassification(classification);
parameters.setQuery(query);
parameters.setExcludeDeletedEntities(excludeDeletedEntities);
parameters.setLimit(limit);
parameters.setOffset(offset);
if (entityFilters != null) {
parameters.setEntityFilters(entityFilters);
}
return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class, parameters);
}
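This overload is what HiveMetaStoreBridge uses to page through hive_db entities: the FilterCriteria restricts the search to one cluster/namespace. A minimal sketch of a filtered, paginated call (type name, attribute name, and values are illustrative):

    SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
    fc.setAttributeName("clusterName");   // matches ATTRIBUTE_CLUSTER_NAME used by the bridge
    fc.setAttributeValue("cm");
    fc.setOperator(SearchParameters.Operator.EQ);
    AtlasSearchResult result = client.basicSearch("hive_db", fc, null, null, true, 100, 0);

Because the request now carries a full SearchParameters payload instead of query parameters, the BASIC_SEARCH API definition below changes from GET to POST.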
public AtlasSearchResult facetedSearch(SearchParameters searchParameters) throws AtlasServiceException {
......@@ -1202,7 +1210,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Discovery APIs
public static final API_V2 DSL_SEARCH = new API_V2(DSL_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 FULL_TEXT_SEARCH = new API_V2(FULL_TEXT_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 BASIC_SEARCH = new API_V2(BASIC_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
public static final API_V2 FACETED_SEARCH = new API_V2(FACETED_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
public static final API_V2 ATTRIBUTE_SEARCH = new API_V2(DISCOVERY_URI+ "/attribute", HttpMethod.GET, Response.Status.OK);
public static final API_V2 RELATIONSHIP_SEARCH = new API_V2(DISCOVERY_URI+ "/relationship", HttpMethod.GET, Response.Status.OK);
......