Commit 2e02ae62 by Shwetha GS

ATLAS-752 Column renames should retain traits/tags (svimal2106 via shwethags)

parent 70f71570
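Before this change, ALTERTABLE_RENAMECOL went through the generic handleEventOutputs path, so Atlas dropped the old column entity and registered a new one, losing any traits/tags attached to the column. The hook now detects which column was renamed and sends a partial update that rewrites the column's qualified name in place, so the entity keeps its GUID and traits. A minimal sketch of the intended behavior, written as if inside HiveHookIT and reusing the helpers that appear in the test changes below:

    // Sketch of the scenario verified by testTraitsPreservedOnColumnRename (below).
    String tableName = createTable();
    String tbqn = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);

    // Attach a trait to the "id" column, then rename the column in Hive.
    String guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id"));
    String trait = createTrait(guid);
    runCommand("alter table " + tableName + " change id id_new string");

    // The renamed column keeps the same GUID, so the trait survives the rename.
    String guid2 = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id_new"));
    assertEquals(guid2, guid);
    assertTrue(atlasClient.getEntity(guid2).getTraits().contains(trait));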
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -53,6 +54,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.tools.cmd.gen.AnyVals;

 import java.net.MalformedURLException;
 import java.util.ArrayList;
@@ -229,10 +231,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         case ALTERTABLE_SERIALIZER:
         case ALTERTABLE_ADDCOLS:
         case ALTERTABLE_REPLACECOLS:
-        case ALTERTABLE_RENAMECOL:
         case ALTERTABLE_PARTCOLTYPE:
             handleEventOutputs(dgiBridge, event, Type.TABLE);
             break;
+        case ALTERTABLE_RENAMECOL:
+            renameColumn(dgiBridge, event);
+            break;
         case ALTERTABLE_LOCATION:
             List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
             if (tablesUpdated != null && tablesUpdated.size() > 0) {
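Routing ALTERTABLE_RENAMECOL to the new renameColumn handler (next hunk) instead of the generic handleEventOutputs path is the core of the fix: the rename reaches Atlas as a partial update keyed on the column's old qualified name, so the existing hive_column entity is updated in place rather than deleted and recreated. The essential notification, as it appears in renameColumn below:

    // Look up the hive_column by its old qualified name and rewrite its
    // qualifiedName attribute; the entity's GUID and traits are untouched.
    Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
    newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
    messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
            HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
            oldColumnQFName, newColEntity));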
@@ -297,6 +301,64 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }

+    private Pair<String, String> findChangedColNames(List<FieldSchema> oldColList, List<FieldSchema> newColList) {
+        HashMap<FieldSchema, Integer> oldColHashMap = new HashMap<>();
+        HashMap<FieldSchema, Integer> newColHashMap = new HashMap<>();
+        for (int i = 0; i < oldColList.size(); i++) {
+            oldColHashMap.put(oldColList.get(i), i);
+            newColHashMap.put(newColList.get(i), i);
+        }
+
+        String changedColStringOldName = oldColList.get(0).getName();
+        String changedColStringNewName = changedColStringOldName;
+
+        for (int i = 0; i < oldColList.size(); i++) {
+            if (!newColHashMap.containsKey(oldColList.get(i))) {
+                changedColStringOldName = oldColList.get(i).getName();
+                break;
+            }
+        }
+
+        for (int i = 0; i < newColList.size(); i++) {
+            if (!oldColHashMap.containsKey(newColList.get(i))) {
+                changedColStringNewName = newColList.get(i).getName();
+                break;
+            }
+        }
+
+        return Pair.of(changedColStringOldName, changedColStringNewName);
+    }
+
+    private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
+        assert event.getInputs() != null && event.getInputs().size() == 1;
+        assert event.getOutputs() != null && event.getOutputs().size() > 0;
+
+        Table oldTable = event.getInputs().iterator().next().getTable();
+        List<FieldSchema> oldColList = oldTable.getAllCols();
+        List<FieldSchema> newColList = dgiBridge.hiveClient.getTable(event.getOutputs().iterator().next().getTable().getTableName()).getAllCols();
+        assert oldColList.size() == newColList.size();
+
+        Pair<String, String> changedColNamePair = findChangedColNames(oldColList, newColList);
+        String oldColName = changedColNamePair.getLeft();
+        String newColName = changedColNamePair.getRight();
+        for (WriteEntity writeEntity : event.getOutputs()) {
+            if (writeEntity.getType() == Type.TABLE) {
+                Table newTable = writeEntity.getTable();
+                createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true, oldTable);
+                final String newQualifiedTableName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
+                        newTable);
+                String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, oldColName);
+                String newColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, newColName);
+                Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
+                newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
+                messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
+                        HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        oldColumnQFName, newColEntity));
+            }
+        }
+        handleEventOutputs(dgiBridge, event, Type.TABLE);
+    }
+
     private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
         //crappy, no easy of getting new name
         assert event.getInputs() != null && event.getInputs().size() == 1;
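findChangedColNames locates the rename by indexing both column lists by FieldSchema, which compares column name and type: the one entry present in only one of the two lists marks the old and new names. (A type-only change maps both sides to the same name, which is harmless here.) A hypothetical illustration of the contract, written as if inside HiveHook where the private method is visible; the column names and types are invented for the example, and Pair is the commons-lang3 tuple already used in this class:

    // Hypothetical: "alter table t change id id_new string" turns (id int) into
    // (id_new string); the FieldSchema absent from the other list marks the rename.
    List<FieldSchema> oldCols = Arrays.asList(
            new FieldSchema("id", "int", null),
            new FieldSchema("name", "string", null));
    List<FieldSchema> newCols = Arrays.asList(
            new FieldSchema("id_new", "string", null),
            new FieldSchema("name", "string", null));
    Pair<String, String> changed = findChangedColNames(oldCols, newCols);
    // changed.getLeft() == "id", changed.getRight() == "id_new"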
@@ -389,7 +451,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return newSDEntity;
     }

-    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception {
+    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
         Database db = null;
         Table table = null;
         Partition partition = null;
@@ -419,13 +481,16 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         Referenceable tableEntity = null;

         if (table != null) {
-            table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
+            if (existTable != null) {
+                table = existTable;
+            } else {
+                table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
+            }
             //If its an external table, even though the temp table skip flag is on,
             // we create the table since we need the HDFS path to temp table lineage.
             if (skipTempTables &&
                     table.isTemporary() &&
                     !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
                 LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
             } else {
@@ -438,6 +503,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return tableEntity;
     }

+    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception {
+        return createOrUpdateEntities(dgiBridge, user, entity, skipTempTables, null);
+    }
+
     private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
         List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>();
         for (Entity entity : event.getOutputs()) {
...
@@ -68,6 +68,7 @@ import static org.apache.atlas.hive.hook.HiveHook.normalize;
 import static org.apache.atlas.hive.model.HiveDataModelGenerator.NAME;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;

 public class HiveHookIT {
@@ -327,8 +328,8 @@
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
         JSONObject response = atlasClient.getInputGraph(datasetName);
         JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
-        Assert.assertTrue(vertices.has(viewId));
-        Assert.assertTrue(vertices.has(table1Id));
+        assertTrue(vertices.has(viewId));
+        assertTrue(vertices.has(table1Id));

         //Alter the view from table2
         String table2Name = createTable();
@@ -343,13 +344,13 @@
         datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
         response = atlasClient.getInputGraph(datasetName);
         vertices = response.getJSONObject("values").getJSONObject("vertices");
-        Assert.assertTrue(vertices.has(viewId));
+        assertTrue(vertices.has(viewId));

         //This is through the alter view process
-        Assert.assertTrue(vertices.has(table2Id));
+        assertTrue(vertices.has(table2Id));

         //This is through the Create view process
-        Assert.assertTrue(vertices.has(table1Id));
+        assertTrue(vertices.has(table1Id));

         //Outputs dont exist
         response = atlasClient.getOutputGraph(datasetName);
@@ -668,7 +669,7 @@
             public void assertOnEntity(final Referenceable entity) throws Exception {
                 Referenceable sd = ((Referenceable) entity.get(HiveDataModelGenerator.STORAGE_DESC));
                 String location = (String) sd.get(HiveDataModelGenerator.LOCATION);
-                Assert.assertTrue(location.contains(newTableName));
+                assertTrue(location.contains(newTableName));
             }
         });
     }
@@ -912,6 +913,42 @@
         }
     }

     @Test
+    public void testAlterTableWithoutHookConf() throws Exception {
+        HiveConf conf = new HiveConf();
+        conf.set("hive.exec.post.hooks", "");
+        SessionState ss = new SessionState(conf);
+        ss = SessionState.start(ss);
+        SessionState.setCurrentSessionState(ss);
+        Driver driver = new Driver(conf);
+        String tableName = tableName();
+        String createCommand = "create table " + tableName + " (id int, name string)";
+        driver.run(createCommand);
+        assertTableIsNotRegistered(DEFAULT_DB, tableName);
+        String command = "alter table " + tableName + " change id id_new string";
+        runCommand(command);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tbqn = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
+        assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id_new"));
+    }
+
+    @Test
+    public void testTraitsPreservedOnColumnRename() throws Exception {
+        String tableName = createTable();
+        String tbqn = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
+        String guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id"));
+        String trait = createTrait(guid);
+        String oldColName = "id";
+        String newColName = "id_new";
+        String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName);
+        runCommand(query);
+        String guid2 = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(tbqn, "id_new"));
+        assertEquals(guid2, guid);
+        assertTrue(atlasClient.getEntity(guid2).getTraits().contains(trait));
+    }
+
+    @Test
     public void testAlterViewRename() throws Exception {
         String tableName = createTable();
         String viewName = tableName();
@@ -1490,14 +1527,14 @@
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, db2, table2);
         JSONObject response = atlasClient.getInputGraph(datasetName);
         JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
-        Assert.assertTrue(vertices.has(table1Id));
-        Assert.assertTrue(vertices.has(table2Id));
+        assertTrue(vertices.has(table1Id));
+        assertTrue(vertices.has(table2Id));

         datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table1);
         response = atlasClient.getOutputGraph(datasetName);
         vertices = response.getJSONObject("values").getJSONObject("vertices");
-        Assert.assertTrue(vertices.has(table1Id));
-        Assert.assertTrue(vertices.has(table2Id));
+        assertTrue(vertices.has(table1Id));
+        assertTrue(vertices.has(table2Id));
     }

     //For ATLAS-448
...
@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)

 ALL CHANGES:
+ATLAS-752 Column renames should retain traits/tags (svimal2106 via shwethags)
 ATLAS-821 Atlas UI - Add arrow to navigate to child term (kevalbhatt18 via yhemanth)
 ATLAS-812 Atlas UI - Associate Terms with Assets (kevalbhatt18 via yhemanth)
 ATLAS-809 JAAS configuration needed for Kafka interaction via Atlas config file (abhayk via shwethags)
...