Commit f51c8861 by Suma Shivaprasad

ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries (sumasai)

parent f623bddf
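For orientation: this patch reworks how HiveHook.getProcessQualifiedName() builds the process qualified name. The name is assembled from the operation name plus the sorted input and output datasets, joined with SEP (":") and with IO_SEP ("->") between the input and output halves; HDFS paths now stay in the name only for non-partition based LOAD/IMPORT/EXPORT operations. A minimal sketch of the resulting string, using assumed tables t1, t2, t3 in the default database on an assumed cluster "test" (the shape mirrors the assertion added to HiveHookIT.testInsertIntoTable below):

// Illustrative only -- not part of the patch.
StringBuilder buffer = new StringBuilder("QUERY");     // op.getOperationName()
buffer.append(":").append("default.t1@test");          // sorted Hive inputs, SEP = ":"
buffer.append(":").append("default.t2@test");
buffer.append("->");                                    // IO_SEP between inputs and outputs
buffer.append(":").append("INSERT");                    // WriteType appended for QUERY outputs
buffer.append(":").append("default.t3@test");          // output table
// => "QUERY:default.t1@test:default.t2@test->:INSERT:default.t3@test"
// DFS_DIR/LOCAL_DIR datasets are skipped in this string for QUERY operations and for
// partition-based LOAD/IMPORT/EXPORT, so HDFS paths only contribute to the name for
// non-partition based queries.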
......@@ -426,8 +426,8 @@ public class HiveMetaStoreBridge {
createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
LOG.debug("Setting create time to {} ", createDate);
tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate);
} catch(NumberFormatException ne) {
LOG.error("Error while updating createTime for the table {} ", hiveTable.getCompleteName(), ne);
} catch(Exception ne) {
LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne);
}
}
......
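The hunk above widens the catch around the create-time conversion so that a bad value in the metastore no longer aborts table registration. A small sketch of the conversion it guards, assuming MILLIS_CONVERT_FACTOR is 1000 (Hive stores createTime in epoch seconds):

// Illustrative only -- the factor value and the create time are assumptions.
long MILLIS_CONVERT_FACTOR = 1000L;
int createTimeSeconds = 1467072000;                      // assumed value read from the metastore
java.util.Date createDate = new java.util.Date(createTimeSeconds * MILLIS_CONVERT_FACTOR);
// Any runtime failure here (not just NumberFormatException) is now logged and swallowed,
// so table registration continues with the create time left unset.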
......@@ -21,6 +21,7 @@ package org.apache.atlas.hive.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.security.auth.Write;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
......@@ -66,7 +67,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -86,8 +89,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
private static final String SEP = ":".intern();
private static final String IO_SEP = "->".intern();
static final String SEP = ":".intern();
static final String IO_SEP = "->".intern();
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
......@@ -291,6 +294,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
if (event.getOutputs().size() > 1) {
LOG.info("Starting deletion of tables and databases with cascade {} ", event.getQueryStr());
} else {
LOG.info("Starting deletion of database {} ", event.getQueryStr());
}
for (WriteEntity output : event.getOutputs()) {
......@@ -549,10 +554,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return str.toLowerCase().trim();
}
public static String normalize(String queryStr) {
return lower(queryStr);
}
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
Set<ReadEntity> inputs = event.getInputs();
Set<WriteEntity> outputs = event.getOutputs();
......@@ -567,8 +568,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.info("Query id/plan is missing for {}", event.getQueryStr());
}
final SortedMap<Entity, Referenceable> source = new TreeMap<>(entityComparator);
final SortedMap<Entity, Referenceable> target = new TreeMap<>(entityComparator);
final SortedMap<ReadEntity, Referenceable> source = new TreeMap<>(entityComparator);
final SortedMap<WriteEntity, Referenceable> target = new TreeMap<>(entityComparator);
final Set<String> dataSets = new HashSet<>();
final Set<Referenceable> entities = new LinkedHashSet<>();
......@@ -577,16 +578,27 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
// filter out select queries which do not modify data
if (!isSelectQuery) {
for (ReadEntity readEntity : event.getInputs()) {
SortedSet<ReadEntity> sortedHiveInputs = new TreeSet<>(entityComparator);
if ( event.getInputs() != null) {
sortedHiveInputs.addAll(event.getInputs());
}
SortedSet<WriteEntity> sortedHiveOutputs = new TreeSet<>(entityComparator);
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
for (ReadEntity readEntity : sortedHiveInputs) {
processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
}
for (WriteEntity writeEntity : event.getOutputs()) {
for (WriteEntity writeEntity : sortedHiveOutputs) {
processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
}
if (source.size() > 0 || target.size() > 0) {
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, source, target);
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target);
entities.add(processReferenceable);
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
} else {
......@@ -597,8 +609,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Set<String> dataSetsProcessed,
SortedMap<Entity, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
private <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed,
SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
if (!dataSetsProcessed.contains(tblQFName)) {
......@@ -609,7 +621,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
} else if (entity.getType() == Type.DFS_DIR) {
final String pathUri = lower(new Path(entity.getLocation()).toString());
LOG.info("Registering DFS Path {} ", pathUri);
LOG.debug("Registering DFS Path {} ", pathUri);
if (!dataSetsProcessed.contains(pathUri)) {
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
dataSets.put(entity, hdfsPath);
......@@ -653,7 +665,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
List<Referenceable> entities = new ArrayList<>();
final Entity hiveEntity = getEntityByType(event.getOutputs(), Type.TABLE);
final WriteEntity hiveEntity = (WriteEntity) getEntityByType(event.getOutputs(), Type.TABLE);
Table hiveTable = hiveEntity.getTable();
//Refresh to get the correct location
hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
......@@ -665,18 +677,25 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
dfsEntity.setTyp(Type.DFS_DIR);
dfsEntity.setName(location);
SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
SortedMap<ReadEntity, Referenceable> hiveInputsMap = new TreeMap<ReadEntity, Referenceable>(entityComparator) {{
put(dfsEntity, dgiBridge.fillHDFSDataSet(location));
}};
SortedMap<Entity, Referenceable> outputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
SortedMap<WriteEntity, Referenceable> hiveOutputsMap = new TreeMap<WriteEntity, Referenceable>(entityComparator) {{
put(hiveEntity, tables.get(Type.TABLE));
}};
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
SortedSet<ReadEntity> sortedIps = new TreeSet<>(entityComparator);
sortedIps.addAll(hiveInputsMap.keySet());
SortedSet<WriteEntity> sortedOps = new TreeSet<>(entityComparator);
sortedOps.addAll(hiveOutputsMap.keySet());
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
sortedIps, sortedOps, hiveInputsMap, hiveOutputsMap);
String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
if (isCreateOp(event)){
LOG.info("Overriding process qualified name to {}", tableQualifiedName);
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
}
entities.addAll(tables.values());
......@@ -689,6 +708,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
|| HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERTABLE_LOCATION.equals(hiveEvent.getOperation())
|| HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) {
return true;
}
......@@ -696,11 +716,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
SortedMap<Entity, Referenceable> source, SortedMap<Entity, Referenceable> target) {
final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
String queryStr = lower(hiveEvent.getQueryStr());
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent.getOperation(), source, target));
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
LOG.debug("Registering query: {}", queryStr);
List<Referenceable> sourceList = new ArrayList<>(source.values());
......@@ -733,51 +753,113 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
@VisibleForTesting
static String getProcessQualifiedName(HiveOperation op, SortedMap<Entity, Referenceable> inputs, SortedMap<Entity, Referenceable> outputs) {
static String getProcessQualifiedName(HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> hiveInputsMap, SortedMap<WriteEntity, Referenceable> hiveOutputsMap) {
HiveOperation op = eventContext.getOperation();
StringBuilder buffer = new StringBuilder(op.getOperationName());
addDatasets(op, buffer, inputs);
boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs);
if ( ignoreHDFSPathsinQFName && LOG.isDebugEnabled()) {
LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr());
}
addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName);
buffer.append(IO_SEP);
addDatasets(op, buffer, outputs);
addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName);
LOG.info("Setting process qualified name to {}", buffer);
return buffer.toString();
}
private static void addDatasets(HiveOperation op, StringBuilder buffer, final Map<Entity, Referenceable> refs) {
private static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) {
switch (op) {
case LOAD:
case IMPORT:
return isPartitionBasedQuery(outputs);
case EXPORT:
return isPartitionBasedQuery(inputs);
case QUERY:
return true;
}
return false;
}
private static boolean isPartitionBasedQuery(Set<? extends Entity> entities) {
for (Entity entity : entities) {
if (Type.PARTITION.equals(entity.getType())) {
return true;
}
}
return false;
}
private static void addInputs(HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
if (refs != null) {
for (Entity input : refs.keySet()) {
final Entity entity = input;
if (sortedInputs != null) {
Set<String> dataSetsProcessed = new LinkedHashSet<>();
for (Entity input : sortedInputs) {
if (!dataSetsProcessed.contains(input.getName().toLowerCase())) {
//HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
if (addQueryType(op, entity)) {
buffer.append(SEP);
buffer.append(((WriteEntity) entity).getWriteType().name());
if (ignoreHDFSPathsInQFName &&
(Type.DFS_DIR.equals(input.getType()) || Type.LOCAL_DIR.equals(input.getType()))) {
LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
} else if (refs.containsKey(input)) {
addDataset(buffer, refs.get(input));
}
if (Type.DFS_DIR.equals(entity.getType()) ||
Type.LOCAL_DIR.equals(entity.getType())) {
LOG.debug("Skipping dfs dir addition into process qualified name {} ", refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
} else {
dataSetsProcessed.add(input.getName().toLowerCase());
}
}
}
}
}
private static void addDataset(StringBuilder buffer, Referenceable ref) {
buffer.append(SEP);
String dataSetQlfdName = (String) refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
String dataSetQlfdName = (String) ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
// '/' breaks query parsing on ATLAS
buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
}
private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
if (refs != null) {
Set<String> dataSetsProcessed = new LinkedHashSet<>();
if (sortedOutputs != null) {
for (Entity output : sortedOutputs) {
final Entity entity = output;
if (!dataSetsProcessed.contains(output.getName().toLowerCase())) {
//HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
if (addQueryType(op, (WriteEntity) entity)) {
buffer.append(SEP);
buffer.append(((WriteEntity) entity).getWriteType().name());
}
if (ignoreHDFSPathsInQFName &&
(Type.DFS_DIR.equals(output.getType()) || Type.LOCAL_DIR.equals(output.getType()))) {
LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
} else if (refs.containsKey(output)) {
addDataset(buffer, refs.get(output));
}
dataSetsProcessed.add(output.getName().toLowerCase());
}
}
}
}
}
private static boolean addQueryType(HiveOperation op, Entity entity) {
if (WriteEntity.class.isAssignableFrom(entity.getClass())) {
if (((WriteEntity) entity).getWriteType() != null &&
op.equals(HiveOperation.QUERY)) {
private static boolean addQueryType(HiveOperation op, WriteEntity entity) {
if (((WriteEntity) entity).getWriteType() != null && HiveOperation.QUERY.equals(op)) {
switch (((WriteEntity) entity).getWriteType()) {
case INSERT:
case INSERT_OVERWRITE:
case UPDATE:
case DELETE:
return true;
case PATH_WRITE:
//Add query type only for DFS paths and ignore local paths since they are not added as outputs
if ( !Type.LOCAL_DIR.equals(entity.getType())) {
return true;
default:
}
break;
default:
}
}
return false;
......
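Before the test changes below, the behavioural consequence they assert is worth spelling out: for QUERY operations the DFS_DIR target is skipped when the qualified name is built (its write type is still appended), so re-running an insert-overwrite-directory query against a different HDFS location resolves to the same process entity. A hand-written illustration with an assumed table t in the default database on cluster "test":

// Illustrative only -- strings are hand-written, not generated by the hook.
String qfNameRun1 = "QUERY:default.t@test->:PATH_WRITE";   // ... DIRECTORY '/tmp/somedfspath1' ...
String qfNameRun2 = "QUERY:default.t@test->:PATH_WRITE";   // ... DIRECTORY '/tmp/somedfspath2' ...
assert qfNameRun1.equals(qfNameRun2);                      // same name, so Atlas updates the existing process
// A LOAD/IMPORT into a non-partitioned table, by contrast, keeps the HDFS source path in the
// name, so loading the same table from a different path registers a separate process.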
......@@ -62,15 +62,22 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
import static org.apache.atlas.hive.hook.HiveHook.SEP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
......@@ -82,6 +89,8 @@ public class HiveHookIT {
private static final String DGI_URL = "http://localhost:21000/";
private static final String CLUSTER_NAME = "test";
public static final String DEFAULT_DB = "default";
private static final String PART_FILE = "2015-01-01";
private Driver driver;
private AtlasClient atlasClient;
private HiveMetaStoreBridge hiveMetaStoreBridge;
......@@ -262,7 +271,7 @@ public class HiveHookIT {
validateHDFSPaths(processReference, INPUTS, pFile);
}
private List<Entity> getInputs(String inputName, Entity.Type entityType) {
private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) {
final ReadEntity entity = new ReadEntity();
if ( Entity.Type.DFS_DIR.equals(entityType)) {
......@@ -270,14 +279,13 @@ public class HiveHookIT {
entity.setTyp(Entity.Type.DFS_DIR);
} else {
entity.setName(getQualifiedTblName(inputName));
entity.setTyp(Entity.Type.TABLE);
entity.setTyp(entityType);
}
return new ArrayList<Entity>() {{ add(entity); }};
return new LinkedHashSet<ReadEntity>() {{ add(entity); }};
}
private List<Entity> getOutputs(String inputName, Entity.Type entityType) {
private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) {
final WriteEntity entity = new WriteEntity();
if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
......@@ -285,27 +293,32 @@ public class HiveHookIT {
entity.setTyp(entityType);
} else {
entity.setName(getQualifiedTblName(inputName));
entity.setTyp(Entity.Type.TABLE);
entity.setTyp(entityType);
}
return new ArrayList<Entity>() {{ add(entity); }};
return new LinkedHashSet<WriteEntity>() {{ add(entity); }};
}
private void validateOutputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
private void validateOutputTables(Referenceable processReference, Set<WriteEntity> expectedTables) throws Exception {
validateTables(processReference, OUTPUTS, expectedTables);
}
private void validateInputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
private void validateInputTables(Referenceable processReference, Set<ReadEntity> expectedTables) throws Exception {
validateTables(processReference, INPUTS, expectedTables);
}
private void validateTables(Referenceable processReference, String attrName, List<Entity> expectedTables) throws Exception {
private void validateTables(Referenceable processReference, String attrName, Set<? extends Entity> expectedTables) throws Exception {
List<Id> tableRef = (List<Id>) processReference.get(attrName);
Iterator<? extends Entity> iterator = expectedTables.iterator();
for(int i = 0; i < expectedTables.size(); i++) {
Entity hiveEntity = iterator.next();
if (Entity.Type.TABLE.equals(hiveEntity.getType()) ||
Entity.Type.DFS_DIR.equals(hiveEntity.getType())) {
Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
LOG.debug("Validating output {} {} ", i, entity);
Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTables.get(i).getName());
Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), hiveEntity.getName());
}
}
}
......@@ -338,18 +351,22 @@ public class HiveHookIT {
String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query);
final ReadEntity entity = new ReadEntity();
entity.setName(getQualifiedTblName(tableName));
entity.setTyp(Entity.Type.TABLE);
final WriteEntity writeEntity = new WriteEntity();
writeEntity.setTyp(Entity.Type.TABLE);
writeEntity.setName(getQualifiedTblName(ctasTableName));
final Set<ReadEntity> readEntities = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> writeEntities = getOutputs(ctasTableName, Entity.Type.TABLE);
assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, new ArrayList<Entity>() {{ add(entity); }}, new ArrayList<Entity>() {{ add(writeEntity); }});
assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities));
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
}
private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
HiveHook.HiveEventContext event = new HiveHook.HiveEventContext();
event.setQueryStr(query);
event.setOperation(op);
event.setInputs(inputs);
event.setOutputs(outputs);
return event;
}
@Test
public void testDropAndRecreateCTASOutput() throws Exception {
String tableName = createTable();
......@@ -359,10 +376,11 @@ public class HiveHookIT {
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
List<Entity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
String processId = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
String processId = assertProcessIsRegistered(hiveEventContext);
final String drpquery = String.format("drop table %s ", ctasTableName);
runCommand(drpquery);
......@@ -371,14 +389,13 @@ public class HiveHookIT {
//Fix after ATLAS-876
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
String process2Id = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs);
Assert.assertEquals(process2Id, processId);
Referenceable processRef = atlasClient.getEntity(processId);
validateInputTables(processRef, inputs);
outputs.add(outputs.get(0));
outputs.add(outputs.iterator().next());
validateOutputTables(processRef, outputs);
}
......@@ -389,7 +406,7 @@ public class HiveHookIT {
String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query);
assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
assertTableIsRegistered(DEFAULT_DB, viewName);
}
......@@ -403,7 +420,7 @@ public class HiveHookIT {
runCommand(query);
String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);
assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);
//Check lineage which includes table1
......@@ -419,7 +436,7 @@ public class HiveHookIT {
runCommand(query);
//Check if alter view process is registered
assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);
Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);
......@@ -456,9 +473,7 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
runCommand(query);
List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
assertProcessIsRegistered(query, HiveOperation.LOAD, null, outputs);
assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
}
@Test
......@@ -466,41 +481,56 @@ public class HiveHookIT {
String tableName = createTable(true);
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
validateProcess(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE));
assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
}
@Test
public void testLoadDFSPath() throws Exception {
public void testLoadDFSPathPartitioned() throws Exception {
String tableName = createTable(true, true, false);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, tableName);
String loadFile = createTestDFSFile("loadDFSFile");
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
final String loadFile = createTestDFSFile("loadDFSFile");
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
final List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
Referenceable processReference = validateProcess(query, HiveOperation.LOAD, getInputs(loadFile, Entity.Type.DFS_DIR), outputs);
final Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
final Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR);
validateHDFSPaths(processReference, INPUTS, loadFile);
final Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs);
partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION));
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs);
validateHDFSPaths(processReference, INPUTS, loadFile);
validateOutputTables(processReference, outputs);
final String loadFile2 = createTestDFSFile("loadDFSFile1");
query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
runCommand(query);
Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR);
Set<ReadEntity> expectedInputs = new LinkedHashSet<>();
expectedInputs.addAll(process2Inputs);
expectedInputs.addAll(inputs);
validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs);
}
private String getQualifiedTblName(String inputTable) {
String inputtblQlfdName = inputTable;
if (inputTable != null && !inputTable.contains(".")) {
if (inputTable != null && !inputTable.contains("@")) {
inputtblQlfdName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
}
return inputtblQlfdName;
}
private Referenceable validateProcess(String query, HiveOperation op, List<Entity> inputTables, List<Entity> outputTables) throws Exception {
String processId = assertProcessIsRegistered(query, op, inputTables, outputTables);
private Referenceable validateProcess(HiveHook.HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception {
String processId = assertProcessIsRegistered(event, inputTables, outputTables);
Referenceable process = atlasClient.getEntity(processId);
if (inputTables == null) {
Assert.assertNull(process.get(INPUTS));
......@@ -519,25 +549,47 @@ public class HiveHookIT {
return process;
}
private Referenceable validateProcess(HiveHook.HiveEventContext event) throws Exception {
return validateProcess(event, event.getInputs(), event.getOutputs());
}
@Test
public void testInsertIntoTable() throws Exception {
String tableName = createTable();
String inputTable1Name = createTable();
String inputTable2Name = createTable();
String insertTableName = createTable();
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, inputTable1Name);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
String query = "insert into " + insertTableName + " select id, name from " + tableName;
String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id";
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
final Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE);
inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE));
Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
(outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{
addAll(inputs);
}};
assertTableIsRegistered(DEFAULT_DB, insertTableName);
Referenceable processRef1 = validateProcess(event, expectedInputs, outputs);
//Test sorting of tbl names
SortedSet<String> sortedTblNames = new TreeSet<>();
sortedTblNames.add(getQualifiedTblName(inputTable1Name));
sortedTblNames.add(getQualifiedTblName(inputTable2Name));
Referenceable processRef1 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
//Verify sorted order of inputs in qualified name
Assert.assertEquals(Joiner.on(SEP).join("QUERY", sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), getQualifiedTblName(insertTableName))
, processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
//Rerun same query. Should result in same process
runCommandWithDelay(query, 1000);
Referenceable processRef2 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
Referenceable processRef2 = validateProcess(event, expectedInputs, outputs);
Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
}
......@@ -550,7 +602,7 @@ public class HiveHookIT {
"insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
runCommand(query);
validateProcess(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null);
validateProcess(constructEvent(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null));
assertTableIsRegistered(DEFAULT_DB, tableName);
}
......@@ -564,72 +616,78 @@ public class HiveHookIT {
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
Referenceable processReference = validateProcess(hiveEventContext);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processReference, inputs);
//Rerun same query with same HDFS path
runCommand(query);
Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
runCommandWithDelay(query, 1000);
assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable process2Reference = validateProcess(hiveEventContext);
validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
//Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of qualifiedName.
//Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of the qualified name for QUERY operations
final String pFile2 = createTestDFSPath("somedfspath2");
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
runCommand(query);
List<Entity> p3Outputs = new ArrayList<Entity>() {{
runCommandWithDelay(query, 1000);
assertTableIsRegistered(DEFAULT_DB, tableName);
Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
addAll(outputs);
}};
Referenceable process3Reference = validateProcess(query, HiveOperation.QUERY, inputs, p3Outputs);
Referenceable process3Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs));
validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
}
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
public void testInsertIntoDFSDirPartitioned() throws Exception {
//Test with partitioned table
String tableName = createTable(true);
String pFile1 = createTestDFSPath("somedfspath1");
String query =
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
final Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs);
partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION));
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processReference, inputs);
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs);
//Rerun same query with different HDFS path
//Rerun same query with different HDFS path. Should not create another process and should update it.
final String pFile2 = createTestDFSPath("somedfspath2");
query =
"insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
"insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
runCommand(query);
List<Entity> p2Outputs = new ArrayList<Entity>() {{
addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
final Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR);
((WriteEntity)pFile2Outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
//Now the process has 2 paths - one older with a deleted reference to the partition and another with the latest partition
Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(pFile2Outputs);
addAll(outputs);
}};
Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, p2Outputs);
Referenceable process2Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs);
validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
......@@ -647,12 +705,12 @@ public class HiveHookIT {
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
outputs.get(0).setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
outputs.iterator().next().setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
validateProcess(query, HiveOperation.QUERY, inputs, outputs);
validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, outputs));
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
......@@ -660,21 +718,40 @@ public class HiveHookIT {
@Test
public void testInsertIntoPartition() throws Exception {
String tableName = createTable(true);
String insertTableName = createTable(true);
final boolean isPartitionedTable = true;
String tableName = createTable(isPartitionedTable);
String insertTableName = createTable(isPartitionedTable);
String query =
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
"insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName
+ " where dt = '"+ PART_FILE + "'";
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
final Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
final Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() {
{
addAll(inputs);
add(getPartitionInput());
validateProcess(query, HiveOperation.QUERY, inputs, outputs);
}
};
final Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() {
{
addAll(outputs);
add(getPartitionOutput());
}
};
validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, partitionOps), inputs, outputs);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
//TODO - update
}
private String random() {
......@@ -701,65 +778,111 @@ public class HiveHookIT {
assertTableIsRegistered(DEFAULT_DB, tableName);
String filename = "pfile://" + mkdir("export");
String filename = "pfile://" + mkdir("exportUnPartitioned");
String query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
validateHDFSPaths(processReference, OUTPUTS, filename);
validateInputTables(processReference, inputs);
//Import
tableName = createTable(false);
assertTableIsRegistered(DEFAULT_DB, tableName);
String importTableName = createTable(false);
assertTableIsRegistered(DEFAULT_DB, importTableName);
query = "import table " + tableName + " from '" + filename + "'";
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(tableName, Entity.Type.TABLE);
processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
validateHDFSPaths(processReference, INPUTS, filename);
outputs = getOutputs(importTableName, Entity.Type.TABLE);
validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
validateOutputTables(processReference, outputs);
//Should create another process
filename = "pfile://" + mkdir("export2UnPartitioned");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
inputs = getInputs(tableName, Entity.Type.TABLE);
outputs = getOutputs(filename, Entity.Type.DFS_DIR);
validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
//Import again should create another process
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(importTableName, Entity.Type.TABLE);
validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
}
@Test
public void testExportImportPartitionedTable() throws Exception {
String tableName = createTable(true);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
boolean isPartitionedTable = true;
final String tableName = createTable(isPartitionedTable);
assertTableIsRegistered(DEFAULT_DB, tableName);
//Add a partition
String partFile = "pfile://" + mkdir("partition");
String query = "alter table " + tableName + " add partition (dt='2015-01-01') location '" + partFile + "'";
String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'";
runCommand(query);
String filename = "pfile://" + mkdir("export");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
final Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
final Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, filename);
//Note that the export has only the partition as input in this case
final Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
partitionIps.addAll(expectedExportInputs);
validateInputTables(processReference, inputs);
Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, filename);
//Import
tableName = createTable(true);
tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String importTableName = createTable(true);
assertTableIsRegistered(DEFAULT_DB, tableName);
query = "import table " + tableName + " from '" + filename + "'";
query = "import table " + importTableName + " from '" + filename + "'";
runCommand(query);
outputs = getOutputs(tableName, Entity.Type.TABLE);
processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
validateHDFSPaths(processReference, INPUTS, filename);
final Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
final Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE);
validateOutputTables(processReference, outputs);
final Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
partitionOps.addAll(importOutputs);
validateProcess(constructEvent(query, HiveOperation.IMPORT, expectedImportInputs , partitionOps), expectedImportInputs, importOutputs);
//Export should update same process
filename = "pfile://" + mkdir("export2");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
final Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR);
Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
addAll(outputs2);
addAll(outputs);
}};
validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2), expectedExportInputs, p3Outputs);
query = "alter table " + importTableName + " drop partition (dt='"+ PART_FILE + "')";
runCommand(query);
//Import should update same process
query = "import table " + importTableName + " from '" + filename + "'";
runCommandWithDelay(query, 1000);
final Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR);
final Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{
addAll(importInputs);
addAll(expectedImportInputs);
}};
validateProcess(constructEvent(query, HiveOperation.IMPORT, importInputs, partitionOps), expectedImport2Inputs, importOutputs);
}
@Test
......@@ -767,13 +890,14 @@ public class HiveHookIT {
String tableName = createTable();
String query = "select * from " + tableName;
runCommand(query);
List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null);
assertProcessIsNotRegistered(hiveEventContext);
//check with uppercase table name
query = "SELECT * from " + tableName.toUpperCase();
runCommand(query);
assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
assertProcessIsNotRegistered(hiveEventContext);
}
@Test
......@@ -1042,10 +1166,10 @@ public class HiveHookIT {
String query = String.format("truncate table %s", tableName);
runCommand(query);
List<Entity> outputs = getInputs(tableName, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateProcess(query, HiveOperation.TRUNCATETABLE, null, outputs);
validateProcess(constructEvent(query, HiveOperation.TRUNCATETABLE, null, outputs));
//Check lineage
String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
......@@ -1144,7 +1268,7 @@ public class HiveHookIT {
String query = "alter table " + tableName + " set location '" + testPath + "'";
runCommand(query);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
@Override
public void assertOnEntity(Referenceable tableRef) throws Exception {
Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
......@@ -1152,10 +1276,11 @@ public class HiveHookIT {
}
});
List<Entity> inputs = getInputs(testPath, Entity.Type.DFS_DIR);
List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, false), null);
Referenceable processReference = atlasClient.getEntity(processId);
Referenceable processReference = validateProcess(query, HiveOperation.ALTERTABLE_LOCATION, inputs, outputs);
validateHDFSPaths(processReference, INPUTS, testPath);
}
......@@ -1302,6 +1427,20 @@ public class HiveHookIT {
assertTableIsNotRegistered(DEFAULT_DB, tableName);
}
private WriteEntity getPartitionOutput() {
WriteEntity partEntity = new WriteEntity();
partEntity.setName(PART_FILE);
partEntity.setTyp(Entity.Type.PARTITION);
return partEntity;
}
private ReadEntity getPartitionInput() {
ReadEntity partEntity = new ReadEntity();
partEntity.setName(PART_FILE);
partEntity.setTyp(Entity.Type.PARTITION);
return partEntity;
}
@Test
public void testDropDatabaseWithCascade() throws Exception {
//Test Deletion of database and its corresponding tables
......@@ -1550,26 +1689,66 @@ public class HiveHookIT {
}
}
private String assertProcessIsRegistered(final String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
try {
SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
if ( event.getInputs() != null) {
sortedHiveInputs.addAll(event.getInputs());
}
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
public void assertOnEntity(final Referenceable entity) throws Exception {
List<String> recentQueries = (List<String>) entity.get("recentQueries");
Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
}
});
} catch (Exception e) {
LOG.error("Exception : ", e);
throw e;
}
}
private String assertProcessIsRegistered(final HiveHook.HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception {
try {
SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
if ( event.getInputs() != null) {
sortedHiveInputs.addAll(event.getInputs());
}
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
public void assertOnEntity(final Referenceable entity) throws Exception {
List<String> recentQueries = (List<String>) entity.get("recentQueries");
Assert.assertEquals(recentQueries.get(0), lower(queryStr));
Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
}
});
} catch(Exception e) {
LOG.error("Exception : ", e);
throw e;
}
}
private String getDSTypeName(Entity entity) {
return Entity.Type.TABLE.equals(entity.getType()) ? HiveDataTypes.HIVE_TABLE.name() : FSDataTypes.HDFS_PATH().toString();
}
private SortedMap<Entity, Referenceable> getSortedProcessDataSets(List<Entity> inputTbls) {
SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator);
private <T extends Entity> SortedMap<T, Referenceable> getSortedProcessDataSets(Set<T> inputTbls) {
SortedMap<T, Referenceable> inputs = new TreeMap<T, Referenceable>(entityComparator);
if (inputTbls != null) {
for (final Entity tbl : inputTbls) {
for (final T tbl : inputTbls) {
Referenceable inputTableRef = new Referenceable(getDSTypeName(tbl), new HashMap<String, Object>() {{
put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tbl.getName());
}});
......@@ -1579,10 +1758,22 @@ public class HiveHookIT {
return inputs;
}
private void assertProcessIsNotRegistered(String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
private void assertProcessIsNotRegistered(HiveHook.HiveEventContext event) throws Exception {
try {
SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
if ( event.getInputs() != null) {
sortedHiveInputs.addAll(event.getInputs());
}
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
LOG.debug("Searching for process with query {}", processQFName);
assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
} catch( Exception e) {
LOG.error("Exception : ", e);
}
}
private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
......
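For reference, the updated HiveHookIT helpers are used together in the following pattern (a sketch with assumed table names created via createTable(); it mirrors testInsertIntoTable above rather than adding a new test):

// Illustrative only -- assumed tables; follows the helper signatures introduced in this patch.
String t1 = createTable();
String t2 = createTable();
String query = "insert into " + t2 + " select id, name from " + t1;
runCommand(query);
Set<ReadEntity> inputs = getInputs(t1, Entity.Type.TABLE);
Set<WriteEntity> outputs = getOutputs(t2, Entity.Type.TABLE);
outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT);
HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
assertProcessIsRegistered(event);   // resolves the process entity by its new qualified name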
......@@ -128,7 +128,7 @@ public abstract class AtlasHook {
} catch (Exception e) {
numRetries++;
if (numRetries < maxRetries) {
LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
LOG.info("Failed to notify atlas for entity {}. Retrying", message, e);
} else {
if (shouldLogFailedMessages && e instanceof NotificationException) {
List<String> failedMessages = ((NotificationException) e).getFailedMessages();
......
......@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES:
ATLAS-966 Exit execution of import_hive.sh if HIVE_HOME is not set (svimal2106 via sumasai)
ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries (sumasai)
--Release 0.7-incubating
......