Commit f62ed092 by rmani Committed by Madhan Neethiraj

ATLAS-2649: updated Hive Hook to create lineage between HBase table and Hive table

parent f15995cc
......@@ -295,6 +295,11 @@
<version>${hbase.version}</version>
</artifactItem>
<artifactItem>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
......
......@@ -28,6 +28,7 @@ import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Database;
......@@ -75,6 +76,8 @@ public abstract class BaseHiveEvent {
public static final String HIVE_TYPE_SERDE = "hive_serde";
public static final String HIVE_TYPE_ORDER = "hive_order";
public static final String HDFS_TYPE_PATH = "hdfs_path";
// Atlas entity type names used when creating the HBase table and HBase namespace
// entities that a Hive table is linked to.
public static final String HBASE_TYPE_TABLE = "hbase_table";
public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
......@@ -124,9 +127,15 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_DEPENDENCY_TYPE = "depenendencyType";
public static final String ATTRIBUTE_EXPRESSION = "expression";
public static final String ATTRIBUTE_ALIASES = "aliases";
// Attribute set on the HBase table entity; it is populated with the HBase table name.
public static final String ATTRIBUTE_URI = "uri";
// Key looked up in the Hive table parameters to find the table's storage handler class.
public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler";
// Attribute on the HBase table entity that holds the object id of its namespace entity.
public static final String ATTRIBUTE_NAMESPACE = "namespace";
public static final long MILLIS_CONVERT_FACTOR = 1000;
// Storage handler class name that marks a Hive table as being backed by HBase.
public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler";
// Namespace assumed when the HBase table name carries no "<namespace>:" prefix.
public static final String HBASE_DEFAULT_NAMESPACE = "default";
// Separator between namespace and table name in the "hbase.table.name" value.
public static final String HBASE_NAMESPACE_TABLE_DELIMITER = ":";
// Hive table parameter that holds the name of the underlying HBase table.
public static final String HBASE_PARAM_TABLE_NAME = "hbase.table.name";
public static final long MILLIS_CONVERT_FACTOR = 1000;
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
......@@ -156,6 +165,10 @@ public abstract class BaseHiveEvent {
return table.getTTable() != null ? (table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR) : System.currentTimeMillis();
}
/**
 * Returns the owner of the given Hive table.
 *
 * @param table the Hive table to inspect
 * @return the value of {@code table.getOwner()} when the table has an underlying
 *         thrift table object, otherwise an empty string
 */
public static String getTableOwner(Table table) {
    if (table.getTTable() == null) {
        return "";
    }

    return table.getOwner();
}
public static AtlasObjectId getObjectId(AtlasEntity entity) {
String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
......@@ -694,6 +707,53 @@ public abstract class BaseHiveEvent {
return sb.toString();
}
/**
 * Creates the HBase table entity (and its namespace entity) that backs the given Hive table
 * and registers both with {@code entities}.
 *
 * <p>The namespace entity is added as a referred entity; the table entity is added as a
 * top-level entity and carries the namespace's object id in its namespace attribute.
 *
 * @param table    Hive table whose parameters name the underlying HBase table
 * @param entities collection that receives the created entities
 * @return the HBase table entity, or {@code null} when no HBase table name could be
 *         determined from the Hive table
 */
protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) {
    final HBaseTableInfo tableInfo = new HBaseTableInfo(table);
    final String         nameSpace = tableInfo.getHbaseNameSpace();
    final String         tableName = tableInfo.getHbaseTableName();

    // Nothing to link to when the Hive table does not name an HBase table.
    if (tableName == null) {
        return null;
    }

    final AtlasEntity nameSpaceEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE);

    nameSpaceEntity.setAttribute(ATTRIBUTE_NAME, nameSpace);
    nameSpaceEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
    nameSpaceEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(getClusterName(), nameSpace));

    final AtlasEntity tableEntity = new AtlasEntity(HBASE_TYPE_TABLE);

    tableEntity.setAttribute(ATTRIBUTE_NAME, tableName);
    tableEntity.setAttribute(ATTRIBUTE_URI, tableName);
    // The namespace's qualified name must already be set, since the object id is built from it.
    tableEntity.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nameSpaceEntity));
    tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), nameSpace, tableName));

    entities.addReferredEntity(nameSpaceEntity);
    entities.addEntity(tableEntity);

    return tableEntity;
}
/**
 * Tells whether the given Hive table is stored in HBase.
 *
 * @param table the Hive table to inspect
 * @return {@code true} when the table has parameters and its storage handler parameter
 *         equals the HBase storage handler class name; {@code false} otherwise
 */
protected boolean isHBaseStore(Table table) {
    final Map<String, String> tableParams = table.getParameters();

    // Constant-first equals() also covers a missing (null) storage handler entry.
    return MapUtils.isNotEmpty(tableParams)
            && HBASE_STORAGE_HANDLER_CLASS.equals(tableParams.get(ATTRIBUTE_STORAGE_HANDLER));
}
/**
 * Builds the qualified name of an HBase table in the form
 * {@code <namespace>:<table>@<cluster>}.
 *
 * <p>The namespace and table name are lower-cased with {@link java.util.Locale#ROOT} so the
 * result does not depend on the JVM's default locale (for example, under a Turkish locale
 * {@code "I"} would otherwise become a dotless {@code "ı"}). The cluster name is used as given.
 *
 * @param clusterName name of the cluster, appended after {@code @}
 * @param nameSpace   HBase namespace; must not be {@code null}
 * @param tableName   HBase table name; must not be {@code null}
 * @return the qualified name for the HBase table
 */
private static String getHBaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
    final String normalizedNameSpace = nameSpace.toLowerCase(java.util.Locale.ROOT);
    final String normalizedTableName = tableName.toLowerCase(java.util.Locale.ROOT);

    return String.format("%s:%s@%s", normalizedNameSpace, normalizedTableName, clusterName);
}
/**
 * Builds the qualified name of an HBase namespace in the form {@code <namespace>@<cluster>}.
 *
 * <p>The namespace is lower-cased with {@link java.util.Locale#ROOT} so the result does not
 * depend on the JVM's default locale. The cluster name is used as given.
 *
 * @param clusterName name of the cluster, appended after {@code @}
 * @param nameSpace   HBase namespace; must not be {@code null}
 * @return the qualified name for the HBase namespace
 */
private static String getHBaseNameSpaceQualifiedName(String clusterName, String nameSpace) {
    final String normalizedNameSpace = nameSpace.toLowerCase(java.util.Locale.ROOT);

    return String.format("%s@%s", normalizedNameSpace, clusterName);
}
private boolean ignoreHDFSPathsinProcessQualifiedName() {
switch (context.getHiveOperation()) {
case LOAD:
......@@ -831,4 +891,37 @@ public abstract class BaseHiveEvent {
}
static final Comparator<Entity> entityComparator = new EntityComparator();
/**
 * Holds the HBase namespace and table name derived from a Hive table's parameters.
 *
 * <p>Both values stay {@code null} when the Hive table has no parameters. Otherwise the
 * namespace starts as the default namespace and the table name is read from the
 * {@code hbase.table.name} parameter; a value of the form {@code <namespace>:<table>}
 * is split into its two parts.
 */
static final class HBaseTableInfo {
    String hbaseNameSpace;
    String hbaseTableName;

    /**
     * Reads the HBase namespace and table name from the given Hive table's parameters.
     *
     * @param table Hive table whose parameters are inspected
     */
    HBaseTableInfo(Table table) {
        final Map<String, String> tableParams = table.getParameters();

        if (MapUtils.isEmpty(tableParams)) {
            return;
        }

        hbaseNameSpace = HBASE_DEFAULT_NAMESPACE;
        hbaseTableName = tableParams.get(HBASE_PARAM_TABLE_NAME);

        // Without a delimiter the name is unqualified and the default namespace applies.
        if (hbaseTableName == null || !hbaseTableName.contains(HBASE_NAMESPACE_TABLE_DELIMITER)) {
            return;
        }

        final String[] nameParts = hbaseTableName.split(HBASE_NAMESPACE_TABLE_DELIMITER);

        if (nameParts.length > 1) {
            hbaseNameSpace = nameParts[0];
            hbaseTableName = nameParts[1];
        }
    }

    /** @return the HBase namespace, or {@code null} when the Hive table had no parameters */
    public String getHbaseNameSpace() {
        return hbaseNameSpace;
    }

    /** @return the HBase table name, or {@code null} when none was found */
    public String getHbaseTableName() {
        return hbaseTableName;
    }
}
}
......@@ -81,12 +81,29 @@ public class CreateTable extends BaseHiveEvent {
if (table != null) {
AtlasEntity tblEntity = toTableEntity(table, ret);
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
if (isHBaseStore(table)) {
// This creates lineage to the HBase table in case of Hive on HBase
AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret);
ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity);
if (hbaseTableEntity != null) {
final AtlasEntity processEntity;
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity));
} else {
processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity));
}
ret.addEntity(processEntity);
}
} else {
if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation());
AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity));
ret.addEntity(processEntity);
ret.addReferredEntity(hdfsPathEntity);
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment