Commit 1ca6ee12 by lina.li, committed by Sarath Subramanian

ATLAS-3184: Add support of lineage integration for more Impala commands

parent 4e4038f6
......@@ -46,7 +46,6 @@ public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
public static final String CONF_REALM_NAME = "atlas.realm.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
private ImpalaOperationParser parser = new ImpalaOperationParser();
private static final String clusterName;
private static final String realm;
private static final boolean convertHdfsPathToLowerCase;
......@@ -77,13 +76,16 @@ public class ImpalaLineageHook extends AtlasHook implements IImpalaLineageHook {
}
try {
ImpalaOperationType operationType = parser.getImpalaOperationType(lineageQuery.getQueryText());
ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(lineageQuery.getQueryText());
AtlasImpalaHookContext context =
new AtlasImpalaHookContext(this, operationType, lineageQuery);
BaseImpalaEvent event = null;
switch (operationType) {
case CREATEVIEW:
case CREATETABLE_AS_SELECT:
case ALTERVIEW_AS:
case QUERY:
event = new CreateImpalaProcess(context);
break;
default:
......
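For context, a minimal sketch of how a query string now resolves through the switch above (assuming only the classes shown in this diff are on the classpath; the class name DispatchSketch is illustrative): the four listed operation types are routed to CreateImpalaProcess, while everything else goes to the default branch elided above.

import org.apache.atlas.impala.hook.ImpalaOperationParser;
import org.apache.atlas.impala.model.ImpalaOperationType;

public class DispatchSketch {
    // Mirrors the switch in ImpalaLineageHook: these four operation types are
    // handled by CreateImpalaProcess; all other types take the default branch.
    static boolean handledByCreateImpalaProcess(String queryText) {
        ImpalaOperationType op = ImpalaOperationParser.getImpalaOperationType(queryText);
        switch (op) {
            case CREATEVIEW:
            case CREATETABLE_AS_SELECT:
            case ALTERVIEW_AS:
            case QUERY:
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(handledByCreateImpalaProcess(
                "create view db_1.v1 as select id from db_1.t1"));  // true  (CREATEVIEW)
        System.out.println(handledByCreateImpalaProcess(
                "compute stats db_1.t1"));                          // false (UNKNOWN)
    }
}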
......@@ -19,6 +19,7 @@
package org.apache.atlas.impala.hook;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.commons.lang.StringUtils;
/**
* Parses Impala query text and outputs the Impala operation type
......@@ -28,12 +29,40 @@ public class ImpalaOperationParser {
public ImpalaOperationParser() {
}
public ImpalaOperationType getImpalaOperationType(String queryText) {
// TODO: more Impala commands will be handled in ATLAS-3184
if (queryText.toLowerCase().startsWith("create view")) {
public static ImpalaOperationType getImpalaOperationType(String queryText) {
// Impala does not generate a lineage record for the "LOAD DATA INPATH" command
if (StringUtils.startsWithIgnoreCase(queryText, "create view")) {
return ImpalaOperationType.CREATEVIEW;
} else if (StringUtils.startsWithIgnoreCase(queryText, "create table") &&
StringUtils.containsIgnoreCase(queryText, "as select")) {
return ImpalaOperationType.CREATETABLE_AS_SELECT;
} else if (StringUtils.startsWithIgnoreCase(queryText, "alter view") &&
StringUtils.containsIgnoreCase(queryText, "as select")) {
return ImpalaOperationType.ALTERVIEW_AS;
} else if (StringUtils.containsIgnoreCase(queryText, "insert into") &&
StringUtils.containsIgnoreCase(queryText, "select") &&
StringUtils.containsIgnoreCase(queryText, "from")) {
return ImpalaOperationType.QUERY;
} else if (StringUtils.containsIgnoreCase(queryText,"insert overwrite") &&
StringUtils.containsIgnoreCase(queryText, "select") &&
StringUtils.containsIgnoreCase(queryText, "from")) {
return ImpalaOperationType.QUERY;
}
return ImpalaOperationType.UNKNOWN;
}
public static ImpalaOperationType getImpalaOperationSubType(ImpalaOperationType operationType, String queryText) {
if (operationType == ImpalaOperationType.QUERY) {
if (StringUtils.containsIgnoreCase(queryText, "insert into")) {
return ImpalaOperationType.INSERT;
} else if (StringUtils.containsIgnoreCase(queryText, "insert overwrite")) {
return ImpalaOperationType.INSERT_OVERWRITE;
}
}
return ImpalaOperationType.UNKNOWN;
}
}
\ No newline at end of file
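A quick illustration of the two parser entry points added above (a sketch, assuming the classes in this diff are on the classpath; ParserSketch is an illustrative name): getImpalaOperationType() classifies the whole statement, and getImpalaOperationSubType() refines a QUERY into the write mode that is later appended to the output side of the process qualified name.

import org.apache.atlas.impala.hook.ImpalaOperationParser;
import org.apache.atlas.impala.model.ImpalaOperationType;

public class ParserSketch {
    public static void main(String[] args) {
        String ctas   = "create table db_1.t2 as select count, id from db_1.t1";
        String insert = "insert overwrite table db_1.t2 select count, id from db_1.t1";

        // Main operation type: keyword matching on the statement text.
        System.out.println(ImpalaOperationParser.getImpalaOperationType(ctas));   // CREATETABLE_AS_SELECT
        System.out.println(ImpalaOperationParser.getImpalaOperationType(insert)); // QUERY

        // Sub-operation type: only meaningful for QUERY statements.
        System.out.println(ImpalaOperationParser.getImpalaOperationSubType(
                ImpalaOperationType.QUERY, insert));                              // INSERT_OVERWRITE
    }
}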
......@@ -21,12 +21,16 @@ package org.apache.atlas.impala.hook.events;
import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaOperationParser;
import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.impala.model.ImpalaNode;
import org.apache.atlas.impala.model.ImpalaOperationType;
......@@ -172,8 +176,9 @@ public abstract class BaseImpalaEvent {
protected String getQualifiedName(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
ImpalaOperationType operation = context.getImpalaOperationType();
// TODO: add more operation type here
if (operation == ImpalaOperationType.CREATEVIEW) {
if (operation == ImpalaOperationType.CREATEVIEW ||
operation == ImpalaOperationType.CREATETABLE_AS_SELECT ||
operation == ImpalaOperationType.ALTERVIEW_AS) {
List<? extends AtlasEntity> sortedEntities = new ArrayList<>(outputs);
Collections.sort(sortedEntities, entityComparator);
......@@ -187,8 +192,83 @@ public abstract class BaseImpalaEvent {
}
}
// TODO: add code for name construction for HDFS path
return null;
if (operation != ImpalaOperationType.QUERY) {
String errorMessage = String.format("Expected operation to be QUERY, but got unexpected operation type %s", operation.name());
LOG.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
// construct qualified name for QUERY
String qualifiedName = null;
String operationName = operation.toString();
if (operationName != null) {
StringBuilder sb = new StringBuilder(operationName);
addToProcessQualifiedName(sb, inputs, false);
sb.append("->");
addToProcessQualifiedName(sb, outputs, true);
qualifiedName = sb.toString();
}
return qualifiedName;
}
protected void addToProcessQualifiedName(StringBuilder processQualifiedName, Collection<? extends AtlasEntity> entities, boolean isOutput) {
if (entities == null) {
return;
}
ImpalaOperationType operation = context.getImpalaOperationType();
String queryText = context.getQueryStr();
List<? extends AtlasEntity> sortedEntities = new ArrayList<>(entities);
Collections.sort(sortedEntities, entityComparator);
Set<String> dataSetsProcessed = new HashSet<>();
for (AtlasEntity entity : sortedEntities) {
String qualifiedName = null;
long createTime = 0;
qualifiedName = (String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
Long createTimeObj = (Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME);
if (createTimeObj != null) {
createTime = createTimeObj;
}
}
if (qualifiedName == null || !dataSetsProcessed.add(qualifiedName)) {
continue;
}
if (isOutput) {
boolean addWriteType = false;
ImpalaOperationType subType = ImpalaOperationParser.getImpalaOperationSubType(operation, queryText);
switch (subType) {
// Impala does not generate lineage for UPDATE and DELETE
case INSERT:
case INSERT_OVERWRITE:
addWriteType = true;
break;
}
if (addWriteType) {
processQualifiedName.append(QNAME_SEP_PROCESS).append(subType.name());
}
}
processQualifiedName.append(QNAME_SEP_PROCESS).append(qualifiedName.toLowerCase().replaceAll("/", ""));
if (createTime != 0) {
processQualifiedName.append(QNAME_SEP_PROCESS).append(createTime);
}
}
}
protected AtlasEntity getInputOutputEntity(ImpalaNode node, AtlasEntityExtInfo entityExtInfo) throws Exception {
......@@ -419,8 +499,8 @@ public abstract class BaseImpalaEvent {
}
protected AtlasEntity getImpalaProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
AtlasEntity ret = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
String queryStr = context.getQueryStr();
AtlasEntity ret = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS.getName());
String queryStr = context.getQueryStr();
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
......
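To make the qualified-name construction above concrete, a hedged sketch of the string getQualifiedName() and addToProcessQualifiedName() produce for an INSERT query; the ':' and '@' separator characters and the cluster name "cl1" are assumptions based on the test expectations later in this diff, not values taken from this file.

public class QualifiedNameSketch {
    public static void main(String[] args) {
        // Table qualified names have the shape db.table@cluster:createTimeMillis.
        String sourceQFName = "db_5.table_1@cl1:1554750070000";
        String targetQFName = "db_5.table_2@cl1:1554750072000";
        // Inputs and outputs are sorted, lower-cased, and de-duplicated; the
        // INSERT sub-type is appended only on the output side.
        System.out.println("QUERY:" + sourceQFName + "->:INSERT:" + targetQFName);
        // => QUERY:db_5.table_1@cl1:1554750070000->:INSERT:db_5.table_2@cl1:1554750072000
    }
}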
......@@ -18,7 +18,17 @@
package org.apache.atlas.impala.model;
public enum ImpalaOperationType{
// main operation type
CREATEVIEW ("CREATEVIEW"),
CREATETABLE_AS_SELECT ("CREATETABLE_AS_SELECT"),
ALTERVIEW_AS ("ALTERVIEW_AS"),
QUERY ("QUERY"),
// sub operation type, which is associated with output
INSERT ("INSERT"),
INSERT_OVERWRITE ("INSERT_OVERWRITE"),
// default type
UNKNOWN ("UNKNOWN");
private final String name;
......
......@@ -19,13 +19,14 @@ package org.apache.atlas.impala;
import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.testng.annotations.Test;
public class ImpalaLineageToolIT extends ImpalaLineageITBase {
public static final long TABLE_CREATE_TIME_SOURCE = 1554750070;
public static final long TABLE_CREATE_TIME = 1554750072;
private static String dir = System.getProperty("user.dir") + "/src/test/resources/";
private static String IMPALA = dir + "impala3.json";
private static String IMPALA_WAL = dir + "WALimpala.wal";
......@@ -75,4 +76,137 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
System.out.print("Appending file error");
}
}
}
/**
* This tests that:
* 1) ImpalaLineageTool can parse a lineage file containing "create table as select" command lineage,
*    where the table vertex carries a createTime.
* 2) The lineage is sent to Atlas.
* 3) The lineage can then be retrieved from Atlas.
*/
@Test
public void testCreateTableAsSelectFromFile() throws Exception {
String IMPALA = dir + "impala5.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
// create the database and tables to simulate Impala's behavior: Impala updates
// metadata in HMS, and the HMS hook sends that metadata to Atlas, which must
// happen before Atlas can handle the lineage notification
String dbName = "db_3";
createDatabase(dbName);
String sourceTableName = "table_1";
createTable(dbName, sourceTableName,"(id string, count int)", false);
String targetTableName = "table_2";
createTable(dbName, targetTableName,"(count int, id string)", false);
// process lineage record, and send corresponding notification to Atlas
String[] args = new String[]{"-d", "./", "-p", "impala"};
ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// verify the process is saved in Atlas
// the expected value comes from the table vertex createTime in impala5.json
String createTime = String.valueOf(TABLE_CREATE_TIME * 1000);
String processQFName =
dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName,
"create table " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName);
}
/**
* This tests that:
* 1) ImpalaLineageTool can parse a lineage file containing "alter view as select" command lineage,
*    where the table vertex carries a createTime.
* 2) The lineage is sent to Atlas.
* 3) The lineage can then be retrieved from Atlas.
*/
@Test
public void testAlterViewAsSelectFromFile() throws Exception {
String IMPALA = dir + "impala6.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
// create the database and tables to simulate Impala's behavior: Impala updates
// metadata in HMS, and the HMS hook sends that metadata to Atlas, which must
// happen before Atlas can handle the lineage notification
String dbName = "db_4";
createDatabase(dbName);
String sourceTableName = "table_1";
createTable(dbName, sourceTableName,"(id string, count int)", false);
String targetTableName = "view_1";
createTable(dbName, targetTableName,"(count int, id string)", false);
// process lineage record, and send corresponding notification to Atlas
String[] args = new String[]{"-d", "./", "-p", "impala"};
ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// verify the process is saved in Atlas
// the expected value comes from the table vertex createTime in impala6.json
String createTime = String.valueOf(TABLE_CREATE_TIME * 1000);
String processQFName =
dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase();
assertProcessIsRegistered(processQFName,
"alter view " + dbName + "." + targetTableName + " as select count, id from " + dbName + "." + sourceTableName);
}
/**
* This tests that:
* 1) ImpalaLineageTool can parse a lineage file containing "insert into" command lineage,
*    where the table vertex carries a createTime.
* 2) The lineage is sent to Atlas.
* 3) The lineage can then be retrieved from Atlas.
*/
@Test
public void testInsertIntoAsSelectFromFile() throws Exception {
String IMPALA = dir + "impala7.json";
String IMPALA_WAL = dir + "WALimpala.wal";
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
// create the database and tables to simulate Impala's behavior: Impala updates
// metadata in HMS, and the HMS hook sends that metadata to Atlas, which must
// happen before Atlas can handle the lineage notification
String dbName = "db_5";
createDatabase(dbName);
String sourceTableName = "table_1";
createTable(dbName, sourceTableName,"(id string, count int)", false);
String targetTableName = "table_2";
createTable(dbName, targetTableName,"(count int, id string, int_col int)", false);
// process lineage record, and send corresponding notification to Atlas
String[] args = new String[]{"-d", "./", "-p", "impala"};
ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
// verify the process is saved in Atlas
// the expected values come from the table vertex createTime values in impala7.json
String createTime1 = String.valueOf(TABLE_CREATE_TIME_SOURCE * 1000);
String createTime2 = String.valueOf(TABLE_CREATE_TIME * 1000);
String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
assertProcessIsRegistered(processQFName,
"insert into table " + dbName + "." + targetTableName + " (count, id) select count, id from " + dbName + "." + sourceTableName);
}
}
\ No newline at end of file
src/test/resources/impala5.json (new file)
{
"queryText":"create table db_3.table_2 as select count, id from db_3.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
"endTime":1554750554,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_3.table_2.count"
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_3.table_1.count"
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_3.table_2.id"
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_3.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_3.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_3.table_2",
"createTime":1554750072
}
]
}
\ No newline at end of file
src/test/resources/impala6.json (new file)
{
"queryText":"alter view db_4.view_1 as select count, id from db_4.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
"endTime":1554750554,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_4.view_1.count"
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_4.table_1.count"
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_4.view_1.id"
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_4.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_4.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_4.view_1",
"createTime":1554750072
}
]
}
\ No newline at end of file
src/test/resources/impala7.json (new file)
{
"queryText":"insert into table db_5.table_2 (count, id) select count, id from db_5.table_1",
"queryId":"3a441d0c130962f8:7f634aec00000000",
"hash":"64ff0425ccdfaada53e3f2fd76f566f7",
"user":"admin",
"timestamp":1554750072,
"endTime":1554750554,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
},
{
"sources":[
],
"targets":[
6
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"db_5.table_2.count"
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"db_5.table_1.count"
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"db_5.table_2.id"
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"db_5.table_1.id"
},
{
"id":4,
"vertexType":"TABLE",
"vertexId":"db_5.table_1",
"createTime":1554750070
},
{
"id":5,
"vertexType":"TABLE",
"vertexId":"db_5.table_2",
"createTime":1554750072
},
{
"id":6,
"vertexType":"COLUMN",
"vertexId":"db_5.table_2.int_col"
}
]
}
\ No newline at end of file
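One detail worth noting in these fixtures: the JSON table vertices carry createTime in seconds (1554750070 and 1554750072, matching TABLE_CREATE_TIME_SOURCE and TABLE_CREATE_TIME in the tests), while the process qualified name embeds milliseconds; hence the multiplication by 1000 in the tests. A tiny sketch of the conversion:

public class CreateTimeSketch {
    public static void main(String[] args) {
        long tableCreateTimeSecs = 1554750072L;  // "createTime" from a JSON table vertex above
        // The tests multiply by 1000 because the qualified name embeds milliseconds.
        System.out.println(tableCreateTimeSecs * 1000L);  // 1554750072000
    }
}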