Commit 5b4cf440 by Hemanth Yamijala

ATLAS-904 Hive hook fails due to session state not being set (sumasai via yhemanth)

parent 32a5b761
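Summary of the change: the hook previously built a process entity's qualifiedName by rewriting the query text with HiveASTRewriter inside normalize(), and that rewrite appears to need a Hive SessionState which is not set on the hook's async executor threads, which is what ATLAS-904 reports. This commit drops the AST rewrite (normalize() becomes a plain lower-casing) and instead derives the process qualifiedName from the Hive operation plus its sorted input and output data sets, appending the write type for QUERY-type outputs. The sketch below only illustrates the resulting name format; the table names and the cluster name "primary" are made-up examples, not values from this commit.

public class QualifiedNameSketch {
    public static void main(String[] args) {
        StringBuilder buffer = new StringBuilder("QUERY");   // op.getOperationName()
        buffer.append(":").append("default.t1@primary");     // input data set, SEP = ":"
        buffer.append("->");                                  // IO_SEP between inputs and outputs
        buffer.append(":").append("INSERT");                  // WriteType name, added only for QUERY-type write entities
        buffer.append(":").append("default.t2@primary");      // output data set
        // Prints: QUERY:default.t1@primary->:INSERT:default.t2@primary
        System.out.println(buffer);
    }
}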
@@ -26,7 +26,6 @@ import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.hive.rewrite.HiveASTRewriter;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
@@ -57,13 +56,17 @@ import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -75,7 +78,6 @@ import java.util.concurrent.TimeUnit;
public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class);
public static final String CONF_PREFIX = "atlas.hook.hive.";
private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
@@ -84,6 +86,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+private static final String SEP = ":".intern();
+private static final String IO_SEP = "->".intern();
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
@@ -182,12 +186,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
});
}
-} catch(Throwable t) {
+} catch (Throwable t) {
LOG.error("Submitting to thread pool failed due to error ", t);
}
}
private void fireAndForget(HiveEventContext event) throws Exception {
assert event.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation());
@@ -285,7 +290,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
if (event.getOutputs().size() > 1) {
-LOG.info("Starting deletion of tables and databases with cascade {} " , event.getQueryStr());
+LOG.info("Starting deletion of tables and databases with cascade {} ", event.getQueryStr());
}
for (WriteEntity output : event.getOutputs()) {
@@ -302,10 +307,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
-private Pair<String, String> findChangedColNames(List<FieldSchema> oldColList, List<FieldSchema> newColList){
+private Pair<String, String> findChangedColNames(List<FieldSchema> oldColList, List<FieldSchema> newColList) {
HashMap<FieldSchema, Integer> oldColHashMap = new HashMap<>();
HashMap<FieldSchema, Integer> newColHashMap = new HashMap<>();
-for (int i = 0; i < oldColList.size(); i++){
+for (int i = 0; i < oldColList.size(); i++) {
oldColHashMap.put(oldColList.get(i), i);
newColHashMap.put(newColList.get(i), i);
}
@@ -313,15 +318,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
String changedColStringOldName = oldColList.get(0).getName();
String changedColStringNewName = changedColStringOldName;
-for(int i = 0; i < oldColList.size(); i++){
+for (int i = 0; i < oldColList.size(); i++) {
-if (!newColHashMap.containsKey(oldColList.get(i))){
+if (!newColHashMap.containsKey(oldColList.get(i))) {
changedColStringOldName = oldColList.get(i).getName();
break;
}
}
-for(int i = 0; i < newColList.size(); i++){
+for (int i = 0; i < newColList.size(); i++) {
-if (!oldColHashMap.containsKey(newColList.get(i))){
+if (!oldColHashMap.containsKey(newColList.get(i))) {
changedColStringNewName = newColList.get(i).getName();
break;
}
@@ -330,7 +335,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return Pair.of(changedColStringOldName, changedColStringNewName);
}
-private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception{
+private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
assert event.getInputs() != null && event.getInputs().size() == 1;
assert event.getOutputs() != null && event.getOutputs().size() > 0;
@@ -344,8 +349,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Pair<String, String> changedColNamePair = findChangedColNames(oldColList, newColList);
String oldColName = changedColNamePair.getLeft();
String newColName = changedColNamePair.getRight();
-for(WriteEntity writeEntity : event.getOutputs()){
+for (WriteEntity writeEntity : event.getOutputs()) {
-if (writeEntity.getType() == Type.TABLE){
+if (writeEntity.getType() == Type.TABLE) {
Table newTable = writeEntity.getTable();
createOrUpdateEntities(dgiBridge, event, writeEntity, true, oldTable);
final String newQualifiedTableName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
@@ -511,7 +516,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
return result;
}
@@ -538,7 +542,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return null;
}
public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
@@ -547,18 +550,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
public static String normalize(String queryStr) {
-String result = null;
-if (queryStr != null) {
-try {
-HiveASTRewriter rewriter = new HiveASTRewriter(hiveConf);
-result = rewriter.rewrite(queryStr);
-} catch (Exception e) {
-LOG.warn("Could not rewrite query due to error. Proceeding with original query {}", queryStr, e);
-}
-}
-result = lower(result);
-return result;
+return lower(queryStr);
}
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
@@ -575,8 +567,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.info("Query id/plan is missing for {}", event.getQueryStr());
}
-final Map<String, Referenceable> source = new LinkedHashMap<>();
-final Map<String, Referenceable> target = new LinkedHashMap<>();
+final SortedMap<Entity, Referenceable> source = new TreeMap<>(entityComparator);
+final SortedMap<Entity, Referenceable> target = new TreeMap<>(entityComparator);
+final Set<String> dataSets = new HashSet<>();
final Set<Referenceable> entities = new LinkedHashSet<>();
boolean isSelectQuery = isSelectQuery(event);
@@ -584,22 +578,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
// filter out select queries which do not modify data
if (!isSelectQuery) {
for (ReadEntity readEntity : event.getInputs()) {
-processHiveEntity(dgiBridge, event, readEntity, source, entities);
+processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
}
for (WriteEntity writeEntity : event.getOutputs()) {
-processHiveEntity(dgiBridge, event, writeEntity, target, entities);
+processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
}
if (source.size() > 0 || target.size() > 0) {
-Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
-new ArrayList<Referenceable>() {{
-addAll(source.values());
-}},
-new ArrayList<Referenceable>() {{
-addAll(target.values());
-}});
+Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, source, target);
entities.add(processReferenceable);
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
} else {
@@ -610,22 +597,27 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
-private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
+private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Set<String> dataSetsProcessed,
+SortedMap<Entity, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
-if (!dataSets.containsKey(tblQFName)) {
+if (!dataSetsProcessed.contains(tblQFName)) {
LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
-dataSets.put(tblQFName, result.get(Type.TABLE));
+dataSets.put(entity, result.get(Type.TABLE));
+dataSetsProcessed.add(tblQFName);
entities.addAll(result.values());
}
} else if (entity.getType() == Type.DFS_DIR) {
final String pathUri = lower(new Path(entity.getLocation()).toString());
LOG.info("Registering DFS Path {} ", pathUri);
+if (!dataSetsProcessed.contains(pathUri)) {
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
-dataSets.put(pathUri, hdfsPath);
+dataSets.put(entity, hdfsPath);
+dataSetsProcessed.add(pathUri);
entities.add(hdfsPath);
}
}
+}
private JSONObject getQueryPlan(HiveConf hiveConf, QueryPlan queryPlan) throws Exception {
try {
@@ -661,24 +653,30 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
List<Referenceable> entities = new ArrayList<>();
-Table hiveTable = getEntityByType(event.getOutputs(), Type.TABLE).getTable();
+final Entity hiveEntity = getEntityByType(event.getOutputs(), Type.TABLE);
+Table hiveTable = hiveEntity.getTable();
//Refresh to get the correct location
hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
final String location = lower(hiveTable.getDataLocation().toString());
if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
LOG.info("Registering external table process {} ", event.getQueryStr());
-List<Referenceable> inputs = new ArrayList<Referenceable>() {{
-add(dgiBridge.fillHDFSDataSet(location));
+final ReadEntity dfsEntity = new ReadEntity();
+dfsEntity.setTyp(Type.DFS_DIR);
+dfsEntity.setName(location);
+SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+put(dfsEntity, dgiBridge.fillHDFSDataSet(location));
}};
-List<Referenceable> outputs = new ArrayList<Referenceable>() {{
-add(tables.get(Type.TABLE));
+SortedMap<Entity, Referenceable> outputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+put(hiveEntity, tables.get(Type.TABLE));
}};
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
-if(isCreateOp(event)){
+if (isCreateOp(event)){
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
}
entities.addAll(tables.values());
@@ -697,25 +695,22 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return false;
}
-private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
+private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
+SortedMap<Entity, Referenceable> source, SortedMap<Entity, Referenceable> target) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-String queryStr = hiveEvent.getQueryStr();
+String queryStr = lower(hiveEvent.getQueryStr());
-if (!isCreateOp(hiveEvent)) {
-queryStr = normalize(queryStr);
-processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(queryStr, sourceList, targetList));
-} else {
-queryStr = lower(queryStr);
-processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, queryStr);
-}
+processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent.getOperation(), source, target));
LOG.debug("Registering query: {}", queryStr);
+List<Referenceable> sourceList = new ArrayList<>(source.values());
+List<Referenceable> targetList = new ArrayList<>(target.values());
//The serialization code expected a list
-if (sourceList != null || !sourceList.isEmpty()) {
+if (sourceList != null && !sourceList.isEmpty()) {
processReferenceable.set("inputs", sourceList);
}
-if (targetList != null || !targetList.isEmpty()) {
+if (targetList != null && !targetList.isEmpty()) {
processReferenceable.set("outputs", targetList);
}
processReferenceable.set(AtlasClient.NAME, queryStr);
@@ -729,31 +724,64 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
processReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, dgiBridge.getClusterName());
List<String> recentQueries = new ArrayList<>(1);
-recentQueries.add(hiveEvent.getQueryStr());
+recentQueries.add(queryStr);
processReferenceable.set("recentQueries", recentQueries);
processReferenceable.set("endTime", new Date(System.currentTimeMillis()));
//TODO set queryGraph
return processReferenceable;
}
@VisibleForTesting
-static String getProcessQualifiedName(String normalizedQuery, List<Referenceable> inputs, List<Referenceable> outputs) {
-StringBuilder buffer = new StringBuilder(normalizedQuery);
-addDatasets(buffer, inputs);
-addDatasets(buffer, outputs);
+static String getProcessQualifiedName(HiveOperation op, SortedMap<Entity, Referenceable> inputs, SortedMap<Entity, Referenceable> outputs) {
+StringBuilder buffer = new StringBuilder(op.getOperationName());
+addDatasets(op, buffer, inputs);
+buffer.append(IO_SEP);
+addDatasets(op, buffer, outputs);
+LOG.info("Setting process qualified name to {}", buffer);
return buffer.toString();
}
-private static void addDatasets(StringBuilder buffer, List<Referenceable> refs) {
+private static void addDatasets(HiveOperation op, StringBuilder buffer, final Map<Entity, Referenceable> refs) {
if (refs != null) {
-for (Referenceable input : refs) {
+for (Entity input : refs.keySet()) {
-//TODO - Change to qualifiedName later
-buffer.append(":");
-String dataSetQlfdName = (String) input.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+final Entity entity = input;
+//HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+if (addQueryType(op, entity)) {
+buffer.append(SEP);
+buffer.append(((WriteEntity) entity).getWriteType().name());
+}
+if (Type.DFS_DIR.equals(entity.getType()) ||
+Type.LOCAL_DIR.equals(entity.getType())) {
+LOG.debug("Skipping dfs dir addition into process qualified name {} ", refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
+} else {
+buffer.append(SEP);
+String dataSetQlfdName = (String) refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+// '/' breaks query parsing on ATLAS
buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
}
}
}
+}
+private static boolean addQueryType(HiveOperation op, Entity entity) {
+if (WriteEntity.class.isAssignableFrom(entity.getClass())) {
+if (((WriteEntity) entity).getWriteType() != null &&
+op.equals(HiveOperation.QUERY)) {
+switch (((WriteEntity) entity).getWriteType()) {
+case INSERT:
+case INSERT_OVERWRITE:
+case UPDATE:
+case DELETE:
+case PATH_WRITE:
+return true;
+default:
+}
+}
+}
+return false;
+}
public static class HiveEventContext {
private Set<ReadEntity> inputs;
@@ -768,9 +796,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private String queryStr;
private Long queryStartTime;
-private String queryType;
-List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
+private String queryType;
public void setInputs(Set<ReadEntity> inputs) {
this.inputs = inputs;
@@ -868,4 +896,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return messages;
}
}
+@VisibleForTesting
+static final class EntityComparator implements Comparator<Entity> {
+@Override
+public int compare(Entity o1, Entity o2) {
+return o1.getName().toLowerCase().compareTo(o2.getName().toLowerCase());
+}
+}
+@VisibleForTesting
+static final Comparator<Entity> entityComparator = new EntityComparator();
}
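A note on the SortedMap/EntityComparator change above: inputs and outputs are now collected into TreeMaps ordered by the lower-cased entity name, so the qualifiedName assembled from them comes out the same no matter in which order Hive hands the hook its read/write entities. A minimal stand-alone sketch of that ordering idea, using plain strings instead of Hive Entity objects (the data set names are invented):

import java.util.Comparator;
import java.util.SortedMap;
import java.util.TreeMap;

public class OrderingSketch {
    public static void main(String[] args) {
        // Same comparison as HiveHook.EntityComparator, applied to the names directly.
        Comparator<String> byLowerCasedName = (a, b) -> a.toLowerCase().compareTo(b.toLowerCase());
        SortedMap<String, String> outputs = new TreeMap<>(byLowerCasedName);
        outputs.put("Default.T2@primary", "ref-t2");   // insertion order does not matter
        outputs.put("default.t1@primary", "ref-t1");
        // Iterates t1 before T2, so any name derived from this map is deterministic.
        System.out.println(outputs.keySet());
    }
}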
@@ -44,7 +44,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
@@ -60,10 +63,15 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import static org.apache.atlas.AtlasClient.NAME;
+import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
+import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.normalize;
import static org.testng.Assert.assertEquals;
@@ -95,6 +103,7 @@ public class HiveHookIT {
driver = new Driver(conf);
ss = new SessionState(conf);
ss = SessionState.start(ss);
+SessionState.setCurrentSessionState(ss);
Configuration configuration = ApplicationProperties.get();
@@ -256,19 +265,50 @@ public class HiveHookIT {
validateHDFSPaths(processReference, INPUTS, pFile);
}
-private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
-validateTables(processReference, OUTPUTS, expectedTableNames);
+private List<Entity> getInputs(String inputName, Entity.Type entityType) {
+final ReadEntity entity = new ReadEntity();
+if ( Entity.Type.DFS_DIR.equals(entityType)) {
+entity.setName(lower(new Path(inputName).toString()));
+entity.setTyp(Entity.Type.DFS_DIR);
+} else {
+entity.setName(getQualifiedTblName(inputName));
+entity.setTyp(Entity.Type.TABLE);
+}
+return new ArrayList<Entity>() {{ add(entity); }};
+}
+private List<Entity> getOutputs(String inputName, Entity.Type entityType) {
+final WriteEntity entity = new WriteEntity();
+if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
+entity.setName(lower(new Path(inputName).toString()));
+entity.setTyp(entityType);
+} else {
+entity.setName(getQualifiedTblName(inputName));
+entity.setTyp(Entity.Type.TABLE);
+}
+return new ArrayList<Entity>() {{ add(entity); }};
+}
+private void validateOutputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
+validateTables(processReference, OUTPUTS, expectedTables);
}
-private void validateInputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
-validateTables(processReference, INPUTS, expectedTableNames);
+private void validateInputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
+validateTables(processReference, INPUTS, expectedTables);
}
-private void validateTables(Referenceable processReference, String attrName, String... expectedTableNames) throws Exception {
+private void validateTables(Referenceable processReference, String attrName, List<Entity> expectedTables) throws Exception {
List<Id> tableRef = (List<Id>) processReference.get(attrName);
-for(int i = 0; i < expectedTableNames.length; i++) {
+for(int i = 0; i < expectedTables.size(); i++) {
Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
-Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTableNames[i]);
+LOG.debug("Validating output {} {} ", i, entity);
+Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTables.get(i).getName());
}
}
@@ -301,7 +341,15 @@ public class HiveHookIT {
String query = "create table " + ctasTableName + " as select * from " + tableName;
runCommand(query);
-assertProcessIsRegistered(query);
+final ReadEntity entity = new ReadEntity();
+entity.setName(getQualifiedTblName(tableName));
+entity.setTyp(Entity.Type.TABLE);
+final WriteEntity writeEntity = new WriteEntity();
+writeEntity.setTyp(Entity.Type.TABLE);
+writeEntity.setName(getQualifiedTblName(ctasTableName));
+assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, new ArrayList<Entity>() {{ add(entity); }}, new ArrayList<Entity>() {{ add(writeEntity); }});
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
}
@@ -313,7 +361,11 @@ public class HiveHookIT {
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
-String processId = assertProcessIsRegistered(query);
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+List<Entity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
+String processId = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
final String drpquery = String.format("drop table %s ", ctasTableName);
runCommand(drpquery);
@@ -322,16 +374,15 @@ public class HiveHookIT {
//Fix after ATLAS-876
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
-String process2Id = assertProcessIsRegistered(query);
+String process2Id = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
Assert.assertEquals(process2Id, processId);
Referenceable processRef = atlasClient.getEntity(processId);
-String tblQlfdname = getQualifiedTblName(tableName);
-String ctasQlfdname = getQualifiedTblName(ctasTableName);
-validateInputTables(processRef, tblQlfdname);
-validateOutputTables(processRef, ctasQlfdname, ctasQlfdname);
+validateInputTables(processRef, inputs);
+outputs.add(outputs.get(0));
+validateOutputTables(processRef, outputs);
}
@Test
@@ -341,7 +392,7 @@ public class HiveHookIT {
String query = "create view " + viewName + " as select * from " + tableName;
runCommand(query);
-assertProcessIsRegistered(query);
+assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
assertTableIsRegistered(DEFAULT_DB, viewName);
}
@@ -355,7 +406,7 @@ public class HiveHookIT {
runCommand(query);
String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);
-assertProcessIsRegistered(query);
+assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);
//Check lineage which includes table1
@@ -371,7 +422,7 @@ public class HiveHookIT {
runCommand(query);
//Check if alter view process is reqistered
-assertProcessIsRegistered(query);
+assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);
Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);
@@ -408,7 +459,9 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
runCommand(query);
-assertProcessIsRegistered(query, null, getQualifiedTblName(tableName));
+List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+assertProcessIsRegistered(query, HiveOperation.LOAD, null, outputs);
}
@Test
@@ -419,7 +472,7 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
-validateProcess(query, null, getQualifiedTblName(tableName));
+validateProcess(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE));
}
@Test
@@ -429,16 +482,15 @@ public class HiveHookIT {
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String loadFile = createTestDFSFile("loadDFSFile");
-final String testPathNormed = lower(new Path(loadFile).toString());
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
-final String tblQlfdName = getQualifiedTblName(tableName);
-Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
+final List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+Referenceable processReference = validateProcess(query, HiveOperation.LOAD, getInputs(loadFile, Entity.Type.DFS_DIR), outputs);
validateHDFSPaths(processReference, INPUTS, loadFile);
-validateOutputTables(processReference, tblQlfdName);
+validateOutputTables(processReference, outputs);
}
private String getQualifiedTblName(String inputTable) {
@@ -450,20 +502,20 @@ public class HiveHookIT {
return inputtblQlfdName;
}
-private Referenceable validateProcess(String query, String inputTable, String... outputTables) throws Exception {
+private Referenceable validateProcess(String query, HiveOperation op, List<Entity> inputTables, List<Entity> outputTables) throws Exception {
-String processId = assertProcessIsRegistered(query, inputTable, outputTables);
+String processId = assertProcessIsRegistered(query, op, inputTables, outputTables);
Referenceable process = atlasClient.getEntity(processId);
-if (inputTable == null) {
+if (inputTables == null) {
Assert.assertNull(process.get(INPUTS));
} else {
-Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), 1);
+Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputTables.size());
-validateInputTables(process, inputTable);
+validateInputTables(process, inputTables);
}
if (outputTables == null) {
Assert.assertNull(process.get(OUTPUTS));
} else {
-Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1);
+Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputTables.size());
validateOutputTables(process, outputTables);
}
@@ -482,12 +534,16 @@ public class HiveHookIT {
String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
-Referenceable processRef1 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+Referenceable processRef1 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
//Rerun same query. Should result in same process
runCommand(query);
-Referenceable processRef2 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
+Referenceable processRef2 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
}
@@ -500,7 +556,7 @@ public class HiveHookIT {
"insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
runCommand(query);
-validateProcess(query, getQualifiedTblName(tableName), null);
+validateProcess(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null);
assertTableIsRegistered(DEFAULT_DB, tableName);
}
@@ -509,69 +565,80 @@ public class HiveHookIT {
public void testUpdateProcess() throws Exception {
String tableName = createTable();
String pFile1 = createTestDFSPath("somedfspath1");
-String testPathNormed = lower(new Path(pFile1).toString());
String query =
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
-String tblQlfdname = getQualifiedTblName(tableName);
-Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
+((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-validateInputTables(processReference, tblQlfdname);
+validateInputTables(processReference, inputs);
//Rerun same query with same HDFS path
runCommand(query);
-Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
-//Rerun same query with a new HDFS path. Should create a new process
+//Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of qualifiedName.
-String pFile2 = createTestDFSPath("somedfspath2");
+final String pFile2 = createTestDFSPath("somedfspath2");
query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
-final String testPathNormed2 = lower(new Path(pFile2).toString());
runCommand(query);
+List<Entity> p3Outputs = new ArrayList<Entity>() {{
+addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
+addAll(outputs);
+}};
-Referenceable process3Reference = validateProcess(query, tblQlfdname, testPathNormed2);
+Referenceable process3Reference = validateProcess(query, HiveOperation.QUERY, inputs, p3Outputs);
validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
-Assert.assertNotEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
+Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
}
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
String pFile1 = createTestDFSPath("somedfspath1");
-String testPathNormed = lower(new Path(pFile1).toString());
String query =
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
-String tblQlfdname = getQualifiedTblName(tableName);
-Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
+((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, pFile1);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-validateInputTables(processReference, tblQlfdname);
+validateInputTables(processReference, inputs);
//Rerun same query with different HDFS path
-String pFile2 = createTestDFSPath("somedfspath2");
+final String pFile2 = createTestDFSPath("somedfspath2");
-testPathNormed = lower(new Path(pFile2).toString());
query =
"insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
runCommand(query);
-tblQlfdname = getQualifiedTblName(tableName);
-Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+List<Entity> p2Outputs = new ArrayList<Entity>() {{
+addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
+addAll(outputs);
+}};
+Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, p2Outputs);
validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
-Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
+Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
}
@Test
@@ -585,7 +652,13 @@ public class HiveHookIT {
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
-validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+outputs.get(0).setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
+((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+validateProcess(query, HiveOperation.QUERY, inputs, outputs);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
@@ -599,7 +672,12 @@ public class HiveHookIT {
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
runCommand(query);
-validateProcess(query, getQualifiedTblName(tableName) , getQualifiedTblName(insertTableName));
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+validateProcess(query, HiveOperation.QUERY, inputs, outputs);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
@@ -627,28 +705,31 @@ public class HiveHookIT {
public void testExportImportUnPartitionedTable() throws Exception {
String tableName = createTable(false);
-String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+assertTableIsRegistered(DEFAULT_DB, tableName);
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to \"" + filename + "\"";
-final String testPathNormed = lower(new Path(filename).toString());
runCommand(query);
-String tblQlfName = getQualifiedTblName(tableName);
-Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, filename);
-validateInputTables(processReference, tblQlfName);
+validateInputTables(processReference, inputs);
//Import
tableName = createTable(false);
-tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+assertTableIsRegistered(DEFAULT_DB, tableName);
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
-tblQlfName = getQualifiedTblName(tableName);
-processReference = validateProcess(query, testPathNormed, tblQlfName);
+outputs = getOutputs(tableName, Entity.Type.TABLE);
+processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
validateHDFSPaths(processReference, INPUTS, filename);
-validateOutputTables(processReference, tblQlfName);
+validateOutputTables(processReference, outputs);
}
@Test
@@ -662,14 +743,16 @@ public class HiveHookIT {
runCommand(query);
String filename = "pfile://" + mkdir("export");
-final String testPathNormed = lower(new Path(filename).toString());
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
-String tblQlfdName = getQualifiedTblName(tableName);
-Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
validateHDFSPaths(processReference, OUTPUTS, filename);
-validateInputTables(processReference, tblQlfdName);
+validateInputTables(processReference, inputs);
//Import
tableName = createTable(true);
@@ -677,11 +760,12 @@ public class HiveHookIT {
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
-tblQlfdName = getQualifiedTblName(tableName);
-processReference = validateProcess(query, testPathNormed, tblQlfdName);
+outputs = getOutputs(tableName, Entity.Type.TABLE);
+processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
validateHDFSPaths(processReference, INPUTS, filename);
-validateOutputTables(processReference, tblQlfdName);
+validateOutputTables(processReference, outputs);
}
@Test
@@ -689,12 +773,13 @@ public class HiveHookIT {
String tableName = createTable();
String query = "select * from " + tableName;
runCommand(query);
-assertProcessIsNotRegistered(query);
+List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
+assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
//check with uppercase table name
query = "SELECT * from " + tableName.toUpperCase();
runCommand(query);
-assertProcessIsNotRegistered(query);
+assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
}
@Test
@@ -963,9 +1048,10 @@ public class HiveHookIT {
String query = String.format("truncate table %s", tableName);
runCommand(query);
+List<Entity> outputs = getInputs(tableName, Entity.Type.TABLE);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-validateProcess(query, null, getQualifiedTblName(tableName));
+validateProcess(query, HiveOperation.TRUNCATETABLE, null, outputs);
//Check lineage
String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
@@ -1072,10 +1158,10 @@ public class HiveHookIT {
}
});
-final String tblQlfdName = getQualifiedTblName(tableName);
-final String testPathNormed = lower(new Path(testPath).toString());
-Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
+List<Entity> inputs = getInputs(testPath, Entity.Type.DFS_DIR);
+List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+Referenceable processReference = validateProcess(query, HiveOperation.ALTERTABLE_LOCATION, inputs, outputs);
validateHDFSPaths(processReference, INPUTS, testPath);
}
@@ -1281,7 +1367,6 @@ public class HiveHookIT {
//Should have no effect
assertDBIsNotRegistered(dbName);
-assertProcessIsNotRegistered(query);
}
@Test
@@ -1294,7 +1379,6 @@ public class HiveHookIT {
//Should have no effect
assertTableIsNotRegistered(DEFAULT_DB, tableName);
-assertProcessIsNotRegistered(query);
}
@Test
@@ -1472,56 +1556,39 @@ public class HiveHookIT {
}
}
-private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String... outputTblNames) throws Exception {
+private String assertProcessIsRegistered(final String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
-HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
-String normalizedQuery = normalize(astRewriter.rewrite(queryStr));
-List<Referenceable> inputs = null;
-if (inputTblName != null) {
-Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
-put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, inputTblName);
-}});
-inputs = new ArrayList<Referenceable>();
-inputs.add(inputTableRef);
-}
-List<Referenceable> outputs = new ArrayList<Referenceable>();
-if (outputTblNames != null) {
-for(int i = 0; i < outputTblNames.length; i++) {
-final String outputTblName = outputTblNames[i];
-Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
-put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
-}});
-outputs.add(outputTableRef);
-}
-}
-String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
+String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
public void assertOnEntity(final Referenceable entity) throws Exception {
List<String> recentQueries = (List<String>) entity.get("recentQueries");
-Assert.assertEquals(recentQueries.get(0), queryStr);
+Assert.assertEquals(recentQueries.get(0), lower(queryStr));
}
});
}
-private String assertProcessIsRegistered(final String queryStr) throws Exception {
-String lowerQryStr = lower(queryStr);
-LOG.debug("Searching for process with query {}", lowerQryStr);
-return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lowerQryStr, new AssertPredicate() {
-@Override
-public void assertOnEntity(final Referenceable entity) throws Exception {
-List<String> recentQueries = (List<String>) entity.get("recentQueries");
-Assert.assertEquals(recentQueries.get(0), queryStr);
+private String getDSTypeName(Entity entity) {
+return Entity.Type.TABLE.equals(entity.getType()) ? HiveDataTypes.HIVE_TABLE.name() : FSDataTypes.HDFS_PATH().toString();
}
-});
+private SortedMap<Entity, Referenceable> getSortedProcessDataSets(List<Entity> inputTbls) {
+SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator);
+if (inputTbls != null) {
+for (final Entity tbl : inputTbls) {
+Referenceable inputTableRef = new Referenceable(getDSTypeName(tbl), new HashMap<String, Object>() {{
+put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tbl.getName());
+}});
+inputs.put(tbl, inputTableRef);
+}
+}
+return inputs;
}
-private void assertProcessIsNotRegistered(String queryStr) throws Exception {
-LOG.debug("Searching for process with query {}", queryStr);
-assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, normalize(queryStr));
+private void assertProcessIsNotRegistered(String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
+String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
+LOG.debug("Searching for process with query {}", processQFName);
+assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
}
private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
...
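On the test side, the IT now attaches the started SessionState to the current thread in setUp() and builds the expected qualifiedName from the HiveOperation plus expected ReadEntity/WriteEntity lists (getInputs()/getOutputs()) instead of from a normalized query string. A hedged sketch of just the session bootstrap, using the same calls as the setUp() change above (configuration wiring omitted):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class SessionBootstrapSketch {
    public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        Driver driver = new Driver(conf);
        SessionState ss = new SessionState(conf);
        ss = SessionState.start(ss);
        // The line this commit adds to the IT: explicitly bind the started session
        // to the current thread so later SessionState.get() calls see it.
        SessionState.setCurrentSessionState(ss);
    }
}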
@@ -39,6 +39,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-904 Hive hook fails due to session state not being set (sumasai via yhemanth)
ATLAS-929 Add test for trait preservation on column rename for non-default database (svimal2106 via shwethags)
ATLAS-922 remove test atlas-application.properties embedded in atlas-typesystem.jar (madhan.neethiraj via yhemanth)
ATLAS-725 UI : Filter out or highlight deleted entities in search result outputs(dsl, text) , schema view, and lineage graph (kevalbhatt18 via sumasai)
...
@@ -400,7 +400,7 @@ public class EntityResource {
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
-if(guids != null || !guids.isEmpty()) {
+if(guids != null && !guids.isEmpty()) {
LOG.error("An entity with GUID={} does not exist ", guids, e);
} else {
LOG.error("An entity with qualifiedName {}-{}-{} does not exist", entityType, attribute, value, e);
...
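The EntityResource change is a small guard fix: with "||", a null guids list still reaches guids.isEmpty() and throws a NullPointerException, and a non-null but empty list takes the GUID branch by mistake; "&&" short-circuits so both cases fall through to the qualifiedName message. A tiny stand-alone illustration (names are generic, not the resource's actual fields):

import java.util.Arrays;
import java.util.List;

public class GuardSketch {
    static String pick(List<String> guids) {
        // Old form was: if (guids != null || !guids.isEmpty()) -> NPE when guids == null.
        if (guids != null && !guids.isEmpty()) {
            return "by GUID " + guids;
        }
        return "by qualifiedName";
    }
    public static void main(String[] args) {
        System.out.println(pick(null));                  // by qualifiedName
        System.out.println(pick(Arrays.asList("g1")));   // by GUID [g1]
    }
}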