Commit 125dc976 by Shwetha GS

ATLAS-522 Support Alter table commands (sumasai via shwethags)

parent 98fdc6d8
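The alter operations newly routed to the hook's alterTable handler cover the common ALTER TABLE DDL variants. A minimal sketch of representative statements, mirroring the queries issued by the new HiveHookIT tests further down (table name is hypothetical):

// Each statement roughly corresponds to one of the HiveOperation cases now handled by alterTable():
// ALTERTABLE_LOCATION, ALTERTABLE_FILEFORMAT, ALTERTABLE_CLUSTER_SORT/BUCKETNUM,
// ALTERTABLE_SERIALIZER/SERDEPROPERTIES and ALTERTABLE_PROPERTIES respectively.
String tableName = "sample_table"; // hypothetical
runCommand("alter table " + tableName + " set location 'file:///tmp/testPath'");
runCommand("alter table " + tableName + " set FILEFORMAT orc");
runCommand("alter table " + tableName + " CLUSTERED BY (id) SORTED BY (id) INTO 5 BUCKETS");
runCommand("alter table " + tableName + " set SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('key1'='value1')");
runCommand("alter table " + tableName + " set TBLPROPERTIES ('testPropKey1'='testPropValue1')");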
......@@ -165,7 +165,7 @@ public class HiveMetaStoreBridge {
dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
dbRef.set("locationUri", hiveDB.getLocationUri());
dbRef.set("parameters", hiveDB.getParameters());
dbRef.set(HiveDataModelGenerator.PARAMETERS, hiveDB.getParameters());
dbRef.set("ownerName", hiveDB.getOwnerName());
if (hiveDB.getOwnerType() != null) {
dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
......@@ -332,7 +332,7 @@ public class HiveMetaStoreBridge {
List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableQualifiedName);
tableReference.set("partitionKeys", partKeys);
tableReference.set("parameters", hiveTable.getParameters());
tableReference.set(HiveDataModelGenerator.PARAMETERS, hiveTable.getParameters());
if (hiveTable.getViewOriginalText() != null) {
tableReference.set("viewOriginalText", hiveTable.getViewOriginalText());
......@@ -481,7 +481,7 @@ public class HiveMetaStoreBridge {
// ones will fix to identify partitions with differing schema.
partRef.set("sd", sdReferenceable);
partRef.set("parameters", hivePart.getParameters());
partRef.set(HiveDataModelGenerator.PARAMETERS, hivePart.getParameters());
return partRef;
}
......@@ -518,7 +518,7 @@ public class HiveMetaStoreBridge {
serdeInfoStruct.set(HiveDataModelGenerator.NAME, serdeInfo.getName());
serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib());
serdeInfoStruct.set("parameters", serdeInfo.getParameters());
serdeInfoStruct.set(HiveDataModelGenerator.PARAMETERS, serdeInfo.getParameters());
sdReferenceable.set("serdeInfo", serdeInfoStruct);
sdReferenceable.set(HiveDataModelGenerator.STORAGE_NUM_BUCKETS, storageDesc.getNumBuckets());
......@@ -547,7 +547,7 @@ public class HiveMetaStoreBridge {
sdReferenceable.set("bucketCols", storageDesc.getBucketCols());
}
sdReferenceable.set("parameters", storageDesc.getParameters());
sdReferenceable.set(HiveDataModelGenerator.PARAMETERS, storageDesc.getParameters());
sdReferenceable.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories());
return sdReferenceable;
......
......@@ -224,6 +224,16 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
renameTable(dgiBridge, event);
break;
case ALTERTABLE_FILEFORMAT:
case ALTERTABLE_LOCATION:
case ALTERTABLE_CLUSTER_SORT:
case ALTERTABLE_BUCKETNUM:
case ALTERTABLE_PROPERTIES:
case ALTERTABLE_SERDEPROPERTIES:
case ALTERTABLE_SERIALIZER:
alterTable(dgiBridge, event);
break;
case ALTERVIEW_AS:
//update inputs/outputs?
break;
......@@ -231,7 +241,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
case ALTERTABLE_ADDCOLS:
case ALTERTABLE_REPLACECOLS:
case ALTERTABLE_RENAMECOL:
alterTableColumns(dgiBridge, event);
alterTable(dgiBridge, event);
break;
default:
......@@ -240,21 +250,16 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
notifyEntities(messages);
}
private void alterTableColumns(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
private void alterTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
assert event.inputs != null && event.inputs.size() == 1;
assert event.outputs != null && event.outputs.size() > 0;
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Entity.Type.TABLE) {
Table newTable = writeEntity.getTable();
//Reload table since hive is not providing the updated column set here
Table updatedTable = dgiBridge.hiveClient.getTable(newTable.getDbName(), newTable.getTableName());
writeEntity.setT(updatedTable);
//Create/update table entity
createOrUpdateEntities(dgiBridge, writeEntity);
}
//The type check below filters out partition-related outputs
if (writeEntity.getType() == Entity.Type.TABLE) {
//Create/update table entity
createOrUpdateEntities(dgiBridge, writeEntity);
}
}
}
......@@ -280,7 +285,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase());
String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
newTable.getDbName(), newTable.getTableName());
......@@ -415,4 +419,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return new JSONObject();
}
}
}
......@@ -64,6 +64,7 @@ public class HiveDataModelGenerator {
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
public static final String COMMENT = "comment";
public static final String PARAMETERS = "parameters";
public static final String COLUMNS = "columns";
public static final String STORAGE_NUM_BUCKETS = "numBuckets";
......@@ -74,7 +75,10 @@ public class HiveDataModelGenerator {
public static final String CLUSTER_NAME = "clusterName";
public static final String TABLE = "table";
public static final String DB = "db";
public static final String STORAGE_DESC = "sd";
public static final String STORAGE_DESC_INPUT_FMT = "inputFormat";
public static final String STORAGE_DESC_OUTPUT_FMT = "outputFormat";
public HiveDataModelGenerator() {
classTypeDefinitions = new HashMap<>();
......@@ -164,7 +168,7 @@ public class HiveDataModelGenerator {
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("serializationLib", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL,
false, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
StructTypeDefinition definition =
new StructTypeDefinition(HiveDataTypes.HIVE_SERDE.getName(), attributeDefinitions);
structTypeDefinitionMap.put(HiveDataTypes.HIVE_SERDE.getName(), definition);
......@@ -235,7 +239,7 @@ public class HiveDataModelGenerator {
null),
new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("ownerType", HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), Multiplicity.OPTIONAL,
......@@ -288,7 +292,7 @@ public class HiveDataModelGenerator {
null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_PARTITION.getName(), null,
ImmutableList.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions);
......@@ -314,7 +318,7 @@ public class HiveDataModelGenerator {
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL,
false, null),
new AttributeDefinition("viewExpandedText", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL,
......
......@@ -18,8 +18,11 @@
package org.apache.atlas.hive.hook;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
......@@ -35,6 +38,13 @@ import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
......@@ -42,7 +52,15 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -62,6 +80,10 @@ public class HiveHookIT {
public void setUp() throws Exception {
//Set-up hive session
HiveConf conf = new HiveConf();
//Run in local mode
conf.set("mapreduce.framework.name", "local");
conf.set("fs.default.name", "file:///'");
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
driver = new Driver(conf);
ss = new SessionState(conf, System.getProperty("user.name"));
ss = SessionState.start(ss);
......@@ -87,7 +109,7 @@ public class HiveHookIT {
String dbId = assertDatabaseIsRegistered(dbName);
Referenceable definition = dgiCLient.getEntity(dbId);
Map params = (Map) definition.get("parameters");
Map params = (Map) definition.get(HiveDataModelGenerator.PARAMETERS);
Assert.assertNotNull(params);
Assert.assertEquals(params.size(), 2);
Assert.assertEquals(params.get("p1"), "v1");
......@@ -380,6 +402,184 @@ public class HiveHookIT {
assertTableIsNotRegistered(DEFAULT_DB, viewName);
}
@Test
public void testAlterTableLocation() throws Exception {
String tableName = createTable();
final String testPath = "file://" + System.getProperty("java.io.tmpdir", "/tmp") + File.pathSeparator + "testPath";
String query = "alter table " + tableName + " set location '" + testPath + "'";
runCommand(query);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
//Verify the new location on the table's storage descriptor
Referenceable tableRef = dgiCLient.getEntity(tableId);
Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
Assert.assertEquals(sdRef.get("location"), testPath);
}
@Test
public void testAlterTableFileFormat() throws Exception {
String tableName = createTable();
final String testFormat = "orc";
String query = "alter table " + tableName + " set FILEFORMAT " + testFormat;
runCommand(query);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable tableRef = dgiCLient.getEntity(tableId);
Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
Assert.assertNotNull(sdRef.get("serdeInfo"));
Struct serdeInfo = (Struct) sdRef.get("serdeInfo");
Assert.assertEquals(serdeInfo.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde");
Assert.assertNotNull(serdeInfo.get(HiveDataModelGenerator.PARAMETERS));
Assert.assertEquals(((Map<String, String>) serdeInfo.get(HiveDataModelGenerator.PARAMETERS)).get("serialization.format"), "1");
/**
* Hive 'alter table stored as' is not supported - See https://issues.apache.org/jira/browse/HIVE-9576
* query = "alter table " + tableName + " STORED AS " + testFormat.toUpperCase();
* runCommand(query);
* tableRef = dgiCLient.getEntity(tableId);
* sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
* Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
* Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
* Assert.assertEquals(((Map) sdRef.get(HiveDataModelGenerator.PARAMETERS)).get("orc.compress"), "ZLIB");
*/
}
@Test
public void testAlterTableBucketingClusterSort() throws Exception {
String tableName = createTable();
ImmutableList<String> cols = ImmutableList.<String>of("id");
runBucketSortQuery(tableName, 5, cols, cols);
cols = ImmutableList.<String>of("id", "name");
runBucketSortQuery(tableName, 2, cols, cols);
}
private void runBucketSortQuery(String tableName, int numBuckets, ImmutableList<String> bucketCols,ImmutableList<String> sortCols) throws Exception {
final String fmtQuery = "alter table %s CLUSTERED BY (%s) SORTED BY (%s) INTO %s BUCKETS";
String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()), stripListBrackets(sortCols.toString()), numBuckets);
runCommand(query);
verifyBucketSortingProperties(tableName, numBuckets, bucketCols, sortCols);
}
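//Turns List.toString() output such as "[id, name]" into "id, name" so it can be embedded in the CLUSTERED BY/SORTED BY clause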
private String stripListBrackets(String listElements) {
return StringUtils.strip(StringUtils.strip(listElements, "["), "]");
}
private void verifyBucketSortingProperties(String tableName, int numBuckets, ImmutableList<String> bucketColNames, ImmutableList<String> sortcolNames) throws Exception {
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable tableRef = dgiCLient.getEntity(tableId);
Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
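//Numeric attributes on the storage descriptor come back from the Atlas client as scala.math.BigInt, hence the casts below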
Assert.assertEquals(((scala.math.BigInt) sdRef.get(HiveDataModelGenerator.STORAGE_NUM_BUCKETS)).intValue(), numBuckets);
Assert.assertEquals(sdRef.get("bucketCols"), bucketColNames);
List<Struct> hiveOrderStructList = (List<Struct>) sdRef.get("sortCols");
Assert.assertNotNull(hiveOrderStructList);
Assert.assertEquals(hiveOrderStructList.size(), sortcolNames.size());
for (int i = 0; i < sortcolNames.size(); i++) {
Assert.assertEquals(hiveOrderStructList.get(i).get("col"), sortcolNames.get(i));
Assert.assertEquals(((scala.math.BigInt)hiveOrderStructList.get(i).get("order")).intValue(), 1);
}
}
@Test
public void testAlterTableSerde() throws Exception {
//SERDE PROPERTIES
String tableName = createTable();
Map<String, String> expectedProps = new HashMap<String, String>() {{
put("key1", "value1");
}};
runSerdePropsQuery(tableName, expectedProps);
expectedProps.put("key2", "value2");
//Add another property
runSerdePropsQuery(tableName, expectedProps);
}
private void runSerdePropsQuery(String tableName, Map<String, String> expectedProps) throws Exception {
final String serdeLib = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
final String serializedProps = getSerializedProps(expectedProps);
String query = String.format("alter table %s set SERDE '%s' WITH SERDEPROPERTIES (%s)", tableName, serdeLib, serializedProps);
runCommand(query);
verifyTableSdProperties(tableName, serdeLib, expectedProps);
}
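//Serializes the map as comma-separated 'key'='value' pairs, e.g. 'key1'='value1','key2'='value2', for use in the SERDEPROPERTIES/TBLPROPERTIES clause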
private String getSerializedProps(Map<String, String> expectedProps) {
StringBuffer sb = new StringBuffer();
for(String expectedPropKey : expectedProps.keySet()) {
if(sb.length() > 0) {
sb.append(",");
}
sb.append("'").append(expectedPropKey).append("'");
sb.append("=");
sb.append("'").append(expectedProps.get(expectedPropKey)).append("'");
}
return sb.toString();
}
@Test
public void testAlterTableProperties() throws Exception {
String tableName = createTable();
final Map<String, String> expectedProps = new HashMap<String, String>() {{
put("testPropKey1", "testPropValue1");
put("comment", "test comment");
}};
final String fmtQuery = "alter table %s set TBLPROPERTIES (%s)";
String query = String.format(fmtQuery, tableName, getSerializedProps(expectedProps));
runCommand(query);
verifyTableProperties(tableName, expectedProps);
expectedProps.put("testPropKey2", "testPropValue2");
//Add another property
query = String.format(fmtQuery, tableName, getSerializedProps(expectedProps));
runCommand(query);
verifyTableProperties(tableName, expectedProps);
}
private void verifyTableProperties(String tableName, Map<String, String> expectedProps) throws Exception {
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable tableRef = dgiCLient.getEntity(tableId);
Map<String, String> parameters = (Map<String, String>) tableRef.get(HiveDataModelGenerator.PARAMETERS);
Assert.assertNotNull(parameters);
//Comment should still exist since SET TBLPROPERTIES only adds properties; it does not remove existing ones
for (String propKey : expectedProps.keySet()) {
Assert.assertEquals(parameters.get(propKey), expectedProps.get(propKey));
}
}
private void verifyTableSdProperties(String tableName, String serdeLib, Map<String, String> expectedProps) throws Exception {
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable tableRef = dgiCLient.getEntity(tableId);
Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
Struct serdeInfo = (Struct) sdRef.get("serdeInfo");
Assert.assertEquals(serdeInfo.get("serializationLib"), serdeLib);
Map<String, String> parameters = (Map<String, String>) serdeInfo.get(HiveDataModelGenerator.PARAMETERS);
Assert.assertNotNull(parameters);
//Previously set serde properties should remain since SET SERDEPROPERTIES only adds properties; it does not remove existing ones
for (String propKey : expectedProps.keySet()) {
Assert.assertEquals(parameters.get(propKey), expectedProps.get(propKey));
}
}
private String assertProcessIsRegistered(String queryStr) throws Exception {
// String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
// normalize(queryStr));
......@@ -442,7 +642,7 @@ public class HiveHookIT {
}
private String assertEntityIsRegistered(final String query, String... arg) throws Exception {
waitFor(2000, new Predicate() {
waitFor(60000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = dgiCLient.search(query);
......
......@@ -11,6 +11,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-522 Support Alter table commands (sumasai via shwethags)
ATLAS-512 Decouple currently integrating components from availability of Atlas service for raising metadata events ( yhemanth via sumasai)
ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table ( shwethags via sumasai)
ATLAS-476 Update type attribute with Reserved characters updated the original type as unknown (yhemanth via shwethags)
......