Commit 8fefd165 by Suma Shivaprasad

ATLAS-884 Process registration should call Entity update instead of create (sumasai)

parent e0536224
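Reviewer summary of the core change: instead of sending the lineage process as a HookNotification.EntityCreateRequest, the hook now collects every referenceable touched by the event (database, table, HDFS paths and the process itself) and sends a single HookNotification.EntityUpdateRequest, so re-running a query updates the already-registered process rather than attempting a second create. A minimal before/after sketch, condensed from the hunks below (imports omitted; class names and constructor shapes are exactly those appearing in this diff):

    // Before: the process referenceable was always sent as a create request.
    messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));

    // After: accumulate everything the event touched and send one update request.
    Set<Referenceable> entities = new LinkedHashSet<>();  // filled with table/DB/HDFS referenceables per entity
    entities.add(processReferenceable);                   // the process goes in last
    messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<Referenceable>(entities)));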
@@ -62,6 +62,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -203,9 +204,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
break;
case CREATETABLE:
List<Pair<? extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
LinkedHashMap<Type, Referenceable> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
if (tablesCreated.size() > 0) {
handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
handleExternalTables(dgiBridge, event, tablesCreated);
}
break;
@@ -242,10 +243,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
renameColumn(dgiBridge, event);
break;
case ALTERTABLE_LOCATION:
List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
LinkedHashMap<Type, Referenceable> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
if (tablesUpdated != null && tablesUpdated.size() > 0) {
//Track altered lineage in case of external tables
handleExternalTables(dgiBridge, event, tablesUpdated.get(0).getLeft(), tablesUpdated.get(0).getRight());
handleExternalTables(dgiBridge, event, tablesUpdated);
}
break;
case ALTERDATABASE:
@@ -384,7 +385,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Create/update old table entity - create entity with oldQFName and old tableName if it doesn't exist. If it exists, it will be updated
//We always use the new entity while creating the table since some flags/attributes of the table are not set in inputEntity, and Hive.getTable(oldTableName) also fails since the table no longer exists in Hive
final Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
final LinkedHashMap<Type, Referenceable> tables = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
Referenceable tableEntity = tables.get(Type.TABLE);
//Reset regular column QF Name to the old name and create a new partial notification request replacing the old column QFName with the new name, so that any existing traits are retained
replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.COLUMNS), oldQualifiedName, newQualifiedName);
@@ -458,10 +460,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return newSDEntity;
}
private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
Database db = null;
Table table = null;
Partition partition = null;
LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
List<Referenceable> entities = new ArrayList<>();
switch (entity.getType()) {
@@ -483,7 +486,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
db = dgiBridge.hiveClient.getDatabase(db.getName());
Referenceable dbEntity = dgiBridge.createDBInstance(db);
entities.add(dbEntity);
result.put(Type.DATABASE, dbEntity);
Referenceable tableEntity = null;
@@ -503,30 +508,38 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
} else {
tableEntity = dgiBridge.createTableInstance(dbEntity, table);
entities.add(tableEntity);
result.put(Type.TABLE, tableEntity);
}
}
messages.add(new HookNotification.EntityUpdateRequest(user, entities));
return tableEntity;
return result;
}
private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
return createOrUpdateEntities(dgiBridge, user, entity, skipTempTables, null);
}
private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>();
private LinkedHashMap<Type, Referenceable> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
for (Entity entity : event.getOutputs()) {
if (entity.getType() == entityType) {
Referenceable entityCreatedOrUpdated = createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
if (entitiesCreatedOrUpdated != null) {
entitiesCreatedOrUpdated.add(Pair.of(entity, entityCreatedOrUpdated));
}
return createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
}
}
return null;
}
private Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
for (Entity entity : entities) {
if (entity.getType() == entityType) {
return entity;
}
}
return entitiesCreatedOrUpdated;
return null;
}
public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
@@ -565,17 +578,18 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
final Map<String, Referenceable> source = new LinkedHashMap<>();
final Map<String, Referenceable> target = new LinkedHashMap<>();
final Set<Referenceable> entities = new LinkedHashSet<>();
boolean isSelectQuery = isSelectQuery(event);
// filter out select queries which do not modify data
if (!isSelectQuery) {
for (ReadEntity readEntity : event.getInputs()) {
processHiveEntity(dgiBridge, event, readEntity, source);
processHiveEntity(dgiBridge, event, readEntity, source, entities);
}
for (WriteEntity writeEntity : event.getOutputs()) {
processHiveEntity(dgiBridge, event, writeEntity, target);
processHiveEntity(dgiBridge, event, writeEntity, target, entities);
}
if (source.size() > 0 || target.size() > 0) {
@@ -586,7 +600,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
new ArrayList<Referenceable>() {{
addAll(target.values());
}});
messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
entities.add(processReferenceable);
messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<Referenceable>(entities)));
} else {
LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
}
@@ -595,18 +611,20 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets) throws Exception {
private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
if (!dataSets.containsKey(tblQFName)) {
Referenceable inTable = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
dataSets.put(tblQFName, inTable);
LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
dataSets.put(tblQFName, result.get(Type.TABLE));
entities.addAll(result.values());
}
} else if (entity.getType() == Type.DFS_DIR) {
final String pathUri = lower(new Path(entity.getLocation()).toString());
LOG.info("Registering DFS Path {} ", pathUri);
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
dataSets.put(pathUri, hdfsPath);
entities.add(hdfsPath);
}
}
@@ -642,8 +660,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return false;
}
private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final Entity entity, final Referenceable tblRef) throws HiveException, MalformedURLException {
Table hiveTable = entity.getTable();
private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
List<Referenceable> entities = new ArrayList<>();
Table hiveTable = getEntityByType(event.getOutputs(), Type.TABLE).getTable();
//Refresh to get the correct location
hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
@@ -655,11 +674,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}};
List<Referenceable> outputs = new ArrayList<Referenceable>() {{
add(tblRef);
add(tables.get(Type.TABLE));
}};
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
entities.addAll(tables.values());
entities.add(processReferenceable);
messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
}
}
......
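Reviewer note on the signature changes above: createOrUpdateEntities and handleEventOutputs now return a LinkedHashMap<Type, Referenceable> keyed by Hive entity type instead of a single Referenceable (or a list of entity/referenceable pairs), so callers look up the referenceable they need by type and can pass the whole map on to handleExternalTables. A short sketch of the new call pattern, condensed from the hunks above (only the Type.TABLE and Type.DATABASE keys shown in the diff are assumed):

    LinkedHashMap<Type, Referenceable> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
    if (tablesCreated != null && tablesCreated.size() > 0) {
        Referenceable tableEntity = tablesCreated.get(Type.TABLE);    // table referenceable, used for lineage
        Referenceable dbEntity = tablesCreated.get(Type.DATABASE);    // parent database, when present
        handleExternalTables(dgiBridge, event, tablesCreated);        // the whole map travels with the event
    }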
@@ -248,7 +248,7 @@ public class HiveHookIT {
verifyTimestamps(processReference, "startTime");
verifyTimestamps(processReference, "endTime");
validateHDFSPaths(processReference, pFile, INPUTS);
validateHDFSPaths(processReference, INPUTS, pFile);
}
private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
@@ -301,6 +301,35 @@ public class HiveHookIT {
}
@Test
public void testDropAndRecreateCTASOutput() throws Exception {
String tableName = createTable();
String ctasTableName = "table" + random();
String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
String processId = assertProcessIsRegistered(query);
final String drpquery = String.format("drop table %s ", ctasTableName);
runCommand(drpquery);
assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
//Fix after ATLAS-876
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
String process2Id = assertProcessIsRegistered(query);
Assert.assertEquals(process2Id, processId);
Referenceable processRef = atlasClient.getEntity(processId);
String tblQlfdname = getQualifiedTblName(tableName);
String ctasQlfdname = getQualifiedTblName(ctasTableName);
validateInputTables(processRef, tblQlfdname);
validateOutputTables(processRef, ctasQlfdname, ctasQlfdname);
}
@Test
public void testCreateView() throws Exception {
String tableName = createTable();
String viewName = tableName();
@@ -402,7 +431,7 @@ public class HiveHookIT {
final String tblQlfdName = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, loadFile, INPUTS);
validateHDFSPaths(processReference, INPUTS, loadFile);
validateOutputTables(processReference, tblQlfdName);
}
@@ -416,8 +445,8 @@ public class HiveHookIT {
return inputtblQlfdName;
}
private Referenceable validateProcess(String query, String inputTable, String outputTable) throws Exception {
String processId = assertProcessIsRegistered(query, inputTable, outputTable);
private Referenceable validateProcess(String query, String inputTable, String... outputTables) throws Exception {
String processId = assertProcessIsRegistered(query, inputTable, outputTables);
Referenceable process = atlasClient.getEntity(processId);
if (inputTable == null) {
Assert.assertNull(process.get(INPUTS));
@@ -426,11 +455,11 @@ public class HiveHookIT {
validateInputTables(process, inputTable);
}
if (outputTable == null) {
if (outputTables == null) {
Assert.assertNull(process.get(OUTPUTS));
} else {
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1 );
validateOutputTables(process, outputTable);
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1);
validateOutputTables(process, outputTables);
}
return process;
@@ -472,6 +501,43 @@ public class HiveHookIT {
}
@Test
public void testUpdateProcess() throws Exception {
String tableName = createTable();
String pFile1 = createTestDFSPath("somedfspath1");
String testPathNormed = lower(new Path(pFile1).toString());
String query =
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
String tblQlfdname = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processReference, tblQlfdname);
//Rerun same query with same HDFS path
runCommand(query);
Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
//Rerun same query with a new HDFS path. Should create a new process
String pFile2 = createTestDFSPath("somedfspath2");
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
final String testPathNormed2 = lower(new Path(pFile2).toString());
runCommand(query);
Referenceable process3Reference = validateProcess(query, tblQlfdname, testPathNormed2);
validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
Assert.assertNotEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
}
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
String pFile1 = createTestDFSPath("somedfspath1");
@@ -482,7 +548,7 @@ public class HiveHookIT {
runCommand(query);
String tblQlfdname = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
validateHDFSPaths(processReference, pFile1, OUTPUTS);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
@@ -498,7 +564,7 @@ public class HiveHookIT {
runCommand(query);
tblQlfdname = getQualifiedTblName(tableName);
Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
validateHDFSPaths(process2Reference, pFile2, OUTPUTS);
validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
}
@@ -564,7 +630,7 @@ public class HiveHookIT {
runCommand(query);
String tblQlfName = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
validateHDFSPaths(processReference, filename, OUTPUTS);
validateHDFSPaths(processReference, OUTPUTS, filename);
validateInputTables(processReference, tblQlfName);
//Import
@@ -575,7 +641,7 @@ public class HiveHookIT {
runCommand(query);
tblQlfName = getQualifiedTblName(tableName);
processReference = validateProcess(query, testPathNormed, tblQlfName);
validateHDFSPaths(processReference, filename, INPUTS);
validateHDFSPaths(processReference, INPUTS, filename);
validateOutputTables(processReference, tblQlfName);
}
@@ -596,7 +662,7 @@ public class HiveHookIT {
runCommand(query);
String tblQlfdName = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
validateHDFSPaths(processReference, filename, OUTPUTS);
validateHDFSPaths(processReference, OUTPUTS, filename);
validateInputTables(processReference, tblQlfdName);
@@ -608,7 +674,7 @@ public class HiveHookIT {
runCommand(query);
tblQlfdName = getQualifiedTblName(tableName);
processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, filename, INPUTS);
validateHDFSPaths(processReference, INPUTS, filename);
validateOutputTables(processReference, tblQlfdName);
}
@@ -997,22 +1063,22 @@ public class HiveHookIT {
final String testPathNormed = lower(new Path(testPath).toString());
Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, testPath, INPUTS);
validateHDFSPaths(processReference, INPUTS, testPath);
}
private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
private void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
final String testPathNormed = lower(new Path(testPath).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
for (int i = 0; i < testPaths.length; i++) {
final String testPathNormed = lower(new Path(testPaths[i]).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
return hdfsPathRef.getId()._getId();
Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
}
}
private String assertHDFSPathIsRegistered(String path) throws Exception {
@@ -1393,13 +1459,12 @@ public class HiveHookIT {
}
}
private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String outputTblName) throws Exception {
private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String... outputTblNames) throws Exception {
HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
String normalizedQuery = normalize(astRewriter.rewrite(queryStr));
List<Referenceable> inputs = null;
if (inputTblName != null) {
Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, inputTblName);
@@ -1407,14 +1472,16 @@ public class HiveHookIT {
inputs = new ArrayList<Referenceable>();
inputs.add(inputTableRef);
}
List<Referenceable> outputs = null;
if (outputTblName != null) {
Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
}});
outputs = new ArrayList<Referenceable>();
outputs.add(outputTableRef);
List<Referenceable> outputs = new ArrayList<Referenceable>();
if (outputTblNames != null) {
for(int i = 0; i < outputTblNames.length; i++) {
final String outputTblName = outputTblNames[i];
Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
}});
outputs.add(outputTableRef);
}
}
String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
LOG.debug("Searching for process with query {}", processQFName);
......
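Reviewer note on the test changes above: validateHDFSPaths now takes the attribute name (INPUTS or OUTPUTS) first, followed by a varargs list of paths, and validateProcess/assertProcessIsRegistered accept multiple output tables, which is why the call sites were reordered throughout HiveHookIT. A hedged usage sketch (variable names here are illustrative; the helpers and constants are the ones defined in HiveHookIT above):

    // Old shape: validateHDFSPaths(processReference, pFile, INPUTS);
    // New shape: attribute name first, then one or more paths to verify.
    validateHDFSPaths(processReference, INPUTS, pFile);
    validateHDFSPaths(processReference, OUTPUTS, pFile1, pFile2);  // several paths can be asserted in one call

    // validateProcess now accepts any number of output tables for the same process.
    Referenceable processRef = validateProcess(query, inputTblQlfdName, outputTblQlfdName1, outputTblQlfdName2);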
@@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-884 Process registration should call Entity update instead of create (sumasai)
ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
......