Commit 46365f8c by Suma Shivaprasad

ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)

parent 009330de
......@@ -17,6 +17,9 @@
*/
package org.apache.atlas.fs.model;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.addons.ModelDefinitionDump;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
......
......@@ -31,13 +31,10 @@ import scala.tools.scalap.scalax.rules.scalasig.ClassFileParser.EnumConstValue
*/
object FSDataModel extends App {
var typesDef : TypesDef = null
val typesBuilder = new TypesBuilder
import typesBuilder._
typesDef = types {
val typesDef : TypesDef = types {
// FS DataSet
_class(FSDataTypes.FS_PATH.toString, List("DataSet", AtlasClient.REFERENCEABLE_SUPER_TYPE)) {
......
......@@ -18,18 +18,22 @@
package org.apache.atlas.hive.bridge;
import com.google.common.base.Joiner;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.fs.model.FSDataModel;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
......@@ -67,6 +71,9 @@ public class HiveMetaStoreBridge {
public static final String ATLAS_ENDPOINT = "atlas.rest.address";
private final String doAsUser;
private final UserGroupInformation ugi;
private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
public final Hive hiveClient;
......@@ -82,6 +89,11 @@ public class HiveMetaStoreBridge {
this(hiveConf, atlasConf, null, null);
}
@VisibleForTesting
HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
this(clusterName, hiveClient, atlasClient, null, null);
}
public String getClusterName() {
return clusterName;
}
......@@ -96,21 +108,16 @@ public class HiveMetaStoreBridge {
UserGroupInformation ugi) throws Exception {
this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
Hive.get(hiveConf),
atlasConf, doAsUser, ugi);
}
HiveMetaStoreBridge(String clusterName, Hive hiveClient,
Configuration atlasConf, String doAsUser, UserGroupInformation ugi) {
this.clusterName = clusterName;
this.hiveClient = hiveClient;
String baseUrls = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
this.atlasClient = new AtlasClient(ugi, doAsUser, baseUrls.split(","));
new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser), doAsUser, ugi);
}
HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
@VisibleForTesting
HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, String user, UserGroupInformation ugi) {
this.clusterName = clusterName;
this.hiveClient = hiveClient;
this.atlasClient = atlasClient;
this.doAsUser = user;
this.ugi = ugi;
}
private AtlasClient getAtlasClient() {
......@@ -306,7 +313,7 @@ public class HiveMetaStoreBridge {
}
private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
Table hiveTable) throws Exception {
final Table hiveTable) throws Exception {
LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
if (tableReference == null) {
......@@ -348,6 +355,7 @@ public class HiveMetaStoreBridge {
tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
tableReference.set("temporary", hiveTable.isTemporary());
return tableReference;
}
......@@ -453,6 +461,17 @@ public class HiveMetaStoreBridge {
return sdReferenceable;
}
public Referenceable fillHDFSDataSet(String pathUri) {
Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString());
ref.set("path", pathUri);
// Path path = new Path(pathUri);
// ref.set("name", path.getName());
// TODO - Fix after ATLAS-542 to use a shorter name
ref.set("name", pathUri);
ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
return ref;
}
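A minimal usage sketch of the new helper (not part of this commit; the class and method names below are illustrative, and until ATLAS-542 the name attribute carries the full URI):
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.typesystem.Referenceable;

class HdfsDataSetSketch {
    // Builds the hdfs_path entity for a load/export location; path, name and
    // qualifiedName all carry the full URI until ATLAS-542 shortens the name.
    static Referenceable hdfsDataSetFor(HiveMetaStoreBridge bridge, String pathUri) {
        Referenceable hdfsPath = bridge.fillHDFSDataSet(pathUri);
        assert pathUri.equals(hdfsPath.get("path"));
        assert pathUri.equals(hdfsPath.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
        return hdfsPath;
    }
}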
public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
final String[] parts = tableQualifiedName.split("@");
final String tableName = parts[0];
......@@ -488,6 +507,21 @@ public class HiveMetaStoreBridge {
AtlasClient dgiClient = getAtlasClient();
try {
dgiClient.getType(FSDataTypes.HDFS_PATH().toString());
LOG.info("HDFS data model is already registered!");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Trigger the val definitions in the Scala FSDataModel App object
FSDataModel.main(null);
final String hdfsModelJson = TypesSerialization.toJson(FSDataModel.typesDef());
//Expected in case types do not exist
LOG.info("Registering HDFS data model : " + hdfsModelJson);
dgiClient.createType(hdfsModelJson);
}
}
try {
dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered!");
} catch(AtlasServiceException ase) {
......
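The HDFS and Hive model registration above follows a check-then-create pattern; a standalone sketch of that pattern, assuming only the AtlasClient calls already used in this hunk (the helper name is illustrative, not part of the commit):
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;

class TypeRegistrationSketch {
    static void registerIfAbsent(AtlasClient atlasClient, String typeName, String typesJson) throws Exception {
        try {
            atlasClient.getType(typeName);          // already registered - nothing to do
        } catch (AtlasServiceException ase) {
            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
                atlasClient.createType(typesJson);  // NOT_FOUND is expected on first run
            } else {
                throw ase;
            }
        }
    }
}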
......@@ -27,7 +27,10 @@ import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
......@@ -38,18 +41,20 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.security.UserGroupInformation;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -86,18 +91,108 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
class HiveEvent {
public Set<ReadEntity> inputs;
public Set<WriteEntity> outputs;
static class HiveEventContext {
private Set<ReadEntity> inputs;
private Set<WriteEntity> outputs;
private String user;
private UserGroupInformation ugi;
private HiveOperation operation;
private HookContext.HookType hookType;
private org.json.JSONObject jsonPlan;
private String queryId;
private String queryStr;
private Long queryStartTime;
private String queryType;
public void setInputs(Set<ReadEntity> inputs) {
this.inputs = inputs;
}
public void setOutputs(Set<WriteEntity> outputs) {
this.outputs = outputs;
}
public void setUser(String user) {
this.user = user;
}
public void setUgi(UserGroupInformation ugi) {
this.ugi = ugi;
}
public void setOperation(HiveOperation operation) {
this.operation = operation;
}
public void setHookType(HookContext.HookType hookType) {
this.hookType = hookType;
}
public void setJsonPlan(JSONObject jsonPlan) {
this.jsonPlan = jsonPlan;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public void setQueryStr(String queryStr) {
this.queryStr = queryStr;
}
public void setQueryStartTime(Long queryStartTime) {
this.queryStartTime = queryStartTime;
}
public void setQueryType(String queryType) {
this.queryType = queryType;
}
public Set<ReadEntity> getInputs() {
return inputs;
}
public Set<WriteEntity> getOutputs() {
return outputs;
}
public String getUser() {
return user;
}
public UserGroupInformation getUgi() {
return ugi;
}
public HiveOperation getOperation() {
return operation;
}
public HookContext.HookType getHookType() {
return hookType;
}
public org.json.JSONObject getJsonPlan() {
return jsonPlan;
}
public String getQueryId() {
return queryId;
}
public String getQueryStr() {
return queryStr;
}
public Long getQueryStartTime() {
return queryStartTime;
}
public String user;
public UserGroupInformation ugi;
public HiveOperation operation;
public HookContext.HookType hookType;
public JSONObject jsonPlan;
public String queryId;
public String queryStr;
public Long queryStartTime;
public String getQueryType() {
return queryType;
}
}
private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
......@@ -156,21 +251,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
@Override
public void run(final HookContext hookContext) throws Exception {
// clone to avoid concurrent access
final HiveEvent event = new HiveEvent();
final HiveConf conf = new HiveConf(hookContext.getConf());
event.inputs = hookContext.getInputs();
event.outputs = hookContext.getOutputs();
event.user = getUser(hookContext.getUserName(), hookContext.getUgi());
event.ugi = hookContext.getUgi();
event.operation = OPERATION_MAP.get(hookContext.getOperationName());
event.hookType = hookContext.getHookType();
event.queryId = hookContext.getQueryPlan().getQueryId();
event.queryStr = hookContext.getQueryPlan().getQueryStr();
event.queryStartTime = hookContext.getQueryPlan().getQueryStartTime();
final HiveConf conf = new HiveConf(hookContext.getConf());
event.jsonPlan = getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan());
final HiveEventContext event = new HiveEventContext();
event.setInputs(hookContext.getInputs());
event.setOutputs(hookContext.getOutputs());
event.setJsonPlan(getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan()));
event.setHookType(hookContext.getHookType());
event.setUgi(hookContext.getUgi());
event.setUser(hookContext.getUserName());
event.setOperation(OPERATION_MAP.get(hookContext.getOperationName()));
event.setQueryId(hookContext.getQueryPlan().getQueryId());
event.setQueryStr(hookContext.getQueryPlan().getQueryStr());
event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime());
event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType());
boolean sync = conf.get(CONF_SYNC, "false").equals("true");
if (sync) {
......@@ -189,20 +284,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
private void fireAndForget(HiveEvent event) throws Exception {
assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
private void fireAndForget(HiveEventContext event) throws Exception {
assert event.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
LOG.info("Entered Atlas hook for hook type {} operation {}", event.hookType, event.operation);
LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation());
HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.user, event.ugi);
HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.getUser(), event.getUgi());
switch (event.operation) {
switch (event.getOperation()) {
case CREATEDATABASE:
handleEventOutputs(dgiBridge, event, Type.DATABASE);
break;
case CREATETABLE:
handleEventOutputs(dgiBridge, event, Type.TABLE);
List<Pair<? extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
break;
case CREATETABLE_AS_SELECT:
......@@ -221,25 +317,26 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
break;
case ALTERTABLE_FILEFORMAT:
case ALTERTABLE_LOCATION:
case ALTERTABLE_CLUSTER_SORT:
case ALTERTABLE_BUCKETNUM:
case ALTERTABLE_PROPERTIES:
case ALTERVIEW_PROPERTIES:
case ALTERTABLE_SERDEPROPERTIES:
case ALTERTABLE_SERIALIZER:
alterTable(dgiBridge, event);
break;
case ALTERTABLE_ADDCOLS:
case ALTERTABLE_REPLACECOLS:
case ALTERTABLE_RENAMECOL:
alterTable(dgiBridge, event);
handleEventOutputs(dgiBridge, event, Type.TABLE);
break;
case ALTERTABLE_LOCATION:
List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
if (tablesUpdated != null && tablesUpdated.size() > 0) {
//Track altered lineage in case of external tables
handleExternalTables(dgiBridge, event, tablesUpdated.get(0).getLeft(), tablesUpdated.get(0).getRight());
}
case ALTERDATABASE:
case ALTERDATABASE_OWNER:
alterDatabase(dgiBridge, event);
handleEventOutputs(dgiBridge, event, Type.DATABASE);
break;
default:
......@@ -248,47 +345,23 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
notifyEntities(messages);
}
private void alterDatabase(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
assert event.outputs != null && event.outputs.size() > 0;
for (WriteEntity writeEntity : event.outputs) {
if (writeEntity.getType() == Type.DATABASE) {
//Create/update table entity
createOrUpdateEntities(dgiBridge, event.user, writeEntity);
}
}
}
private void alterTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
assert event.inputs != null && event.inputs.size() == 1;
assert event.outputs != null && event.outputs.size() > 0;
for (WriteEntity writeEntity : event.outputs) {
//Below check should filter out partition related ddls
if (writeEntity.getType() == Entity.Type.TABLE) {
//Create/update table entity
createOrUpdateEntities(dgiBridge, event.user, writeEntity);
}
}
}
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
//crappy, no easy way of getting the new name
assert event.inputs != null && event.inputs.size() == 1;
assert event.outputs != null && event.outputs.size() > 0;
assert event.getInputs() != null && event.getInputs().size() == 1;
assert event.getOutputs() != null && event.getOutputs().size() > 0;
//Update entity if not exists
ReadEntity oldEntity = event.inputs.iterator().next();
ReadEntity oldEntity = event.getInputs().iterator().next();
Table oldTable = oldEntity.getTable();
for (WriteEntity writeEntity : event.outputs) {
for (WriteEntity writeEntity : event.getOutputs()) {
if (writeEntity.getType() == Entity.Type.TABLE) {
Table newTable = writeEntity.getTable();
if (newTable.getDbName().equals(oldTable.getDbName()) && !newTable.getTableName()
.equals(oldTable.getTableName())) {
//Create/update old table entity - create new entity and replace id
Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity);
String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
oldTable.getDbName(), oldTable.getTableName());
tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
......@@ -300,7 +373,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
messages.add(new HookNotification.EntityPartialUpdateRequest(event.user,
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
oldQualifiedName, newEntity));
}
......@@ -346,12 +419,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return tableEntity;
}
private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
for (WriteEntity entity : event.outputs) {
private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>();
for (Entity entity : event.getOutputs()) {
if (entity.getType() == entityType) {
createOrUpdateEntities(dgiBridge, event.user, entity);
Referenceable entityCreatedOrUpdated = createOrUpdateEntities(dgiBridge, event.getUser(), entity);
if (entitiesCreatedOrUpdated != null) {
entitiesCreatedOrUpdated.add(Pair.of(entity, entityCreatedOrUpdated));
}
}
}
return entitiesCreatedOrUpdated;
}
private String normalize(String str) {
......@@ -361,9 +439,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return str.toLowerCase().trim();
}
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
Set<ReadEntity> inputs = event.inputs;
Set<WriteEntity> outputs = event.outputs;
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
Set<ReadEntity> inputs = event.getInputs();
Set<WriteEntity> outputs = event.getOutputs();
//Even explain CTAS has operation name as CREATETABLE_AS_SELECT
if (inputs.isEmpty() && outputs.isEmpty()) {
......@@ -371,64 +449,54 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return;
}
if (event.queryId == null) {
LOG.info("Query id/plan is missing for {}" , event.queryStr);
if (event.getQueryId() == null) {
LOG.info("Query id/plan is missing for {}", event.getQueryStr());
}
String queryStr = normalize(event.queryStr);
LOG.debug("Registering query: {}", queryStr);
Map<String, Referenceable> source = new LinkedHashMap<>();
Map<String, Referenceable> target = new LinkedHashMap<>();
final Map<String, Referenceable> source = new LinkedHashMap<>();
final Map<String, Referenceable> target = new LinkedHashMap<>();
boolean isSelectQuery = isSelectQuery(event);
// Also filter out select queries which do not modify data
if (!isSelectQuery) {
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),readEntity.getTable().getDbName(), readEntity.getTable().getTableName());
if (!source.containsKey(tblQFName)) {
Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
source.put(tblQFName, inTable);
}
}
for (ReadEntity readEntity : event.getInputs()) {
processHiveEntity(dgiBridge, event, readEntity, source);
}
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), writeEntity.getTable().getDbName(), writeEntity.getTable().getTableName());
if (!target.containsKey(tblQFName)) {
target.put(tblQFName, outTable);
}
}
for (WriteEntity writeEntity : event.getOutputs()) {
processHiveEntity(dgiBridge, event, writeEntity, target);
}
if (source.size() > 0 || target.size() > 0) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
List<Referenceable> sourceList = new ArrayList<>(source.values());
List<Referenceable> targetList = new ArrayList<>(target.values());
//The serialization code expected a list
processReferenceable.set("inputs", sourceList);
processReferenceable.set("outputs", targetList);
processReferenceable.set("name", queryStr);
processReferenceable.set("operationType", event.operation.getOperationName());
processReferenceable.set("startTime", event.queryStartTime);
processReferenceable.set("userName", event.user);
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", event.queryId);
processReferenceable.set("queryPlan", event.jsonPlan.toString());
processReferenceable.set("endTime", System.currentTimeMillis());
//TODO set queryGraph
messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
Referenceable processReferenceable = getProcessReferenceable(event,
new ArrayList<Referenceable>() {{
addAll(source.values());
}},
new ArrayList<Referenceable>() {{
addAll(target.values());
}});
messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
} else {
LOG.info("Skipped query {} since it has no inputs or resulting outputs", queryStr);
LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
}
} else {
LOG.info("Skipped query {} for processing since it is a select query ", queryStr);
LOG.info("Skipped query {} for processing since it is a select query ", event.getQueryStr());
}
}
private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets) throws Exception {
if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable().getDbName(), entity.getTable().getTableName());
if (!dataSets.containsKey(tblQFName)) {
Referenceable inTable = createOrUpdateEntities(dgiBridge, event.getUser(), entity);
dataSets.put(tblQFName, inTable);
}
} else if (entity.getType() == Type.DFS_DIR) {
final String pathUri = normalize(new Path(entity.getLocation()).toString());
LOG.info("Registering DFS Path {} ", pathUri);
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
dataSets.put(pathUri, hdfsPath);
}
}
......@@ -444,13 +512,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
}
private boolean isSelectQuery(HiveEvent event) {
if (event.operation == HiveOperation.QUERY) {
Set<WriteEntity> outputs = event.outputs;
private boolean isSelectQuery(HiveEventContext event) {
if (event.getOperation() == HiveOperation.QUERY) {
Set<WriteEntity> outputs = event.getOutputs();
//Select query has only one output
if (outputs.size() == 1) {
WriteEntity output = outputs.iterator().next();
if (event.getOutputs().size() == 1) {
WriteEntity output = event.getOutputs().iterator().next();
/* Strangely, select queries have DFS_DIR as the output type, which seems like a bug in Hive. Filter them out by checking whether the path is a temporary URI
* Insert into/overwrite queries onto local or dfs paths have DFS_DIR or LOCAL_DIR as the type and WriteType.PATH_WRITE and tempUri = false
* Insert into a temporary table has isTempURI = false, so it will not be skipped, as expected
......@@ -465,4 +533,50 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
return false;
}
private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final Entity entity, final Referenceable tblRef) throws HiveException, MalformedURLException {
Table hiveTable = entity.getTable();
//Refresh to get the correct location
hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
final String location = normalize(hiveTable.getDataLocation().toString());
if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
LOG.info("Registering external table process {} ", event.getQueryStr());
List<Referenceable> inputs = new ArrayList<Referenceable>() {{
add(dgiBridge.fillHDFSDataSet(location));
}};
List<Referenceable> outputs = new ArrayList<Referenceable>() {{
add(tblRef);
}};
Referenceable processReferenceable = getProcessReferenceable(event, inputs, outputs);
messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
}
}
private Referenceable getProcessReferenceable(HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
String queryStr = normalize(hiveEvent.getQueryStr());
LOG.debug("Registering query: {}", queryStr);
//The serialization code expects a list
if (sourceList != null && !sourceList.isEmpty()) {
processReferenceable.set("inputs", sourceList);
}
if (targetList != null && !targetList.isEmpty()) {
processReferenceable.set("outputs", targetList);
}
processReferenceable.set("name", queryStr);
processReferenceable.set("operationType", hiveEvent.getOperation().getOperationName());
processReferenceable.set("startTime", hiveEvent.getQueryStartTime());
processReferenceable.set("userName", hiveEvent.getUser());
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", hiveEvent.getQueryId());
processReferenceable.set("queryPlan", hiveEvent.getJsonPlan());
processReferenceable.set("endTime", System.currentTimeMillis());
//TODO set queryGraph
return processReferenceable;
}
}
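The net effect of handleExternalTables and getProcessReferenceable is a single process entity whose inputs and outputs mix hive_table and hdfs_path data sets. A reduced sketch of that wiring, assuming "hive_process" is the name resolved by HiveDataTypes.HIVE_PROCESS.getName() (class name illustrative, not part of the commit):
import java.util.Arrays;
import org.apache.atlas.typesystem.Referenceable;

class ExternalTableLineageSketch {
    // e.g. CREATE EXTERNAL TABLE ... LOCATION '<hdfs path>':
    // the HDFS location is the input, the new table is the output.
    static Referenceable externalTableProcess(Referenceable hdfsLocation, Referenceable table, String queryStr) {
        Referenceable process = new Referenceable("hive_process"); // assumed == HiveDataTypes.HIVE_PROCESS.getName()
        process.set("inputs", Arrays.asList(hdfsLocation));  // the serialization code expects lists
        process.set("outputs", Arrays.asList(table));
        process.set("name", queryStr);
        process.set("queryText", queryStr);
        return process;
    }
}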
......@@ -22,16 +22,19 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.ql.Driver;
......@@ -62,6 +65,9 @@ public class HiveHookIT {
private AtlasClient dgiCLient;
private SessionState ss;
private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
private enum QUERY_TYPE {
GREMLIN,
DSL
......@@ -81,9 +87,11 @@ public class HiveHookIT {
SessionState.setCurrentSessionState(ss);
Configuration configuration = ApplicationProperties.get();
dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, configuration);
hiveMetaStoreBridge.registerHiveDataModel();
dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
}
private void runCommand(String cmd) throws Exception {
......@@ -145,10 +153,15 @@ public class HiveHookIT {
return tableName;
}
private String createTable(boolean isPartitioned, boolean isTemporary) throws Exception {
private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception {
String tableName = tableName();
runCommand("create " + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
" partitioned by(dt string)" : ""));
String location = "";
if (isExternal) {
location = " location '" + createTestDFSPath("someTestPath") + "'";
}
runCommand("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
" partitioned by(dt string)" : "") + location);
return tableName;
}
......@@ -182,6 +195,37 @@ public class HiveHookIT {
assertDatabaseIsRegistered(DEFAULT_DB);
}
@Test
public void testCreateExternalTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
String colName = columnName();
String pFile = createTestDFSPath("parentPath");
final String query = String.format("create EXTERNAL table %s.%s( %s, %s) location '%s'", dbName , tableName , colName + " int", "name string", pFile);
runCommand(query);
String tableId = assertTableIsRegistered(dbName, tableName);
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, pFile, INPUTS);
validateOutputTables(processReference, tableId);
}
private void validateOutputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
validateTables(processReference, OUTPUTS, expectedTableGuids);
}
private void validateInputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
validateTables(processReference, INPUTS, expectedTableGuids);
}
private void validateTables(Referenceable processReference, String attrName, String... expectedTableGuids) throws Exception {
List<Id> tableRef = (List<Id>) processReference.get(attrName);
for(int i = 0; i < expectedTableGuids.length; i++) {
Assert.assertEquals(tableRef.get(i)._getId(), expectedTableGuids[i]);
}
}
private String assertColumnIsRegistered(String colName) throws Exception {
LOG.debug("Searching for column {}", colName.toLowerCase());
String query =
......@@ -265,9 +309,16 @@ public class HiveHookIT {
Assert.assertEquals(vertices.length(), 0);
}
private String createTestDFSPath(String path) throws Exception {
return "pfile://" + mkdir(path);
}
private String createTestDFSFile(String path) throws Exception {
return "pfile://" + file(path);
}
@Test
public void testLoadData() throws Exception {
public void testLoadLocalPath() throws Exception {
String tableName = createTable(false);
String loadFile = file("load");
......@@ -278,17 +329,69 @@ public class HiveHookIT {
}
@Test
public void testLoadDataIntoPartition() throws Exception {
public void testLoadLocalPathIntoPartition() throws Exception {
String tableName = createTable(true);
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
validateProcess(query, 0, 1);
}
@Test
public void testLoadDFSPath() throws Exception {
String tableName = createTable(true, true, false);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String loadFile = createTestDFSFile("loadDFSFile");
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, loadFile, INPUTS);
validateOutputTables(processReference, tableId);
}
private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception {
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
if (numInputs == 0) {
Assert.assertNull(process.get(INPUTS));
} else {
Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), numInputs);
}
if (numOutputs == 0) {
Assert.assertNull(process.get(OUTPUTS));
} else {
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), numOutputs);
}
return process;
}
private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception {
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertNull(process.get("inputs"));
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
if (inputs == null) {
Assert.assertNull(process.get(INPUTS));
} else {
Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputs.length);
validateInputTables(process, inputs);
}
if (outputs == null) {
Assert.assertNull(process.get(OUTPUTS));
} else {
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputs.length);
validateOutputTables(process, outputs);
}
return process;
}
@Test
......@@ -299,13 +402,11 @@ public class HiveHookIT {
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
validateProcess(query, new String[] {inputTableId}, new String[] {opTableId});
}
@Test
......@@ -316,10 +417,7 @@ public class HiveHookIT {
"insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertNull(process.get("outputs"));
validateProcess(query, 1, 0);
assertTableIsRegistered(DEFAULT_DB, tableName);
}
......@@ -327,34 +425,32 @@ public class HiveHookIT {
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
String pFile = "pfile://" + mkdir("somedfspath");
String pFile = createTestDFSPath("somedfspath");
String query =
"insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertNull(process.get("outputs"));
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, pFile, OUTPUTS);
assertTableIsRegistered(DEFAULT_DB, tableName);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processReference, tableId);
}
@Test
public void testInsertIntoTempTable() throws Exception {
String tableName = createTable();
String insertTableName = createTable(false, true);
String insertTableName = createTable(false, false, true);
String query =
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
validateProcess(query, 1, 1);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
}
@Test
......@@ -365,10 +461,11 @@ public class HiveHookIT {
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
validateProcess(query, 1, 1);
String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
}
private String random() {
......@@ -390,19 +487,62 @@ public class HiveHookIT {
}
@Test
public void testExportImport() throws Exception {
public void testExportImportUnPartitionedTable() throws Exception {
String tableName = createTable(false);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
assertProcessIsRegistered(query);
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, filename, OUTPUTS);
validateInputTables(processReference, tableId);
//Import
tableName = createTable(false);
tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
assertProcessIsRegistered(query);
processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, filename, INPUTS);
validateOutputTables(processReference, tableId);
}
@Test
public void testExportImportPartitionedTable() throws Exception {
String tableName = createTable(true);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
//Add a partition
String partFile = "pfile://" + mkdir("partition");
String query = "alter table " + tableName + " add partition (dt='2015-01-01') location '" + partFile + "'";
runCommand(query);
String filename = "pfile://" + mkdir("export");
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, filename, OUTPUTS);
validateInputTables(processReference, tableId);
//Import
tableName = createTable(true);
tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, filename, INPUTS);
validateOutputTables(processReference, tableId);
}
@Test
......@@ -561,8 +701,9 @@ public class HiveHookIT {
@Test
public void testAlterTableLocation() throws Exception {
String tableName = createTable();
final String testPath = "file://" + System.getProperty("java.io.tmpdir", "/tmp") + File.pathSeparator + "testPath";
//It's an external table, so the HDFS location should also be registered as an entity
String tableName = createTable(true, true, false);
final String testPath = createTestDFSPath("testBaseDir");
String query = "alter table " + tableName + " set location '" + testPath + "'";
runCommand(query);
......@@ -571,6 +712,38 @@ public class HiveHookIT {
Referenceable tableRef = dgiCLient.getEntity(tableId);
Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
Assert.assertEquals(sdRef.get("location"), testPath);
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, testPath, INPUTS);
validateOutputTables(processReference, tableId);
}
private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
final String testPathNormed = normalize(new Path(testPath).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
Referenceable hdfsPathRef = dgiCLient.getEntity(hdfsPathId);
Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
Assert.assertEquals(hdfsPathRef.get("name"), testPathNormed);
// Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName());
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
return hdfsPathRef.getId()._getId();
}
private String assertHDFSPathIsRegistered(String path) throws Exception {
final String typeName = FSDataTypes.HDFS_PATH().toString();
final String parentTypeName = FSDataTypes.FS_PATH().toString();
String gremlinQuery =
String.format("g.V.has('__typeName', '%s').has('%s.path', \"%s\").toList()", typeName, parentTypeName,
normalize(path));
return assertEntityIsRegistered(gremlinQuery);
}
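For reference, the lookup above boils down to a formatted Gremlin string; a sketch of that formatting, where the concrete type names are assumptions (FSDataTypes resolves them at runtime, and normalize() lowercases the path):
class HdfsPathQuerySketch {
    static String gremlinFor(String typeName, String parentTypeName, String normalizedPath) {
        return String.format("g.V.has('__typeName', '%s').has('%s.path', \"%s\").toList()",
                typeName, parentTypeName, normalizedPath);
    }
    // e.g. gremlinFor("hdfs_path", "fs_path", "pfile:///tmp/export") -- type names are assumptions here
}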
@Test
......
......@@ -219,8 +219,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
dataSetReferenceable.set("path", hdfsPathStr);
dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
final Path hdfsPath = new Path(hdfsPathStr);
dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName());
//Fix after ATLAS-542
// final Path hdfsPath = new Path(hdfsPathStr);
// dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName());
dataSetReferenceable.set(AtlasClient.NAME, hdfsPathStr);
break;
case "HiveBolt":
......
......@@ -91,6 +91,9 @@ public class AtlasClient {
public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable";
public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
public static final String PROCESS_ATTRIBUTE_INPUTS = "inputs";
public static final String PROCESS_ATTRIBUTE_OUTPUTS = "outputs";
public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
public static final String UNKNOWN_STATUS = "Unknown status";
......
......@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)
ATLAS-572 Handle secure instance of Zookeeper for leader election.(yhemanth via sumasai)
ATLAS-605 Hook Notifications for DELETE entity needs to be supported (sumasai)
ATLAS-607 Add Support for delete entity through a qualifiedName (sumasai via yhemanth)
......
......@@ -31,6 +31,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Comparator;
public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar {
......@@ -48,7 +50,17 @@ public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar {
LOG.info("No types directory {} found - not registering any reserved types", typesDirName);
return;
}
File[] typeDefFiles = typesDir.listFiles();
//TODO - Enforce a dependency order among models registered by definition and not by modifiedTime as below
// Workaround - Sort by modifiedTime to register the models in the right dependency order - hdfs first, then hive, which is needed by the storm and falcon models.
// Sorting them by time works because the modules appear in the correct order in the pom.
Arrays.sort(typeDefFiles, new Comparator<File>() {
public int compare(File f1, File f2) {
return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified());
}
});
for (File typeDefFile : typeDefFiles) {
try {
String typeDefJSON = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);
......
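On Java 8+ the same modifiedTime ordering could be written with a method reference; a behavior-preserving sketch (not part of the commit, class name illustrative):
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;

class TypeFileOrderingSketch {
    // Same workaround: process model files oldest-first so hdfs precedes hive, etc.
    static void sortByModifiedTime(File[] typeDefFiles) {
        Arrays.sort(typeDefFiles, Comparator.comparingLong(File::lastModified));
    }
}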