Commit f147d3ff by Suma Shivaprasad

ATLAS-538 Rename table should retain traits/tags assigned to columns/storage descriptors (sumasai)

parent 81a0c6ff
......@@ -353,7 +353,7 @@ public class HiveMetaStoreBridge {
return tableReference;
}
private String getStorageDescQFName(String entityQualifiedName) {
public static String getStorageDescQFName(String entityQualifiedName) {
return entityQualifiedName + "_storage";
}
......
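Making getStorageDescQFName public and static lets HiveHook compute storage-descriptor qualified names from a table's qualified name without a bridge instance. A minimal sketch of the convention, assuming the usual db.table@cluster format produced by getTableQualifiedName (values are hypothetical):

// Hypothetical values, for illustration only.
String tableQFName = "default.sales@primary";
String sdQFName = HiveMetaStoreBridge.getStorageDescQFName(tableQFName);
// sdQFName == "default.sales@primary_storage"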
......@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.Task;
......@@ -299,7 +300,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
case CREATETABLE:
List<Pair<? extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
if (tablesCreated.size() > 0) {
handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
}
break;
case CREATETABLE_AS_SELECT:
......@@ -409,28 +412,87 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Table newTable = writeEntity.getTable();
//Hive sends both the old and new table names in the outputs, which is odd. So skipping that with the below check
if (!newTable.getDbName().equals(oldTable.getDbName()) || !newTable.getTableName().equals(oldTable.getTableName())) {
//Create/update old table entity - create new entity with oldQFName and tableName
Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity);
String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
final String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
oldTable.getDbName(), oldTable.getTableName());
tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase());
String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
final String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
newTable.getDbName(), newTable.getTableName());
//Replace entity with new name
Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
oldQualifiedName, newEntity));
//Create/update old table entity - create entity with oldQFName and old tableName if it doesn't exist. If it exists, it will be updated
//We always use the new entity while creating the table, since some flags/attributes of the table are not set in inputEntity and Hive.getTable(oldTableName) also fails since the table doesn't exist in Hive anymore
final Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity);
//Reset regular column QF names to the old name and create a partial update request replacing the old column QFName with the new one, so any existing traits are retained
replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.COLUMNS), oldQualifiedName, newQualifiedName);
//Reset partition key column QF names to the old name and create a partial update request replacing the old column QFName with the new one, so any existing traits are retained
replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.PART_COLS), oldQualifiedName, newQualifiedName);
//Reset SD QF name to the old name and create a partial update request replacing the old SD QFName with the new one, so any existing traits are retained
replaceSDQFName(event, tableEntity, oldQualifiedName, newQualifiedName);
//Reset table QF name to the old name and create a partial update request replacing the old table QFName with the new one
replaceTableQFName(dgiBridge, event, oldTable, newTable, tableEntity, oldQualifiedName, newQualifiedName);
}
}
}
}
private Referenceable replaceTableQFName(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Table oldTable, Table newTable, final Referenceable tableEntity, final String oldTableQFName, final String newTableQFName) throws HiveException {
tableEntity.set(HiveDataModelGenerator.NAME, oldTableQFName);
tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase());
final Referenceable newDbInstance = (Referenceable) tableEntity.get(HiveDataModelGenerator.DB);
tableEntity.set(HiveDataModelGenerator.DB, dgiBridge.createDBInstance(dgiBridge.hiveClient.getDatabase(oldTable.getDbName())));
//Replace table entity with new name
final Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
newEntity.set(HiveDataModelGenerator.NAME, newTableQFName);
newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
newEntity.set(HiveDataModelGenerator.DB, newDbInstance);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
oldTableQFName, newEntity));
return newEntity;
}
private List<Referenceable> replaceColumnQFName(final HiveEventContext event, final List<Referenceable> cols, final String oldTableQFName, final String newTableQFName) {
List<Referenceable> newColEntities = new ArrayList<>();
for (Referenceable col : cols) {
final String colName = (String) col.get(HiveDataModelGenerator.NAME);
String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(oldTableQFName, colName);
String newColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newTableQFName, colName);
col.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldColumnQFName);
Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
//Only the QF name changes
newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
oldColumnQFName, newColEntity));
newColEntities.add(newColEntity);
}
return newColEntities;
}
private Referenceable replaceSDQFName(final HiveEventContext event, Referenceable tableEntity, final String oldTblQFName, final String newTblQFName) {
final String oldSDQFName = HiveMetaStoreBridge.getStorageDescQFName(oldTblQFName);
final String newSDQFName = HiveMetaStoreBridge.getStorageDescQFName(newTblQFName);
//Reset storage desc QF name to the old name so the partial update below matches the existing entity and retains its tags
final Referenceable sdRef = ((Referenceable) tableEntity.get(HiveDataModelGenerator.STORAGE_DESC));
sdRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldSDQFName);
final Referenceable newSDEntity = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
newSDEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newSDQFName);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_STORAGEDESC.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
oldSDQFName, newSDEntity));
return newSDEntity;
}
private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity) throws Exception {
Database db = null;
Table table = null;
......@@ -459,6 +521,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
entities.add(dbEntity);
Referenceable tableEntity = null;
if (table != null) {
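//Re-fetch the table from the metastore so the instance is created from a fully populated Table object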
table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
tableEntity = dgiBridge.createTableInstance(dbEntity, table);
......@@ -508,7 +571,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
boolean isSelectQuery = isSelectQuery(event);
// Also filter out select queries which do not modify data
// filter out select queries which do not modify data
if (!isSelectQuery) {
for (ReadEntity readEntity : event.getInputs()) {
processHiveEntity(dgiBridge, event, readEntity, source);
......
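The rename path above relies on EntityPartialUpdateRequest: each request matches the existing entity by its old unique qualified name and rewrites only that attribute, so traits already attached to the entity survive the rename. A condensed sketch of the per-column pattern with hypothetical table/column names, assuming the qualified-name helpers shown in this commit:

// Rename default.t1 -> default.t2 on cluster "primary"; column "c1" keeps its
// entity (and therefore its traits), only its unique qualified name changes.
final String oldColQFName = HiveMetaStoreBridge.getColumnQualifiedName("default.t1@primary", "c1");
final String newColQFName = HiveMetaStoreBridge.getColumnQualifiedName("default.t2@primary", "c1");
final Referenceable newCol = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
newCol.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColQFName);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
        HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
        oldColQFName, newCol));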
......@@ -64,6 +64,7 @@ public class HiveDataModelGenerator {
public static final String COMMENT = "comment";
public static final String PARAMETERS = "parameters";
public static final String COLUMNS = "columns";
public static final String PART_COLS = "partitionKeys";
public static final String STORAGE_NUM_BUCKETS = "numBuckets";
public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories";
......@@ -244,7 +245,7 @@ public class HiveDataModelGenerator {
new AttributeDefinition("retention", DataTypes.INT_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(STORAGE_DESC, HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.OPTIONAL, true,
null),
new AttributeDefinition("partitionKeys", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
new AttributeDefinition(PART_COLS, DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
......
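With PART_COLS declared next to COLUMNS, the hook can read partition-key columns off a table Referenceable using the same attribute name that the type definition registers. A small sketch of the lookup the rename path performs (tableEntity as in HiveHook above):

// "partitionKeys" holds an array of hive_column references; using the
// PART_COLS constant keeps the hook and the data model in sync.
@SuppressWarnings("unchecked")
List<Referenceable> partCols =
        (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.PART_COLS);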
......@@ -19,6 +19,7 @@
package org.apache.atlas;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
......@@ -27,9 +28,15 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import org.apache.atlas.security.SecureClientUtils;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization$;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
......@@ -84,6 +91,7 @@ public class AtlasClient {
public static final String URI_ENTITY_AUDIT = "audit";
public static final String URI_SEARCH = "discovery/search";
public static final String URI_LINEAGE = "lineage/hive/table";
public static final String URI_TRAITS = "traits";
public static final String QUERY = "query";
public static final String QUERY_TYPE = "queryType";
......@@ -294,6 +302,10 @@ public class AtlasClient {
}
}
public WebResource getResource() {
return service;
}
/**
* Return status of the service instance the client is pointing to.
*
......@@ -425,6 +437,33 @@ public class AtlasClient {
}
/**
* Creates a trait type with the specified name, super traits and attributes
* @param traitName the name of the trait type
* @param superTraits the list of super traits from which this trait type inherits attributes
* @param attributeDefinitions the list of attributes of the trait type
* @return the list of types created
* @throws AtlasServiceException
*/
public List<String> createTraitType(String traitName, ImmutableSet<String> superTraits, AttributeDefinition... attributeDefinitions) throws AtlasServiceException {
HierarchicalTypeDefinition<TraitType> traitDef =
TypesUtil.createTraitTypeDef(traitName, superTraits, attributeDefinitions);
String traitDefinitionAsJSON = TypesSerialization.toJson(traitDef, true);
LOG.debug("Creating trait type {} {}", traitName, traitDefinitionAsJSON);
return createType(traitDefinitionAsJSON);
}
/**
* Creates a simple trait type with the specified name and no super traits or attributes
* @param traitName the name of the trait type
* @return the list of types created
* @throws AtlasServiceException
*/
public List<String> createTraitType(String traitName) throws AtlasServiceException {
return createTraitType(traitName, null);
}
/**
* Register the given type (meta model)
* @param typeAsJson type definition as json
* @return result json object
......@@ -589,6 +628,18 @@ public class AtlasClient {
}
/**
* Associate a trait with an entity
*
* @param guid guid of the entity
* @param traitDefinition trait definition
*/
public void addTrait(String guid, Struct traitDefinition) throws AtlasServiceException {
String traitJson = InstanceSerialization.toJson(traitDefinition, true);
LOG.debug("Adding trait to entity with id {} {}", guid, traitJson);
callAPI(API.ADD_TRAITS, traitJson, guid, URI_TRAITS);
}
/**
* Supports Partial updates
* Updates properties set in the definition for the entity corresponding to guid
* @param entityType Type of the entity being updated
......@@ -727,6 +778,17 @@ public class AtlasClient {
return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
}
/**
* List traits for a given entity identified by its GUID
* @param guid GUID of the entity
* @return List<String> - trait names associated with the entity
* @throws AtlasServiceException
*/
public List<String> listTraits(final String guid) throws AtlasServiceException {
JSONObject jsonResponse = callAPI(API.LIST_TRAITS, null, guid, URI_TRAITS);
return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>());
}
private class ExtractOperation<T, U> {
T extractElement(U element) throws JSONException {
return (T) element;
......
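Taken together, the new client methods are enough to verify the fix end to end: register a trait type, attach it to a column, rename the table in Hive, and confirm the trait is still listed. A minimal sketch; the endpoint URL and GUID lookup are illustrative assumptions, not part of this commit:

AtlasClient client = new AtlasClient("http://localhost:21000");

// Register a simple trait type with no super traits or attributes.
client.createTraitType("PII");

// Attach the trait to a hive_column entity by GUID (obtained elsewhere,
// e.g. via a search; omitted here).
String columnGuid = "...";
client.addTrait(columnGuid, new Struct("PII"));

// After ALTER TABLE ... RENAME, the column entity is updated in place
// rather than recreated, so the trait should still be listed.
List<String> traits = client.listTraits(columnGuid);
assert traits.contains("PII");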
......@@ -17,6 +17,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-538 Rename table should retain traits/tags assigned to columns/storage descriptors (sumasai)
ATLAS-628 Starting two Atlas instances at the same time causes exceptions in HA mode (yhemanth via sumasai)
ATLAS-594 alter table rename doesnt work across databases (sumasai via shwethags)
ATLAS-586 While updating the multiple attributes, Atlas returns the response with escape characters (dkantor via shwethags)
......