Commit 1c399cc7 by lina.li Committed by Sarath Subramanian

ATLAS-3290: Impala Hook should get database name and table name from vertex metadata

parent 63779893
......@@ -23,7 +23,10 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.impala.model.LineageVertexMetadata;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
/**
......@@ -101,6 +104,43 @@ public class AtlasImpalaHookContext {
getClusterName();
}
/**
 * Builds the qualified name of a column from its lineage vertex.
 *
 * <p>When the vertex carries metadata, the database and table names are taken from the
 * metadata's table name (split at its last {@code QNAME_SEP_ENTITY_NAME}), and the column
 * name is the part of the vertexId after its last separator, or the whole vertexId when it
 * has no valid separator. When the vertex has no metadata, the vertexId is passed unchanged
 * to {@link #getQualifiedNameForColumn(String)}.
 *
 * @param vertex the column vertex from the lineage record
 * @return the qualified name produced by the three-argument overload (or by the String
 *         overload when the vertex has no metadata)
 * @throws IllegalArgumentException if the vertex is null, if the metadata table name is
 *         null/empty or has no valid separator, or if the vertexId is null/empty
 */
public String getQualifiedNameForColumn(LineageVertex vertex) {
    // fail with the same exception type the String overload uses for null input
    if (vertex == null) {
        throw new IllegalArgumentException("vertex is null");
    }

    // get database name and table name
    LineageVertexMetadata metadata = vertex.getMetadata();
    if (metadata == null) {
        // no metadata available: let the String overload work from the vertexId alone
        return getQualifiedNameForColumn(vertex.getVertexId());
    }

    String fullTableName = metadata.getTableName();
    if (StringUtils.isEmpty(fullTableName)) {
        throw new IllegalArgumentException("fullTableName in column metadata is null");
    }

    int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
    if (!isSeparatorIndexValid(sepPos)) {
        throw new IllegalArgumentException(fullTableName + " in column metadata does not contain database name");
    }

    // get pure column name
    String columnName = vertex.getVertexId();
    if (StringUtils.isEmpty(columnName)) {
        throw new IllegalArgumentException("column name in vertexId is null");
    }

    // the vertexId may or may not be prefixed with database/table; keep only the last segment
    int sepPosLast = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
    if (isSeparatorIndexValid(sepPosLast)) {
        columnName = columnName.substring(sepPosLast + 1);
    }

    return getQualifiedNameForColumn(
        fullTableName.substring(0, sepPos),
        fullTableName.substring(sepPos + 1),
        columnName);
}
public String getQualifiedNameForColumn(String fullColumnName) throws IllegalArgumentException {
if (fullColumnName == null) {
throw new IllegalArgumentException("fullColumnName is null");
......
......@@ -124,7 +124,7 @@ public class ImpalaLineageHook extends AtlasHook {
} catch (Throwable t) {
LOG.error("ImpalaLineageHook.process(): failed to process query {}",
lineageQuery.getQueryText(), t);
AtlasType.toJson(lineageQuery), t);
}
if (LOG.isDebugEnabled()) {
......
......@@ -161,7 +161,7 @@ public abstract class BaseImpalaEvent {
return context.getQualifiedNameForTable(node.getVertexId());
case COLUMN:
return context.getQualifiedNameForColumn(node.getVertexId());
return context.getQualifiedNameForColumn(node);
default:
LOG.warn("null qualified name for type: {} and name: {}", nodeType, node.getVertexId());
......
......@@ -418,4 +418,60 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 0);
}
/**
 * This test verifies that
 * 1) ImpalaLineageTool can parse a lineage file containing the lineage of a
 *    "create table as select" command, in which the vertices carry a table create time and
 *    the target vertices' vertexId does not contain the database name or the table name
 * 2) the lineage is sent to Atlas
 * 3) the lineage can be retrieved from Atlas
 */
@Test
public void testCreateTableAsSelectVertexIdNoTableNameFromFile() throws Exception {
    String lineageFile = dir + "impalaCreateTableAsSelectVertexIdNoTableName.json";
    String walFile = dir + "WALimpala.wal";
    ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();

    // create database and tables to simulate Impala behavior that Impala updates metadata
    // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
    // Atlas can handle lineage notification
    String dbName = "sales_db";
    createDatabase(dbName);
    String sourceTableName = "sales_asia";
    createTable(dbName, sourceTableName, "(id string, name string)", false);
    String targetTableName = "sales_china";
    createTable(dbName, targetTableName, "(id string, name string)", false);

    // process lineage record, and send corresponding notification to Atlas
    String[] args = new String[]{"-d", "./", "-p", "impala"};
    ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
    toolInstance.importHImpalaEntities(impalaLineageHook, lineageFile, walFile);

    // verify the process is saved in Atlas
    // the value is the target table's tableCreateTime (in seconds) from the lineage file,
    // multiplied by 1000
    String createTime = Long.toString(1560885039L * 1000);
    String processQFName =
        dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
    processQFName = processQFName.toLowerCase();

    String queryString = "create table " + targetTableName + " as select * from " + sourceTableName;
    AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
    AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
    AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
        BaseImpalaEvent.ATTRIBUTE_PROCESS));
    Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
    Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);

    // the target table must be registered and carry exactly one DDL query
    String guid = assertTableIsRegistered(dbName, targetTableName);
    AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
    List<?> ddlQueries = (List<?>) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
    assertNotNull(ddlQueries);
    assertEquals(ddlQueries.size(), 1);
}
}
\ No newline at end of file
{
"queryText":"create table sales_china as select * from sales_asia",
"queryId":"2940d0b242de53ea:e82ba8d300000000",
"hash":"a705a9ec851a5440afca0dfb8df86cd5",
"user":"root",
"timestamp":1560885032,
"endTime":1560885040,
"edges":[
{
"sources":[
1
],
"targets":[
0
],
"edgeType":"PROJECTION"
},
{
"sources":[
3
],
"targets":[
2
],
"edgeType":"PROJECTION"
}
],
"vertices":[
{
"id":0,
"vertexType":"COLUMN",
"vertexId":"id",
"metadata":{
"tableName":"sales_db.sales_china",
"tableCreateTime":1560885039
}
},
{
"id":1,
"vertexType":"COLUMN",
"vertexId":"sales_db.sales_asia.id",
"metadata":{
"tableName":"sales_db.sales_asia",
"tableCreateTime":1560884919
}
},
{
"id":2,
"vertexType":"COLUMN",
"vertexId":"name",
"metadata":{
"tableName":"sales_db.sales_china",
"tableCreateTime":1560885039
}
},
{
"id":3,
"vertexType":"COLUMN",
"vertexId":"sales_db.sales_asia.name",
"metadata":{
"tableName":"sales_db.sales_asia",
"tableCreateTime":1560884919
}
}
]
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment