Commit 60b44a93 by lina.li Committed by Sarath Subramanian

ATLAS-3185: Create process execution for Impala integration

parent 87075f8a
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.atlas.impala; package org.apache.atlas.impala;
import java.lang.Runnable;
import org.apache.atlas.impala.hook.ImpalaLineageHook; import org.apache.atlas.impala.hook.ImpalaLineageHook;
import java.io.*; import java.io.*;
......
...@@ -24,7 +24,7 @@ import java.util.Map; ...@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.atlas.impala.model.ImpalaOperationType; import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery; import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
/** /**
* Contain the info related to an linear record from Impala * Contain the info related to an linear record from Impala
...@@ -70,6 +70,10 @@ public class AtlasImpalaHookContext { ...@@ -70,6 +70,10 @@ public class AtlasImpalaHookContext {
return hook.getClusterName(); return hook.getClusterName();
} }
/**
 * Exposes the host name known to the wrapped hook.
 *
 * @return the value returned by {@code hook.getHostName()}
 */
public String getHostName() {
    return this.hook.getHostName();
}
public boolean isConvertHdfsPathToLowerCase() { public boolean isConvertHdfsPathToLowerCase() {
return hook.isConvertHdfsPathToLowerCase(); return hook.isConvertHdfsPathToLowerCase();
} }
......
...@@ -19,11 +19,8 @@ ...@@ -19,11 +19,8 @@
package org.apache.atlas.impala.hook; package org.apache.atlas.impala.hook;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
......
...@@ -19,7 +19,8 @@ ...@@ -19,7 +19,8 @@
package org.apache.atlas.impala.hook; package org.apache.atlas.impala.hook;
import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME; import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
import java.net.InetAddress;
import java.net.UnknownHostException;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import java.io.IOException; import java.io.IOException;
import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.hook.AtlasHook;
...@@ -44,15 +45,24 @@ public class ImpalaLineageHook extends AtlasHook { ...@@ -44,15 +45,24 @@ public class ImpalaLineageHook extends AtlasHook {
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String CONF_REALM_NAME = "atlas.realm.name"; public static final String CONF_REALM_NAME = "atlas.realm.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String DEFAULT_HOST_NAME = "localhost";
private static final String clusterName; private static final String clusterName;
private static final String realm; private static final String realm;
private static final boolean convertHdfsPathToLowerCase; private static final boolean convertHdfsPathToLowerCase;
private static String hostName;
static { static {
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ?? realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ??
convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
// Look up the local host name as part of the static initializer; if the lookup
// fails, log a warning and fall back to DEFAULT_HOST_NAME.
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.warn("No hostname found. Setting the hostname to default value {}", DEFAULT_HOST_NAME, e);
hostName = DEFAULT_HOST_NAME;
}
} }
public ImpalaLineageHook() { public ImpalaLineageHook() {
...@@ -122,6 +132,10 @@ public class ImpalaLineageHook extends AtlasHook { ...@@ -122,6 +132,10 @@ public class ImpalaLineageHook extends AtlasHook {
} }
} }
/**
 * Returns the host name stored in the static {@code hostName} field.
 *
 * @return the current value of {@code hostName}
 */
public String getHostName() {
    return ImpalaLineageHook.hostName;
}
private UserGroupInformation getUgiFromUserName(String userName) throws IOException { private UserGroupInformation getUgiFromUserName(String userName) throws IOException {
String userPrincipal = userName.contains(REALM_SEPARATOR)? userName : userName + "@" + getRealm(); String userPrincipal = userName.contains(REALM_SEPARATOR)? userName : userName + "@" + getRealm();
Subject userSubject = new Subject(false, Sets.newHashSet( Subject userSubject = new Subject(false, Sets.newHashSet(
......
...@@ -29,6 +29,7 @@ import java.util.HashSet; ...@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext; import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaOperationParser; import org.apache.atlas.impala.hook.ImpalaOperationParser;
import org.apache.atlas.impala.model.ImpalaDataType; import org.apache.atlas.impala.model.ImpalaDataType;
...@@ -43,6 +44,8 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo; ...@@ -43,6 +44,8 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -75,12 +78,17 @@ public abstract class BaseImpalaEvent { ...@@ -75,12 +78,17 @@ public abstract class BaseImpalaEvent {
public static final String ATTRIBUTE_START_TIME = "startTime"; public static final String ATTRIBUTE_START_TIME = "startTime";
public static final String ATTRIBUTE_USER_NAME = "userName"; public static final String ATTRIBUTE_USER_NAME = "userName";
public static final String ATTRIBUTE_QUERY_TEXT = "queryText"; public static final String ATTRIBUTE_QUERY_TEXT = "queryText";
public static final String ATTRIBUTE_PROCESS = "process";
public static final String ATTRIBUTE_PROCESS_EXECUTIONS = "processExecutions";
public static final String ATTRIBUTE_QUERY_ID = "queryId"; public static final String ATTRIBUTE_QUERY_ID = "queryId";
public static final String ATTRIBUTE_QUERY_PLAN = "queryPlan"; public static final String ATTRIBUTE_QUERY_PLAN = "queryPlan";
public static final String ATTRIBUTE_END_TIME = "endTime"; public static final String ATTRIBUTE_END_TIME = "endTime";
public static final String ATTRIBUTE_RECENT_QUERIES = "recentQueries"; public static final String ATTRIBUTE_RECENT_QUERIES = "recentQueries";
public static final String ATTRIBUTE_QUERY = "query"; public static final String ATTRIBUTE_QUERY = "query";
public static final String ATTRIBUTE_DEPENDENCY_TYPE = "dependencyType"; public static final String ATTRIBUTE_DEPENDENCY_TYPE = "dependencyType";
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String EMPTY_ATTRIBUTE_VALUE = "";
public static final long MILLIS_CONVERT_FACTOR = 1000; public static final long MILLIS_CONVERT_FACTOR = 1000;
...@@ -525,15 +533,43 @@ public abstract class BaseImpalaEvent { ...@@ -525,15 +533,43 @@ public abstract class BaseImpalaEvent {
ret.setAttribute(ATTRIBUTE_NAME, queryStr); ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, context.getImpalaOperationType()); ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, context.getImpalaOperationType());
// the unit of timestamp from lineage record is in seconds. Convert to milliseconds to Atlas // We are setting an empty value to these attributes, since now we have a new entity type called impala process
ret.setAttribute(ATTRIBUTE_START_TIME, context.getLineageQuery().getTimestamp() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR); // execution which captures these values. We have to set empty values here because these attributes are
ret.setAttribute(ATTRIBUTE_END_TIME, context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR); // mandatory attributes for impala process entity type.
ret.setAttribute(ATTRIBUTE_START_TIME, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_END_TIME, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_USER_NAME, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));
return ret;
}
protected AtlasEntity getImpalaProcessExecutionEntity(AtlasEntity impalaProcess) throws Exception {
AtlasEntity ret = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS_EXECUTION.getName());
String queryStr = context.getQueryStr();
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
Long startTime = context.getLineageQuery().getTimestamp() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
Long endTime = context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
QNAME_SEP_PROCESS + startTime.toString() +
QNAME_SEP_PROCESS + endTime.toString());
ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS + startTime);
ret.setAttribute(ATTRIBUTE_START_TIME, startTime);
ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName()); ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr); ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID, context.getLineageQuery().getQueryId()); ret.setAttribute(ATTRIBUTE_QUERY_ID, context.getLineageQuery().getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported"); ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr)); ret.setAttribute(ATTRIBUTE_HOSTNAME, context.getHostName());
ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(impalaProcess));
return ret; return ret;
} }
......
...@@ -110,15 +110,28 @@ public class CreateImpalaProcess extends BaseImpalaEvent { ...@@ -110,15 +110,28 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (!inputs.isEmpty() || !outputs.isEmpty()) { if (!inputs.isEmpty() || !outputs.isEmpty()) {
AtlasEntity process = getImpalaProcessEntity(inputs, outputs); AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
if (process!= null && LOG.isDebugEnabled()) { if (process!= null) {
LOG.debug("get process entity with qualifiedName: {}", process.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); if (LOG.isDebugEnabled()) {
} LOG.debug("get process entity with qualifiedName: {}",
process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
ret.addEntity(process); ret.addEntity(process);
processColumnLineage(process, ret); AtlasEntity processExecution = getImpalaProcessExecutionEntity(process);
if (processExecution != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("get process executition entity with qualifiedName: {}",
processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
ret.addEntity(processExecution);
}
addProcessedEntities(ret); processColumnLineage(process, ret);
addProcessedEntities(ret);
}
} else { } else {
ret = null; ret = null;
} }
...@@ -154,8 +167,10 @@ public class CreateImpalaProcess extends BaseImpalaEvent { ...@@ -154,8 +167,10 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
String outputColName = getQualifiedName(columnVertex); String outputColName = getQualifiedName(columnVertex);
AtlasEntity outputColumn = context.getEntity(outputColName); AtlasEntity outputColumn = context.getEntity(outputColName);
LOG.debug("processColumnLineage(): target id = {}, target column name = {}", if (LOG.isDebugEnabled()) {
LOG.debug("processColumnLineage(): target id = {}, target column name = {}",
targetId, outputColName); targetId, outputColName);
}
if (outputColumn == null) { if (outputColumn == null) {
LOG.warn("column-lineage: non-existing output-column {}", outputColName); LOG.warn("column-lineage: non-existing output-column {}", outputColName);
...@@ -219,7 +234,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent { ...@@ -219,7 +234,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
for (AtlasEntity columnLineage : columnLineages) { for (AtlasEntity columnLineage : columnLineages) {
String columnQualifiedName = (String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME); String columnQualifiedName = (String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
LOG.debug("get column lineage entity with qualifiedName: {}", columnQualifiedName); if (LOG.isDebugEnabled()) {
LOG.debug("get column lineage entity with qualifiedName: {}", columnQualifiedName);
}
entities.addEntity(columnLineage); entities.addEntity(columnLineage);
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.atlas.impala; package org.apache.atlas.impala;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME; import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES; import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB; import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
...@@ -26,6 +27,7 @@ import static org.testng.Assert.assertNotNull; ...@@ -26,6 +27,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -33,8 +35,10 @@ import org.apache.atlas.ApplicationProperties; ...@@ -33,8 +35,10 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext; import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook; import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.model.ImpalaDataType; import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
...@@ -99,6 +103,7 @@ public class ImpalaLineageITBase { ...@@ -99,6 +103,7 @@ public class ImpalaLineageITBase {
} }
// return guid of the entity
protected String assertEntityIsRegistered(final String typeName, final String property, final String value, protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final AssertPredicate assertPredicate) throws Exception { final AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new Predicate() { waitFor(80000, new Predicate() {
...@@ -141,6 +146,25 @@ public class ImpalaLineageITBase { ...@@ -141,6 +146,25 @@ public class ImpalaLineageITBase {
}); });
} }
/**
 * Checks, through {@code waitFor(80000, ...)}, that the entity with the given guid can be
 * fetched from Atlas and, when a predicate is supplied, that the predicate accepts it.
 * NOTE(review): presumably waitFor retries the check until the timeout elapses - confirm.
 *
 * @param guid            guid of the entity to fetch
 * @param assertPredicate additional assertions to run on the fetched entity; may be null
 * @return the guid reported by the entity fetched once the wait has completed
 * @throws Exception if the wait or the lookup fails
 */
protected String assertEntityIsRegisteredViaGuid(String guid,
        final AssertPredicate assertPredicate) throws Exception {
    waitFor(80000, new Predicate() {
        @Override
        public void evaluate() throws Exception {
            AtlasEntity fetched = atlasClientV2.getEntityByGuid(guid).getEntity();

            assertNotNull(fetched);

            if (assertPredicate == null) {
                return;
            }

            assertPredicate.assertOnEntity(fetched);
        }
    });

    // Fetch once more so the returned guid is read from the entity stored in Atlas.
    return atlasClientV2.getEntityByGuid(guid).getEntity().getGuid();
}
protected String assertProcessIsRegistered(List<String> processQFNames, String queryString) throws Exception { protected String assertProcessIsRegistered(List<String> processQFNames, String queryString) throws Exception {
try { try {
...@@ -185,6 +209,82 @@ public class ImpalaLineageITBase { ...@@ -185,6 +209,82 @@ public class ImpalaLineageITBase {
} }
} }
/**
 * Locates the process execution attached to {@code impalaProcess} whose query text equals the
 * lower-cased, trimmed {@code queryString}, then asserts that it is registered in Atlas.
 *
 * Fails immediately with a descriptive message when the process has no matching execution,
 * instead of handing an empty guid to the guid-based lookup.
 *
 * @param impalaProcess process entity whose {@code processExecutions} relationship is inspected
 * @param queryString   query text to match; compared after {@code toLowerCase().trim()}
 * @return guid of the matching process execution entity
 * @throws Exception if any Atlas lookup fails
 */
private String assertProcessExecutionIsRegistered(AtlasEntity impalaProcess, final String queryString) throws Exception {
    try {
        // Normalize once; the same value is used for matching and for the final assertion.
        final String expectedQueryText = queryString.toLowerCase().trim();

        List<AtlasObjectId> processExecutions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
                BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));

        String guid = null;

        // toAtlasObjectIdList() yields null when the relationship attribute is absent.
        if (processExecutions != null) {
            for (AtlasObjectId processExecution : processExecutions) {
                AtlasEntity entity = atlasClientV2.getEntityByGuid(processExecution.getGuid()).getEntity();

                if (String.valueOf(entity.getAttribute(ATTRIBUTE_QUERY_TEXT)).equals(expectedQueryText)) {
                    guid = entity.getGuid();
                    break;
                }
            }
        }

        if (guid == null) {
            Assert.fail("no process execution with query text '" + expectedQueryText
                    + "' found for process " + impalaProcess.getGuid());
        }

        return assertEntityIsRegisteredViaGuid(guid, new AssertPredicate() {
            @Override
            public void assertOnEntity(final AtlasEntity entity) throws Exception {
                String queryText = (String) entity.getAttribute(ATTRIBUTE_QUERY_TEXT);

                Assert.assertEquals(queryText, expectedQueryText);
            }
        });
    } catch(Exception e) {
        LOG.error("Exception : ", e);
        throw e;
    }
}
/**
 * Converts a loosely-typed value into an {@link AtlasObjectId}.
 *
 * @param obj an {@code AtlasObjectId}, a {@code Map} accepted by the {@code AtlasObjectId}
 *            constructor, any other object whose {@code toString()} is used as the guid,
 *            or null
 * @return the converted id, or null when {@code obj} is null
 */
protected AtlasObjectId toAtlasObjectId(Object obj) {
    if (obj == null) {
        return null;
    }

    if (obj instanceof AtlasObjectId) {
        return (AtlasObjectId) obj;
    }

    if (obj instanceof Map) {
        return new AtlasObjectId((Map) obj);
    }

    // Anything else is treated as a guid.
    return new AtlasObjectId(obj.toString());
}
/**
 * Converts a loosely-typed value into a list of {@link AtlasObjectId}.
 *
 * A {@code Collection} is converted element by element, dropping elements that convert to
 * null. Any other value is converted as a single element.
 *
 * @param obj a collection of convertible values, a single convertible value, or null
 * @return the converted ids; null when {@code obj} is not a collection and converts to null
 */
protected List<AtlasObjectId> toAtlasObjectIdList(Object obj) {
    if (!(obj instanceof Collection)) {
        AtlasObjectId single = toAtlasObjectId(obj);

        if (single == null) {
            return null;
        }

        List<AtlasObjectId> singleton = new ArrayList<>(1);

        singleton.add(single);

        return singleton;
    }

    Collection<?> items = (Collection<?>) obj;
    List<AtlasObjectId> converted = new ArrayList<>(items.size());

    for (Object item : items) {
        AtlasObjectId id = toAtlasObjectId(item);

        if (id == null) {
            continue;
        }

        converted.add(id);
    }

    return converted;
}
protected String assertDatabaseIsRegistered(String dbName) throws Exception { protected String assertDatabaseIsRegistered(String dbName) throws Exception {
return assertDatabaseIsRegistered(dbName, null); return assertDatabaseIsRegistered(dbName, null);
} }
...@@ -227,6 +327,31 @@ public class ImpalaLineageITBase { ...@@ -227,6 +327,31 @@ public class ImpalaLineageITBase {
return dbName + "." + tableName; return dbName + "." + tableName;
} }
/**
 * Asserts that the process with the given qualified name is registered and returns it.
 *
 * @param processQFName qualified name of the process
 * @param queryString   query text passed to {@code assertProcessIsRegistered}
 * @return the process entity fetched by the guid that the assertion returned
 * @throws Exception if the assertion or the lookup fails
 */
protected AtlasEntity validateProcess(String processQFName, String queryString) throws Exception {
    final String guid = assertProcessIsRegistered(processQFName, queryString);

    return atlasClientV2.getEntityByGuid(guid).getEntity();
}
/**
 * Asserts that a process is registered for the given candidate qualified names and returns it.
 *
 * @param processQFNames candidate qualified names passed to {@code assertProcessIsRegistered}
 * @param queryString    query text passed to {@code assertProcessIsRegistered}
 * @return the process entity fetched by the guid that the assertion returned
 * @throws Exception if the assertion or the lookup fails
 */
protected AtlasEntity validateProcess(List<String> processQFNames, String queryString) throws Exception {
    final String guid = assertProcessIsRegistered(processQFNames, queryString);

    return atlasClientV2.getEntityByGuid(guid).getEntity();
}
/**
 * Asserts that a process execution for the given query is registered under
 * {@code impalaProcess} and returns it.
 *
 * @param impalaProcess process entity that owns the execution
 * @param queryString   query text used to pick the execution
 * @return the process execution entity fetched by the guid that the assertion returned
 * @throws Exception if the assertion or the lookup fails
 */
protected AtlasEntity validateProcessExecution(AtlasEntity impalaProcess, String queryString) throws Exception {
    final String executionGuid = assertProcessExecutionIsRegistered(impalaProcess, queryString);

    return atlasClientV2.getEntityByGuid(executionGuid).getEntity();
}
/**
 * Counts the process executions referenced by the {@code processExecutions} relationship
 * attribute of the given process.
 *
 * @param impalaProcess process entity to inspect
 * @return number of referenced executions; 0 when the relationship attribute is absent
 */
protected int numberOfProcessExecutions(AtlasEntity impalaProcess) {
    List<AtlasObjectId> executions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
            BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));

    // toAtlasObjectIdList() returns null for an absent attribute; report that as zero
    // so callers get an assertion failure on the count instead of a NullPointerException.
    return executions == null ? 0 : executions.size();
}
public interface AssertPredicate { public interface AssertPredicate {
void assertOnEntity(AtlasEntity entity) throws Exception; void assertOnEntity(AtlasEntity entity) throws Exception;
} }
......
...@@ -17,12 +17,17 @@ ...@@ -17,12 +17,17 @@
*/ */
package org.apache.atlas.impala; package org.apache.atlas.impala;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext; import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook; import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent; import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.model.ImpalaQuery; import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.testng.Assert;
import org.testng.annotations.Test; import org.testng.annotations.Test;
public class ImpalaLineageToolIT extends ImpalaLineageITBase { public class ImpalaLineageToolIT extends ImpalaLineageITBase {
...@@ -73,8 +78,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -73,8 +78,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName, String queryString = "create view db_1.view_1 as select count, id from db_1.table_1";
"create view db_1.view_1 as select count, id from db_1.table_1"); AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} catch (Exception e) { } catch (Exception e) {
System.out.print("Appending file error"); System.out.print("Appending file error");
...@@ -136,8 +146,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -136,8 +146,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// verify the process is saved in Atlas. the value is from info in IMPALA_4. // verify the process is saved in Atlas. the value is from info in IMPALA_4.
// There is no createTime in lineage record, so we don't know the process qualified name // There is no createTime in lineage record, so we don't know the process qualified name
// And can only verify the process is created for the given query. // And can only verify the process is created for the given query.
assertProcessIsRegistered(processQFNames,"create view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName); String queryString = "create view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName;
AtlasEntity processEntity1 = validateProcess(processQFNames, queryString);
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} catch (Exception e) { } catch (Exception e) {
System.out.print("Appending file error"); System.out.print("Appending file error");
} }
...@@ -183,8 +198,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -183,8 +198,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName, String queryString = "create table " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName;
"create table " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName); AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} }
/** /**
...@@ -227,8 +247,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -227,8 +247,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName, String queryString = "alter view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName;
"alter view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName); AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} }
/** /**
...@@ -272,7 +297,79 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -272,7 +297,79 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase(); String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
assertProcessIsRegistered(processQFName, String queryString = "insert into table " + dbName + "." + targetTableName + " (count, id) select count, id from " + dbName + "." + sourceTableName;
"insert into table " + dbName + "." + targetTableName + " (count, id) select count, id from " + dbName + "." + sourceTableName); AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
}
/**
* This tests
* 1) ImpalaLineageTool can parse one lineage file that contains multiple "insert into" command lineages,
* where the table vertex has a createTime.
* 2) Lineage is sent to Atlas
* 3) These lineages can be retrieved from Atlas
*/
@Test
public void testMultipleInsertIntoAsSelectFromFile() throws Exception {
String IMPALA = dir + "impalaMultipleInsertIntoAsSelect1.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
// create database and tables to simulate Impala behavior that Impala updates metadata
// to HMS and HMSHook sends the metadata to Atlas, which has to happen before
// Atlas can handle lineage notification
String dbName = "db_6";
createDatabase(dbName);
String sourceTableName = "table_1";
createTable(dbName, sourceTableName,"(id string, count int)", false);
String targetTableName = "table_2";
createTable(dbName, targetTableName,"(count int, id string, int_col int)", false);
// process lineage record, and send corresponding notification to Atlas
String[] args = new String[]{"-d", "./", "-p", "impala"};
ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// re-run the same lineage record, should have the same process entity and another process execution entity
Thread.sleep(500);
IMPALA = dir + "impalaMultipleInsertIntoAsSelect2.json";
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// verify the process is saved in Atlas
// the value is from info in IMPALA_4.
String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
String queryString = "insert into table " + dbName + "." + targetTableName + " (count, id) select count, id from " + dbName + "." + sourceTableName;
queryString = queryString.toLowerCase().trim();
String queryString2 = queryString;
AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
List<AtlasObjectId> processExecutions = toAtlasObjectIdList(processEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
Assert.assertEquals(processExecutions.size(), 2);
for (AtlasObjectId processExecutionId : processExecutions) {
AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.
getEntityByGuid(processExecutionId.getGuid());
AtlasEntity processExecutionEntity = atlasEntityWithExtInfo.getEntity();
String entityQueryText = String.valueOf(processExecutionEntity.getAttribute(ATTRIBUTE_QUERY_TEXT)).toLowerCase().trim();
if (!(queryString.equalsIgnoreCase(entityQueryText) || queryString2.equalsIgnoreCase(entityQueryText))) {
String errorMessage = String.format("process query text '%s' does not match expected value of '%s' or '%s'", entityQueryText, queryString, queryString2);
Assert.assertTrue(false, errorMessage);
}
}
} }
} }
\ No newline at end of file
...@@ -28,6 +28,9 @@ import org.apache.atlas.impala.model.ImpalaVertexType; ...@@ -28,6 +28,9 @@ import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageEdge; import org.apache.atlas.impala.model.LineageEdge;
import org.apache.atlas.impala.model.ImpalaQuery; import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex; import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.testng.Assert;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass; import org.testng.annotations.AfterClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
...@@ -135,7 +138,12 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase { ...@@ -135,7 +138,12 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase {
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName, queryObj.getQueryText()); AtlasEntity processEntity1 = validateProcess(processQFName, queryObj.getQueryText());
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryObj.getQueryText());
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("process create_view failed: ", ex); LOG.error("process create_view failed: ", ex);
assertFalse(true); assertFalse(true);
......
{
"queryText":"insert into table db_6.table_2 (count, id) select count, id from db_6.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
"endTime":1554750554,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
},
{
"sources":[
],
"targets":[
4
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_6.table_2.count",
"metadata": {
"tableName": "db_6.table_2",
"tableCreateTime": 1554750072
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_6.table_1.count",
"metadata": {
"tableName": "db_6.table_1",
"tableCreateTime": 1554750070
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_6.table_2.id",
"metadata": {
"tableName": "db_6.table_2",
"tableCreateTime": 1554750072
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_6.table_1.id",
"metadata": {
"tableName": "db_6.table_1",
"tableCreateTime": 1554750070
}
},
{
"id":4,
"vertexType":"COLUMN",
"vertexId":"db_6.table_2.int_col",
"metadata": {
"tableName": "db_6.table_2",
"tableCreateTime": 1554750072
}
}
]
}
{
"queryText":"insert into table db_6.table_2 (count, id) select count, id from db_6.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750082,
"endTime":1554750584,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
},
{
"sources":[
],
"targets":[
4
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_6.table_2.count",
"metadata": {
"tableName": "db_6.table_2",
"tableCreateTime": 1554750072
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_6.table_1.count",
"metadata": {
"tableName": "db_6.table_1",
"tableCreateTime": 1554750070
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_6.table_2.id",
"metadata": {
"tableName": "db_6.table_2",
"tableCreateTime": 1554750072
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_6.table_1.id",
"metadata": {
"tableName": "db_6.table_1",
"tableCreateTime": 1554750070
}
},
{
"id":4,
"vertexType":"COLUMN",
"vertexId":"db_6.table_2.int_col",
"metadata": {
"tableName": "db_6.table_2",
"tableCreateTime": 1554750072
}
}
]
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment