Commit 2f461b42 by lina.li Committed by Sarath Subramanian

ATLAS-3226: Add QueryText for hive_table and hive_db for Impala integration

parent e4f0b09e
......@@ -148,6 +148,7 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String ATTRIBUTE_EXEC_TIME = "execTime";
public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries";
public static final String ATTRIBUTE_SERVICE_TYPE = "serviceType";
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
......@@ -688,6 +689,7 @@ public abstract class BaseHiveEvent {
}
if (hiveDDL != null) {
hiveDDL.setAttribute(ATTRIBUTE_SERVICE_TYPE, "hive");
hiveDDL.setAttribute(ATTRIBUTE_EXEC_TIME, getQueryStartTime());
hiveDDL.setAttribute(ATTRIBUTE_QUERY_TEXT, getQueryString());
hiveDDL.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
......
......@@ -47,6 +47,7 @@ import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -88,11 +89,11 @@ public abstract class BaseImpalaEvent {
public static final String ATTRIBUTE_DEPENDENCY_TYPE = "dependencyType";
public static final String ATTRIBUTE_HOSTNAME = "hostName";
public static final String EMPTY_ATTRIBUTE_VALUE = "";
public static final String ATTRIBUTE_EXEC_TIME = "execTime";
public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries";
public static final String ATTRIBUTE_SERVICE_TYPE = "serviceType";
public static final long MILLIS_CONVERT_FACTOR = 1000;
protected final AtlasImpalaHookContext context;
protected final Map<String, ImpalaNode> vertexNameMap;
protected final Map<Long, LineageVertex> verticesMap;
......@@ -555,8 +556,8 @@ public abstract class BaseImpalaEvent {
queryStr = queryStr.toLowerCase().trim();
}
Long startTime = context.getLineageQuery().getTimestamp() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
Long endTime = context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
Long startTime = getQueryStartTime();
Long endTime = getQueryEndTime();
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
QNAME_SEP_PROCESS + startTime.toString() +
......@@ -574,6 +575,14 @@ public abstract class BaseImpalaEvent {
return ret;
}
protected Long getQueryStartTime() {
return context.getLineageQuery().getTimestamp() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
}
protected Long getQueryEndTime() {
return context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
}
protected void addProcessedEntities(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
for (AtlasEntity entity : context.getEntities()) {
entitiesWithExtInfo.addReferredEntity(entity);
......@@ -614,4 +623,40 @@ public abstract class BaseImpalaEvent {
tableVertex.setCreateTime(createTime);
return new ImpalaNode(tableVertex);
}
protected AtlasEntity createHiveDDLEntity(AtlasEntity dbOrTable) {
return createHiveDDLEntity(dbOrTable, true);
}
protected AtlasEntity createHiveDDLEntity(AtlasEntity dbOrTable, boolean excludeEntityGuid) {
AtlasObjectId objId = BaseImpalaEvent.getObjectId(dbOrTable);
AtlasEntity hiveDDL = null;
if (excludeEntityGuid) {
objId.setGuid(null);
}
if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) {
hiveDDL = new AtlasEntity(ImpalaDataType.HIVE_DB_DDL.getName(), ATTRIBUTE_DB, objId);
} else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) {
hiveDDL = new AtlasEntity(ImpalaDataType.HIVE_TABLE_DDL.getName(), ATTRIBUTE_TABLE, objId);
}
if (hiveDDL != null) {
hiveDDL.setAttribute(ATTRIBUTE_SERVICE_TYPE, "impala");
hiveDDL.setAttribute(ATTRIBUTE_EXEC_TIME, getQueryStartTime());
hiveDDL.setAttribute(ATTRIBUTE_QUERY_TEXT, context.getQueryStr());
hiveDDL.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
hiveDDL.setAttribute(ATTRIBUTE_NAME, context.getQueryStr() + QNAME_SEP_PROCESS + getQueryStartTime().toString());
hiveDDL.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveDDL.getAttribute(ATTRIBUTE_NAME));
}
return hiveDDL;
}
protected boolean isDdlOperation() {
return (context.getImpalaOperationType().equals(ImpalaOperationType.CREATEVIEW)
|| context.getImpalaOperationType().equals(ImpalaOperationType.ALTERVIEW_AS)
|| context.getImpalaOperationType().equals(ImpalaOperationType.CREATETABLE_AS_SELECT));
}
}
......@@ -104,6 +104,13 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (entity != null) {
outputs.add(entity);
if (isDdlOperation()) {
AtlasEntity ddlEntity = createHiveDDLEntity(entity);
if (ddlEntity != null) {
ret.addEntity(ddlEntity);
}
}
}
}
}
......
......@@ -24,7 +24,9 @@ public enum ImpalaDataType {
IMPALA_PROCESS,
IMPALA_PROCESS_EXECUTION,
IMPALA_COLUMN_LINEAGE;
IMPALA_COLUMN_LINEAGE,
HIVE_DB_DDL,
HIVE_TABLE_DDL;
public String getName() {
return name().toLowerCase();
......
......@@ -22,6 +22,7 @@ import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUAL
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_TABLE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
......@@ -300,6 +301,32 @@ public class ImpalaLineageITBase {
return assertEntityIsRegistered(HIVE_TYPE_DB, REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName, assertPredicate);
}
protected String assertTableIsRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, null, false);
}
protected String assertTableIsRegistered(String fullTableName) throws Exception {
return assertTableIsRegistered(fullTableName, null, false);
}
protected String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String fullTableName = dbName + AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + tableName;
return assertTableIsRegistered(fullTableName, assertPredicate, isTemporary);
}
protected String assertTableIsRegistered(String fullTableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}", fullTableName);
String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME).toLowerCase() +
CLUSTER_NAME;
return assertEntityIsRegistered(HIVE_TYPE_TABLE, REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
assertPredicate);
}
protected String createDatabase() throws Exception {
String dbName = dbName();
......
......@@ -30,6 +30,10 @@ import org.apache.atlas.model.instance.AtlasObjectId;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_DDL_QUERIES;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class ImpalaLineageToolIT extends ImpalaLineageITBase {
public static final long TABLE_CREATE_TIME_SOURCE = 1554750070;
public static final long TABLE_CREATE_TIME = 1554750072;
......@@ -78,14 +82,20 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
processQFName = processQFName.toLowerCase();
String queryString = "create view db_1.view_1 as select count, id from db_1.table_1";
AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
String queryString = "create view db_1.view_1 as select count, id from db_1.table_1";
AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryString);
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
String guid = assertTableIsRegistered(dbName, targetTableName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 1);
} catch (Exception e) {
System.out.print("Appending file error");
}
......@@ -103,10 +113,10 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// this file contains a single lineage record for "create view".
// there is no table vertex with createTime, which is lineage record generated by Impala
// originally. The table create time is hard-coded before Impala fixes this issue.
String IMPALA = dir + "impalaCreateViewNoCreateTime.json";
String IMPALA = dir + "impalaCreateViewNoCreateTime.json";
String IMPALA_WAL = dir + "WALimpala.wal";
List<ImpalaQuery> lineageList = new ArrayList<>();
List<ImpalaQuery> lineageList = new ArrayList<>();
ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
try {
......@@ -153,6 +163,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
String guid = assertTableIsRegistered(dbName, targetTableName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 1);
} catch (Exception e) {
System.out.print("Appending file error");
}
......@@ -205,6 +222,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
String guid = assertTableIsRegistered(dbName, targetTableName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 1);
}
/**
......@@ -254,6 +278,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
String guid = assertTableIsRegistered(dbName, targetTableName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 1);
}
/**
......@@ -304,6 +335,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
String guid = assertTableIsRegistered(dbName, targetTableName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 0);
}
/**
......@@ -341,6 +379,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
Thread.sleep(500);
IMPALA = dir + "impalaMultipleInsertIntoAsSelect2.json";
toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
Thread.sleep(300);
// verify the process is saved in Atlas
// the value is from info in IMPALA_4.
......@@ -371,5 +410,12 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
Assert.assertTrue(false, errorMessage);
}
}
String guid = assertTableIsRegistered(dbName, targetTableName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(guid).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 0);
}
}
\ No newline at end of file
......@@ -34,9 +34,13 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_DDL_QUERIES;
import static org.testng.Assert.assertFalse;
public class ImpalaLineageHookIT extends ImpalaLineageITBase {
......@@ -138,19 +142,24 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase {
processQFName = processQFName.toLowerCase();
// check process and process execution entities
AtlasEntity processEntity1 = validateProcess(processQFName, queryObj.getQueryText());
AtlasEntity processExecutionEntity1 = validateProcessExecution(processEntity1, queryObj.getQueryText());
AtlasObjectId process1 = toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
BaseImpalaEvent.ATTRIBUTE_PROCESS));
Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
// check DDL entity
String viewId = assertTableIsRegistered(viewName);
AtlasEntity entity = atlasClientV2.getEntityByGuid(viewId).getEntity();
List ddlQueries = (List) entity.getRelationshipAttribute(ATTRIBUTE_DDL_QUERIES);
assertNotNull(ddlQueries);
assertEquals(ddlQueries.size(), 1);
} catch (Exception ex) {
LOG.error("process create_view failed: ", ex);
assertFalse(true);
}
}
}
......@@ -61,6 +61,48 @@
]
},
{
"name": "ddl",
"superTypes": [
"Referenceable"
],
"serviceType": "atlas_core",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "queryText",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "execTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "serviceType",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
},
{
"name": "DataSet",
"superTypes": [
"Asset"
......
......@@ -533,43 +533,9 @@
]
},
{
"name": "hive_ddl",
"superTypes": [
"Referenceable"
],
"serviceType": "hive",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "queryText",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
},
{
"name": "execTime",
"typeName": "date",
"cardinality": "SINGLE",
"isIndexable": false,
"isOptional": false,
"isUnique": false
},
{
"name": "userName",
"typeName": "string",
"cardinality": "SINGLE",
"isIndexable": true,
"isOptional": false,
"isUnique": false
}
]
},
{
"name": "hive_db_ddl",
"superTypes": [
"hive_ddl"
"ddl"
],
"serviceType": "hive",
"typeVersion": "1.0",
......@@ -578,7 +544,7 @@
{
"name": "hive_table_ddl",
"superTypes": [
"hive_ddl"
"ddl"
],
"serviceType": "hive",
"typeVersion": "1.0",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment