Commit 0defc6e8 by Shwetha GS

ATLAS-521 Support Alter Table column commands (suma.shivaprasad via shwethags)

parent 5b748aa4
......@@ -62,6 +62,7 @@ public class HiveMetaStoreBridge {
public static final String TABLE_TYPE_ATTR = "tableType";
public static final String SEARCH_ENTRY_GUID_ATTR = "__guid";
public static final String LAST_ACCESS_TIME_ATTR = "lastAccessTime";
private final String clusterName;
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
......@@ -321,9 +322,9 @@ public class HiveMetaStoreBridge {
// add reference to the database
tableReference.set(HiveDataModelGenerator.DB, dbReference);
tableReference.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));
tableReference.set(HiveDataModelGenerator.COLUMNS, getColumns(hiveTable.getCols(), tableQualifiedName));
// add reference to the StorageDescriptor
// add reference to the StorageDescriptor
Referenceable sdReferenceable = fillStorageDescStruct(hiveTable.getSd(), tableQualifiedName, tableQualifiedName);
tableReference.set("sd", sdReferenceable);
......@@ -501,7 +502,7 @@ public class HiveMetaStoreBridge {
partition.getTable().getTableName(), StringUtils.join(partition.getValues(), "-"), clusterName);
}
private Referenceable fillStorageDescStruct(StorageDescriptor storageDesc, String tableQualifiedName,
public Referenceable fillStorageDescStruct(StorageDescriptor storageDesc, String tableQualifiedName,
String sdQualifiedName) throws Exception {
LOG.debug("Filling storage descriptor information for " + storageDesc);
......@@ -524,12 +525,6 @@ public class HiveMetaStoreBridge {
sdReferenceable
.set(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS, storageDesc.isStoredAsSubDirectories());
//Use the passed column list if not null, ex: use same references for table and SD
List<FieldSchema> columns = storageDesc.getCols();
if (columns != null && !columns.isEmpty()) {
sdReferenceable.set("cols", getColumns(columns, tableQualifiedName));
}
List<Struct> sortColsStruct = new ArrayList<>();
for (Order sortcol : storageDesc.getSortCols()) {
String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
......@@ -558,13 +553,14 @@ public class HiveMetaStoreBridge {
return sdReferenceable;
}
private String getColumnQualifiedName(String tableQualifiedName, String colName) {
String[] parts = tableQualifiedName.split("@");
String tableName = parts[0];
public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
final String[] parts = tableQualifiedName.split("@");
final String tableName = parts[0];
final String clusterName = parts[1];
return String.format("%s.%s@%s", tableName, colName, clusterName);
}
private List<Referenceable> getColumns(List<FieldSchema> schemaList, String tableQualifiedName) throws Exception {
public List<Referenceable> getColumns(List<FieldSchema> schemaList, String tableQualifiedName) throws Exception {
List<Referenceable> colList = new ArrayList<>();
for (FieldSchema fs : schemaList) {
LOG.debug("Processing field " + fs);
......
......@@ -81,7 +81,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final int WAIT_TIME = 3;
private static ExecutorService executor;
private static final int minThreadsDefault = 5;
private static final int minThreadsDefault = 1;
private static final int maxThreadsDefault = 5;
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
......@@ -236,6 +236,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
case ALTERTABLE_ADDCOLS:
case ALTERTABLE_REPLACECOLS:
case ALTERTABLE_RENAMECOL:
alterTableColumns(dgiBridge, event);
break;
default:
......@@ -244,6 +245,24 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
notifyEntities(messages);
}
/**
 * Handles ALTER TABLE ADD/REPLACE/CHANGE COLUMN events by re-registering the affected table
 * so that its column set in Atlas matches the current Hive metastore state.
 *
 * @param dgiBridge bridge used to talk to the Hive metastore and build entities
 * @param event     the hook event; expected to carry exactly one input and at least one output
 * @throws Exception if reloading the table or creating/updating entities fails
 */
private void alterTableColumns(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
    assert event.inputs != null && event.inputs.size() == 1;
    assert event.outputs != null && event.outputs.size() > 0;

    for (WriteEntity writeEntity : event.outputs) {
        if (writeEntity.getType() != Entity.Type.TABLE) {
            continue;
        }
        Table newTable = writeEntity.getTable();
        // The event's table object does not reflect the updated column set,
        // so re-read the table from the metastore before registering it.
        Table updatedTable = dgiBridge.hiveClient.getTable(newTable.getDbName(), newTable.getTableName());
        writeEntity.setT(updatedTable);
        createOrUpdateEntities(dgiBridge, writeEntity);
    }
}
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
// Hive provides no straightforward way to obtain the new table name here
assert event.inputs != null && event.inputs.size() == 1;
......@@ -260,7 +279,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
.equals(oldTable.getTableName())) {
//Create/update old table entity - create new entity and replace id
Referenceable tableEntity = createEntities(dgiBridge, writeEntity);
Referenceable tableEntity = createOrUpdateEntities(dgiBridge, writeEntity);
String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
oldTable.getDbName(), oldTable.getTableName());
tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
......@@ -280,7 +299,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
private Referenceable createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception {
Database db = null;
Table table = null;
Partition partition = null;
......@@ -327,7 +346,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
/**
 * Creates or updates Atlas entities for every output of the event that matches the given type.
 *
 * @param dgiBridge  bridge used to build/register entities
 * @param event      the hook event whose outputs are processed
 * @param entityType only outputs of this type are registered
 * @throws Exception if entity creation/update fails
 */
private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
    for (WriteEntity entity : event.outputs) {
        if (entity.getType() == entityType) {
            createOrUpdateEntities(dgiBridge, entity);
        }
    }
}
......@@ -365,7 +384,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
Referenceable inTable = createEntities(dgiBridge, readEntity);
Referenceable inTable = createOrUpdateEntities(dgiBridge, readEntity);
source.add(inTable);
}
}
......@@ -374,7 +393,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
Referenceable outTable = createEntities(dgiBridge, writeEntity);
Referenceable outTable = createOrUpdateEntities(dgiBridge, writeEntity);
target.add(outTable);
}
}
......
......@@ -58,6 +58,7 @@ public class HiveDataModelGenerator {
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
public static final String COMMENT = "comment";
public static final String COLUMNS = "columns";
public static final String STORAGE_NUM_BUCKETS = "numBuckets";
public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories";
......@@ -67,6 +68,7 @@ public class HiveDataModelGenerator {
public static final String CLUSTER_NAME = "clusterName";
public static final String TABLE = "table";
public static final String DB = "db";
public static final String STORAGE_DESC = "sd";
public HiveDataModelGenerator() {
classTypeDefinitions = new HashMap<>();
......@@ -176,8 +178,6 @@ public class HiveDataModelGenerator {
private void createStorageDescClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("cols", String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("location", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("inputFormat", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
......@@ -278,7 +278,7 @@ public class HiveDataModelGenerator {
null),
new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.REQUIRED, true,
new AttributeDefinition(STORAGE_DESC, HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.REQUIRED, true,
null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
......@@ -302,7 +302,7 @@ public class HiveDataModelGenerator {
null),
new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("retention", DataTypes.INT_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("sd", HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.OPTIONAL, true,
new AttributeDefinition(STORAGE_DESC, HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.OPTIONAL, true,
null),
new AttributeDefinition("partitionKeys", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
......
......@@ -24,6 +24,7 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
......@@ -42,6 +43,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
......@@ -113,6 +115,10 @@ public class HiveHookIT {
return "table" + random();
}
/** Generates a unique column name for use in tests ("col" plus a random suffix). */
private String columnName() {
    final String suffix = random();
    return "col" + suffix;
}
private String createTable() throws Exception {
return createTable(true);
}
......@@ -128,12 +134,12 @@ public class HiveHookIT {
public void testCreateTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
String colName = "col" + random();
String colName = columnName();
runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)");
assertTableIsRegistered(dbName, tableName);
//there is only one instance of column registered
String colId = assertColumnIsRegistered(colName);
String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName));
Referenceable colEntity = dgiCLient.getEntity(colId);
Assert.assertEquals(colEntity.get("qualifiedName"), String.format("%s.%s.%s@%s", dbName.toLowerCase(),
tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME));
......@@ -145,7 +151,7 @@ public class HiveHookIT {
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName);
Assert.assertEquals(tableRef.get("name"), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME);
final Referenceable sdRef = (Referenceable) tableRef.get("sd");
Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS), false);
......@@ -155,12 +161,19 @@ public class HiveHookIT {
}
/**
 * Asserts that a column with the given qualified name is registered in Atlas.
 *
 * @param colName the column's qualified name; compared case-insensitively (lower-cased)
 * @return the id of the registered column entity
 * @throws Exception if the search fails or the column is not registered
 */
private String assertColumnIsRegistered(String colName) throws Exception {
    LOG.debug("Searching for column {}", colName.toLowerCase());
    String query =
            String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase());
    return assertEntityIsRegistered(query);
}
/**
 * Asserts that no column with the given qualified name is registered in Atlas.
 *
 * @param colName the column's qualified name; compared case-insensitively (lower-cased)
 * @throws Exception if the search fails or the column unexpectedly exists
 */
private void assertColumnIsNotRegistered(String colName) throws Exception {
    LOG.debug("Searching for column {}", colName);
    final String loweredName = colName.toLowerCase();
    final String query =
            String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), loweredName);
    assertEntityIsNotRegistered(query);
}
@Test
public void testCTAS() throws Exception {
String tableName = createTable();
......@@ -270,6 +283,86 @@ public class HiveHookIT {
assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
/**
 * Fetches the registered table entity and returns its column references.
 *
 * @param dbName    database containing the table
 * @param tableName table whose columns are returned
 * @return the list of column {@code Referenceable}s attached to the table entity
 * @throws Exception if the table is not registered or the fetch fails
 */
private List<Referenceable> getColumns(String dbName, String tableName) throws Exception {
    final String tableId = assertTableIsRegistered(dbName, tableName);
    final Referenceable tableRef = dgiCLient.getEntity(tableId);
    @SuppressWarnings("unchecked")
    final List<Referenceable> cols = (List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS);
    return cols;
}
@Test
public void testAlterTableAddColumn() throws Exception {
    final String tableName = createTable();
    final String column = columnName();

    runCommand("alter table " + tableName + " add columns (" + column + " string)");

    final String tableQualifiedName =
            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, column));

    // The created table has two columns; adding one should leave three registered.
    final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(columns.size(), 3);
}
@Test
public void testAlterTableDropColumn() throws Exception {
    final String tableName = createTable();
    final String colDropped = "name";

    // REPLACE COLUMNS drops every column not listed; only "id" survives.
    runCommand("alter table " + tableName + " replace columns (id int)");

    final String tableQualifiedName =
            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, colDropped));

    // Only the single remaining column should be attached to the table entity.
    final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(columns.size(), 1);
}
@Test
public void testAlterTableChangeColumn() throws Exception {
    final String tableName = createTable();
    // The table's qualified name is invariant across the renames below; compute it once.
    final String tableQualifiedName =
            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);

    // 1. Plain rename: "name" -> "name1".
    String oldColName = "name";
    String newColName = "name1";
    String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName);
    runCommand(query);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, oldColName));
    assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, newColName));

    List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(columns.size(), 2);

    // 2. Rename again and change the column type to int.
    oldColName = "name1";
    newColName = "name2";
    final String newColType = "int";
    query = String.format("alter table %s change column %s %s %s", tableName, oldColName, newColName, newColType);
    runCommand(query);

    columns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(columns.size(), 2);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, oldColName));
    String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, newColName);
    assertColumnIsRegistered(newColQualifiedName);
    Assert.assertEquals(columns.get(1).get("type"), "int");

    // 3. Rename once more, adding a comment and repositioning the column after "id".
    oldColName = "name2";
    newColName = "name3";
    final String comment = "added comment";
    query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName,
            newColName, newColType, comment);
    runCommand(query);

    columns = getColumns(DEFAULT_DB, tableName);
    Assert.assertEquals(columns.size(), 2);

    assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, oldColName));
    newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(tableQualifiedName, newColName);
    assertColumnIsRegistered(newColQualifiedName);
    Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.COMMENT), comment);
}
@Test
public void testAlterViewRename() throws Exception {
String tableName = createTable();
......@@ -320,6 +413,14 @@ public class HiveHookIT {
return assertEntityIsRegistered(query, "t");
}
/**
 * Looks up the table entity via a DSL query and asserts it is registered.
 *
 * @param dbName    database containing the table
 * @param tableName the table to search for
 * @return the id of the registered table entity
 * @throws Exception if the search fails or the table is not registered
 */
private String getTableEntity(String dbName, String tableName) throws Exception {
    LOG.debug("Searching for table {}.{}", dbName, tableName);
    final String dsl = String.format(
            "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s' select t",
            HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
    return assertEntityIsRegistered(dsl, "t");
}
private String assertDatabaseIsRegistered(String dbName) throws Exception {
LOG.debug("Searching for database {}", dbName);
String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
......
......@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-521 Support Alter Table column commands (suma.shivaprasad via shwethags)
ATLAS-500 UI: Search Default (sanjayp via shwethags)
ATLAS-483 Remove client.properties (tbeerbower via shwethags)
ATLAS-349 SSL - Atlas SSL connection has weak/unsafe Ciphers suites (ndjouhr via shwethags)
......
......@@ -110,8 +110,6 @@ public class BaseHiveRepositoryTest {
HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
.createClassTypeDef(STORAGE_DESC_TYPE, null,
new AttributeDefinition("cols", String.format("array<%s>", COLUMN_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("location", DataTypes.STRING_TYPE),
attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
......
......@@ -231,8 +231,6 @@ public final class TestUtils {
new AttributeDefinition[]{createRequiredAttrDef("name", DataTypes.STRING_TYPE),});
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("cols", String.format("array<%s>", COLUMN_TYPE),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("location", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("inputFormat", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
......
......@@ -192,8 +192,8 @@ public class GraphBackedDiscoveryServiceTest extends BaseHiveRepositoryTest {
{"hive_column where hive_column isa PII", 6},
{"View is Dimension" , 2},
// {"hive_column where hive_column isa PII select hive_column.name", 6}, //Not working - ATLAS-175
{"hive_column select hive_column.name", 37},
{"hive_column select name", 37},
{"hive_column select hive_column.name", 29},
{"hive_column select name", 29},
{"hive_column where hive_column.name=\"customer_id\"", 4},
{"from hive_table select hive_table.name", 8},
{"hive_db where (name = \"Reporting\")", 1},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment