Commit f379c9ff authored by ashutoshm, committed by Madhan Neethiraj

ATLAS-1666: updated exception handling to avoid use of generic exceptions

parent 1612b305
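The same refactoring pattern is applied in every file below: a generic `throws Exception` clause is either dropped (where nothing checked is actually thrown) or narrowed to a specific checked exception — AtlasHookException in the hooks and bridges, AtlasBaseException with an AtlasErrorCode in the web and quick-start code — and the method body is wrapped so the original failure is kept as the cause. A minimal sketch of the shape of the change, using a hypothetical hook class and method that are not part of this commit:

import org.apache.atlas.hook.AtlasHookException;

// Hypothetical illustration of the before/after shape; ExampleHook is not code from this commit.
public class ExampleHook {
    // before: public void publishEntity(String qualifiedName) throws Exception { ... }
    public void publishEntity(String qualifiedName) throws AtlasHookException {
        try {
            notifyAtlas(qualifiedName);                  // low-level work that may fail in many ways
        } catch (Exception e) {
            // narrow the contract: callers see one checked type, the root cause stays attached
            throw new AtlasHookException("ExampleHook.publishEntity() failed.", e);
        }
    }

    private void notifyAtlas(String qualifiedName) {
        // placeholder for the real notification call
    }
}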
@@ -73,8 +73,7 @@ public class FalconBridge {
      * @param cluster ClusterEntity
      * @return cluster instance reference
      */
-    public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster)
-            throws Exception {
+    public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster) {
         LOG.info("Creating cluster Entity : {}", cluster.getName());
         Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
@@ -97,7 +96,7 @@ public class FalconBridge {
         return clusterRef;
     }
-    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) throws Exception {
+    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) {
         LOG.info("Creating feed dataset: {}", feed.getName());
         Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
@@ -338,8 +337,7 @@ public class FalconBridge {
         return entities;
     }
-    private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
-            throws Exception {
+    private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
         dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(AtlasClient.NAME, dbName);
@@ -349,7 +347,7 @@ public class FalconBridge {
     }
     private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName,
-                                                               String tableName) throws Exception {
+                                                               String tableName) {
         List<Referenceable> entities = new ArrayList<>();
         Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
         entities.add(dbRef);
...
@@ -18,16 +18,15 @@
 package org.apache.atlas.hive.bridge;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.hook.HiveHook;
 import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hook.AtlasHookException;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
@@ -55,8 +54,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.sun.jersey.api.client.ClientResponse;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 /**
  * A Bridge Utility that imports metadata from the Hive Meta Store
@@ -420,7 +420,7 @@ public class HiveMetaStoreBridge {
      * @throws Exception
      */
     public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable)
-            throws Exception {
+            throws AtlasHookException {
         return createOrUpdateTableInstance(dbReference, null, hiveTable);
     }
@@ -429,7 +429,7 @@ public class HiveMetaStoreBridge {
     }
     private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
-                                                      final Table hiveTable) throws Exception {
+                                                      final Table hiveTable) throws AtlasHookException {
         LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
         if (tableReference == null) {
@@ -494,22 +494,26 @@ public class HiveMetaStoreBridge {
         return entityQualifiedName + "_storage";
     }
-    private Referenceable registerTable(Referenceable dbReference, Table table) throws Exception {
+    private Referenceable registerTable(Referenceable dbReference, Table table) throws AtlasHookException {
+        try {
             String dbName = table.getDbName();
             String tableName = table.getTableName();
             LOG.info("Attempting to register table [{}]", tableName);
             Referenceable tableReference = getTableReference(table);
             LOG.info("Found result {}", tableReference);
             if (tableReference == null) {
                 tableReference = createTableInstance(dbReference, table);
                 tableReference = registerInstance(tableReference);
             } else {
                 LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName,
                         tableReference.getId().id);
                 tableReference = createOrUpdateTableInstance(dbReference, tableReference, table);
                 updateInstance(tableReference);
             }
             return tableReference;
+        } catch (Exception e) {
+            throw new AtlasHookException("HiveMetaStoreBridge.getStorageDescQFName() failed.", e);
+        }
     }
     private void updateInstance(Referenceable referenceable) throws AtlasServiceException {
@@ -523,7 +527,7 @@
     }
     public Referenceable fillStorageDesc(StorageDescriptor storageDesc, String tableQualifiedName,
-                                         String sdQualifiedName, Id tableId) throws Exception {
+                                         String sdQualifiedName, Id tableId) throws AtlasHookException {
         LOG.debug("Filling storage descriptor information for {}", storageDesc);
         Referenceable sdReferenceable = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
@@ -590,7 +594,7 @@
         return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
     }
-    public List<Referenceable> getColumns(List<FieldSchema> schemaList, Referenceable tableReference) throws Exception {
+    public List<Referenceable> getColumns(List<FieldSchema> schemaList, Referenceable tableReference) throws AtlasHookException {
         List<Referenceable> colList = new ArrayList<>();
         int columnPosition = 0;
         for (FieldSchema fs : schemaList) {
@@ -612,8 +616,8 @@
     }
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) throws AtlasHookException {
+        try {
             Configuration atlasConf = ApplicationProperties.get();
             String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
             if (atlasEndpoint == null || atlasEndpoint.length == 0){
@@ -640,5 +644,9 @@
             HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClient);
             hiveMetaStoreBridge.importHiveMetadata(failOnError);
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+        }
     }
 }
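With main() now declaring AtlasHookException instead of Exception, a caller that drives the import can catch a single checked type and still reach the underlying failure through getCause(). A hedged sketch — the ImportDriver wrapper below is hypothetical and not part of this commit:

import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hook.AtlasHookException;

// Hypothetical wrapper around the bridge entry point changed above.
public class ImportDriver {
    public static void main(String[] args) {
        try {
            HiveMetaStoreBridge.main(args);
        } catch (AtlasHookException e) {
            System.err.println("Hive metadata import failed: " + e.getMessage());
            if (e.getCause() != null) {
                e.getCause().printStackTrace();   // the wrapped low-level exception
            }
            System.exit(1);
        }
    }
}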
@@ -23,10 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.bridge.ColumnLineageUtils;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.hook.AtlasHookException;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.lang.StringUtils;
@@ -36,8 +37,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.hooks.*;
+import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -49,7 +55,6 @@ import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
@@ -67,7 +72,6 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -378,77 +382,87 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return Pair.of(changedColStringOldName, changedColStringNewName);
     }
-    private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
+    private void renameColumn(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws AtlasHookException {
+        try {
             assert event.getInputs() != null && event.getInputs().size() == 1;
             assert event.getOutputs() != null && event.getOutputs().size() > 0;
             Table oldTable = event.getInputs().iterator().next().getTable();
             List<FieldSchema> oldColList = oldTable.getAllCols();
             Table outputTbl = event.getOutputs().iterator().next().getTable();
             outputTbl = dgiBridge.hiveClient.getTable(outputTbl.getDbName(), outputTbl.getTableName());
             List<FieldSchema> newColList = outputTbl.getAllCols();
             assert oldColList.size() == newColList.size();
             Pair<String, String> changedColNamePair = findChangedColNames(oldColList, newColList);
             String oldColName = changedColNamePair.getLeft();
             String newColName = changedColNamePair.getRight();
             for (WriteEntity writeEntity : event.getOutputs()) {
                 if (writeEntity.getType() == Type.TABLE) {
                     Table newTable = writeEntity.getTable();
                     createOrUpdateEntities(dgiBridge, event, writeEntity, true, oldTable);
                     final String newQualifiedTableName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(),
                             newTable);
                     String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, oldColName);
                     String newColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, newColName);
                     Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
                     newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
                     event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
                             HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                             oldColumnQFName, newColEntity));
                 }
             }
             handleEventOutputs(dgiBridge, event, Type.TABLE);
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveHook.renameColumn() failed.", e);
+        }
     }
-    private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
+    private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws AtlasHookException {
+        try {
             //crappy, no easy of getting new name
             assert event.getInputs() != null && event.getInputs().size() == 1;
             assert event.getOutputs() != null && event.getOutputs().size() > 0;
             //Update entity if not exists
             ReadEntity oldEntity = event.getInputs().iterator().next();
             Table oldTable = oldEntity.getTable();
             for (WriteEntity writeEntity : event.getOutputs()) {
                 if (writeEntity.getType() == Entity.Type.TABLE) {
                     Table newTable = writeEntity.getTable();
                     //Hive sends with both old and new table names in the outputs which is weird. So skipping that with the below check
                     if (!newTable.getDbName().equals(oldTable.getDbName()) || !newTable.getTableName().equals(oldTable.getTableName())) {
                         final String oldQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(),
                                 oldTable);
                         final String newQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(),
                                 newTable);
                         //Create/update old table entity - create entity with oldQFNme and old tableName if it doesnt exist. If exists, will update
                         //We always use the new entity while creating the table since some flags, attributes of the table are not set in inputEntity and Hive.getTable(oldTableName) also fails since the table doesnt exist in hive anymore
                         final LinkedHashMap<Type, Referenceable> tables = createOrUpdateEntities(dgiBridge, event, writeEntity, true);
                         Referenceable tableEntity = tables.get(Type.TABLE);
                         //Reset regular column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
                         replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveMetaStoreBridge.COLUMNS), oldQualifiedName, newQualifiedName);
                         //Reset partition key column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
                         replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveMetaStoreBridge.PART_COLS), oldQualifiedName, newQualifiedName);
                         //Reset SD QF Name to old Name and create a new partial notification request to replace old SD QFName to newName to retain any existing traits
                         replaceSDQFName(event, tableEntity, oldQualifiedName, newQualifiedName);
                         //Reset Table QF Name to old Name and create a new partial notification request to replace old Table QFName to newName
                         replaceTableQFName(event, oldTable, newTable, tableEntity, oldQualifiedName, newQualifiedName);
                     }
                 }
             }
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveHook.renameTable() failed.", e);
+        }
     }
     private Referenceable replaceTableQFName(HiveEventContext event, Table oldTable, Table newTable, final Referenceable tableEntity, final String oldTableQFName, final String newTableQFName) throws HiveException {
@@ -494,7 +508,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         final Referenceable sdRef = ((Referenceable) tableEntity.get(HiveMetaStoreBridge.STORAGE_DESC));
         sdRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getStorageDescQFName(oldTblQFName));
-        //Replace SD QF name first to retain tags
+        //Replace SD QF name fir st to retain tags
         final String oldSDQFName = HiveMetaStoreBridge.getStorageDescQFName(oldTblQFName);
         final String newSDQFName = HiveMetaStoreBridge.getStorageDescQFName(newTblQFName);
@@ -507,81 +521,95 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return newSDEntity;
     }
-    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
+    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, boolean skipTempTables, Table existTable) throws AtlasHookException {
+        try {
             Database db = null;
             Table table = null;
             Partition partition = null;
             LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
             List<Referenceable> entities = new ArrayList<>();
             switch (entity.getType()) {
                 case DATABASE:
                     db = entity.getDatabase();
                     break;
                 case TABLE:
                     table = entity.getTable();
                     db = dgiBridge.hiveClient.getDatabase(table.getDbName());
                     break;
                 case PARTITION:
                     partition = entity.getPartition();
                     table = partition.getTable();
                     db = dgiBridge.hiveClient.getDatabase(table.getDbName());
                     break;
                 default:
                     LOG.info("{}: entity-type not handled by Atlas hook. Ignored", entity.getType());
             }
             if (db != null) {
                 db = dgiBridge.hiveClient.getDatabase(db.getName());
             }
             if (db != null) {
                 Referenceable dbEntity = dgiBridge.createDBInstance(db);
                 entities.add(dbEntity);
                 result.put(Type.DATABASE, dbEntity);
                 Referenceable tableEntity = null;
                 if (table != null) {
                     if (existTable != null) {
                         table = existTable;
                     } else {
                         table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName());
                     }
                     //If its an external table, even though the temp table skip flag is on,
                     // we create the table since we need the HDFS path to temp table lineage.
                     if (skipTempTables &&
                             table.isTemporary() &&
                             !TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
                         LOG.debug("Skipping temporary table registration {} since it is not an external table {} ", table.getTableName(), table.getTableType().name());
                     } else {
                         tableEntity = dgiBridge.createTableInstance(dbEntity, table);
                         entities.add(tableEntity);
                         result.put(Type.TABLE, tableEntity);
                     }
                 }
                 event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
             }
             return result;
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveHook.createOrUpdateEntities() failed.", e);
+        }
     }
-    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, boolean skipTempTables) throws Exception{
-        return createOrUpdateEntities(dgiBridge, event, entity, skipTempTables, null);
+    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, boolean skipTempTables) throws AtlasHookException {
+        try {
+            return createOrUpdateEntities(dgiBridge, event, entity, skipTempTables, null);
+        } catch (Exception e) {
+            throw new AtlasHookException("HiveHook.createOrUpdateEntities() failed.", e);
+        }
     }
-    private LinkedHashMap<Type, Referenceable> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
+    private LinkedHashMap<Type, Referenceable> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws AtlasHookException {
+        try {
             for (Entity entity : event.getOutputs()) {
                 if (entity.getType() == entityType) {
                     return createOrUpdateEntities(dgiBridge, event, entity, true);
                 }
             }
             return null;
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveHook.handleEventOutputs() failed.", e);
+        }
     }
     private static Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
@@ -600,98 +628,108 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return str.toLowerCase().trim();
     }
-    private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
+    private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws AtlasHookException {
+        try {
             Set<ReadEntity> inputs = event.getInputs();
             Set<WriteEntity> outputs = event.getOutputs();
             //Even explain CTAS has operation name as CREATETABLE_AS_SELECT
             if (inputs.isEmpty() && outputs.isEmpty()) {
                 LOG.info("Explain statement. Skipping...");
                 return;
             }
             if (event.getQueryId() == null) {
                 LOG.info("Query id/plan is missing for {}", event.getQueryStr());
             }
             final SortedMap<ReadEntity, Referenceable> source = new TreeMap<>(entityComparator);
             final SortedMap<WriteEntity, Referenceable> target = new TreeMap<>(entityComparator);
             final Set<String> dataSets = new HashSet<>();
             final Set<Referenceable> entities = new LinkedHashSet<>();
             boolean isSelectQuery = isSelectQuery(event);
             // filter out select queries which do not modify data
             if (!isSelectQuery) {
                 SortedSet<ReadEntity> sortedHiveInputs = new TreeSet<>(entityComparator);
                 if (event.getInputs() != null) {
                     sortedHiveInputs.addAll(event.getInputs());
                 }
                 SortedSet<WriteEntity> sortedHiveOutputs = new TreeSet<>(entityComparator);
                 if (event.getOutputs() != null) {
                     sortedHiveOutputs.addAll(event.getOutputs());
                 }
                 for (ReadEntity readEntity : sortedHiveInputs) {
                     processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
                 }
                 for (WriteEntity writeEntity : sortedHiveOutputs) {
                     processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
                 }
                 if (source.size() > 0 || target.size() > 0) {
                     Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target);
                     // setup Column Lineage
                     List<Referenceable> sourceList = new ArrayList<>(source.values());
                     List<Referenceable> targetList = new ArrayList<>(target.values());
                     List<Referenceable> colLineageProcessInstances = new ArrayList<>();
                     try {
                         Map<String, Referenceable> columnQNameToRef =
                                 ColumnLineageUtils.buildColumnReferenceableMap(sourceList, targetList);
                         colLineageProcessInstances = createColumnLineageProcessInstances(processReferenceable,
                                 event.lineageInfo,
                                 columnQNameToRef);
                     } catch (Exception e) {
                         LOG.warn("Column lineage process setup failed with exception {}", e);
                     }
                     colLineageProcessInstances.add(0, processReferenceable);
                     entities.addAll(colLineageProcessInstances);
                     event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
                 } else {
                     LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
                 }
             } else {
                 LOG.info("Skipped query {} for processing since it is a select query ", event.getQueryStr());
             }
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveHook.registerProcess() failed.", e);
+        }
     }
     private <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed,
-                                                      SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
+                                                      SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws AtlasHookException {
+        try {
             if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
                 final String tblQFName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
                 if (!dataSetsProcessed.contains(tblQFName)) {
                     LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
                     dataSets.put(entity, result.get(Type.TABLE));
                     dataSetsProcessed.add(tblQFName);
                     entities.addAll(result.values());
                 }
             } else if (entity.getType() == Type.DFS_DIR) {
                 URI location = entity.getLocation();
                 if (location != null) {
                     final String pathUri = lower(new Path(location).toString());
                     LOG.debug("Registering DFS Path {} ", pathUri);
                     if (!dataSetsProcessed.contains(pathUri)) {
                         Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
                         dataSets.put(entity, hdfsPath);
                         dataSetsProcessed.add(pathUri);
                         entities.add(hdfsPath);
                     }
                 }
             }
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("HiveHook.processHiveEntity() failed.", e);
+        }
     }
     private boolean isSelectQuery(HiveEventContext event) {
...
@@ -25,6 +25,7 @@ import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.hook.AtlasHookException;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
@@ -71,8 +72,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
     }
-    public Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
-            throws Exception {
+    public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
         dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(AtlasClient.NAME, dbName);
@@ -82,14 +82,14 @@
     }
     public Referenceable createHiveTableInstance(String clusterName, Referenceable dbRef,
-                                                 String tableName, String dbName) throws Exception {
+                                                 String tableName, String dbName) {
         Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
         tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
         tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
         tableRef.set(HiveMetaStoreBridge.DB, dbRef);
         return tableRef;
     }
     private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data data)
             throws ImportException {
@@ -173,19 +173,24 @@
     }
     @Override
-    public void publish(SqoopJobDataPublisher.Data data) throws Exception {
+    public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
+        try {
             Configuration atlasProperties = ApplicationProperties.get();
             String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
             Referenceable dbStoreRef = createDBStoreInstance(data);
             Referenceable dbRef = createHiveDatabaseInstance(clusterName, data.getHiveDB());
             Referenceable hiveTableRef = createHiveTableInstance(clusterName, dbRef,
                     data.getHiveTable(), data.getHiveDB());
             Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
             int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
             HookNotification.HookNotificationMessage message =
                     new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
             AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
+        }
+        catch(Exception e) {
+            throw new AtlasHookException("SqoopHook.publish() failed.", e);
+        }
     }
 }
@@ -34,6 +34,7 @@ import java.util.Map;
 public class StormAtlasHook implements ISubmitterHook {
     private static final Logger LOG = LoggerFactory.getLogger(StormAtlasHook.class);
     private static final String ATLAS_PLUGIN_TYPE = "storm";
     private static final String ATLAS_STORM_HOOK_IMPL_CLASSNAME = "org.apache.atlas.storm.hook.StormAtlasHook";
...
@@ -42,7 +42,7 @@ public final class StormTopologyUtil {
     private StormTopologyUtil() {
     }
-    public static Set<String> getTerminalUserBoltNames(StormTopology topology) throws Exception {
+    public static Set<String> getTerminalUserBoltNames(StormTopology topology) {
         Set<String> terminalBolts = new HashSet<>();
         Set<String> inputs = new HashSet<>();
         for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
...
@@ -67,9 +67,10 @@
     INSTANCE_LINEAGE_INVALID_PARAMS(400, "ATLAS-400-00-026", "Invalid lineage query parameters passed {0}: {1}"),
     ATTRIBUTE_UPDATE_NOT_SUPPORTED(400, "ATLAS-400-00-027", "{0}.{1} : attribute update not supported"),
     INVALID_VALUE(400, "ATLAS-400-00-028", "invalid value: {0}"),
-    BAD_REQUEST(400, "ATLAS-400-00-020", "{0}"),
+    BAD_REQUEST(400, "ATLAS-400-00-029", "{0}"),
+    PARAMETER_PARSING_FAILED(400, "ATLAS-400-00-02A", "Parameter parsing failed at: {0}"),
     // All Not found enums go here
     TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
     TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"),
     EMPTY_RESULTS(404, "ATLAS-404-00-004", "No result found for {0}"),
@@ -96,7 +97,15 @@
     NOTIFICATION_FAILED(500, "ATLAS-500-00-007", "Failed to notify for change {0}"),
     GREMLIN_GROOVY_SCRIPT_ENGINE_FAILED(500, "ATLAS-500-00-008", "scriptEngine cannot be initialized for: {0}"),
     JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS-500-00-009", "ObjectMapper.readValue returned NULL for class: {0}"),
-    GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS-500-00-00A", "Script execution failed for: {0}");
+    GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS-500-00-00A", "Script execution failed for: {0}"),
+    CURATOR_FRAMEWORK_UPDATE(500, "ATLAS-500-00-00B", "ActiveInstanceState.update resulted in exception."),
+    QUICK_START(500, "ATLAS-500-00-00C", "Failed to run QuickStart: {0}"),
+    EMBEDDED_SERVER_START(500, "ATLAS-500-00-00D", "EmbeddedServer.Start: failed!"),
+    STORM_TOPOLOGY_UTIL(500, "ATLAS-500-00-00E", "StormToplogyUtil: {0}"),
+    SQOOP_HOOK(500, "ATLAS-500-00-00F", "SqoopHook: {0}"),
+    HIVE_HOOK(500, "ATLAS-500-00-010", "HiveHook: {0}"),
+    HIVE_HOOK_METASTORE_BRIDGE(500, "ATLAS-500-00-011", "HiveHookMetaStoreBridge: {0}");
     private String errorCode;
     private String errorMessage;
...
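The new 500-series codes are meant to be paired with AtlasBaseException, as the QuickStart changes further down do: the code supplies the status and message template, and the original exception rides along as the cause. A minimal hedged sketch — the surrounding class is illustrative only:

import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;

// Illustrative only; mirrors the wrap-and-rethrow usage introduced in QuickStart below.
public class ErrorCodeUsage {
    static void createSampleEntity(String name) throws AtlasBaseException {
        try {
            Integer.parseInt(name);   // stand-in for work that can fail unexpectedly
        } catch (Exception e) {
            // error code + cause + argument for the {0} placeholder in the message template
            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s entity creation failed", name));
        }
    }
}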
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hook;
/**
* Exception class for Atlas Hooks.
*/
public class AtlasHookException extends Exception {
public AtlasHookException() {
}
public AtlasHookException(String message) {
super(message);
}
public AtlasHookException(String message, Throwable cause) {
super(message, cause);
}
public AtlasHookException(Throwable cause) {
super(cause);
}
public AtlasHookException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
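A short hedged example of how the hooks above consume this class: the wrapping message identifies the failing hook method, and the original stack trace stays reachable through getCause(). The demo class is illustrative, not part of the commit:

import org.apache.atlas.hook.AtlasHookException;

// Illustrative only: wrap an arbitrary failure and read it back.
public class AtlasHookExceptionDemo {
    public static void main(String[] args) {
        try {
            try {
                throw new IllegalStateException("simulated low-level failure");
            } catch (Exception e) {
                throw new AtlasHookException("ExampleHook.publish() failed.", e);
            }
        } catch (AtlasHookException e) {
            System.out.println(e.getMessage());             // ExampleHook.publish() failed.
            System.out.println(e.getCause().getMessage());  // simulated low-level failure
        }
    }
}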
@@ -24,7 +24,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
@@ -323,39 +325,48 @@ public class QuickStart {
     }
     Id database(String name, String description, String owner, String locationUri, String... traitNames)
-            throws Exception {
+            throws AtlasBaseException {
+        try {
             Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
             referenceable.set("name", name);
             referenceable.set("description", description);
             referenceable.set("owner", owner);
             referenceable.set("locationUri", locationUri);
             referenceable.set("createTime", System.currentTimeMillis());
             return createInstance(referenceable);
+        } catch (Exception e) {
+            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s database entity creation failed", name));
+        }
     }
-    Referenceable rawStorageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
-            throws Exception {
+    Referenceable rawStorageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed) {
         Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
         referenceable.set("location", location);
         referenceable.set("inputFormat", inputFormat);
         referenceable.set("outputFormat", outputFormat);
         referenceable.set("compressed", compressed);
         return referenceable;
     }
-    Referenceable rawColumn(String name, String dataType, String comment, String... traitNames) throws Exception {
+    Referenceable rawColumn(String name, String dataType, String comment, String... traitNames) throws AtlasBaseException {
+        try {
             Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
             referenceable.set("name", name);
             referenceable.set("dataType", dataType);
             referenceable.set("comment", comment);
             return referenceable;
+        }
+        catch(Exception e) {
+            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s, column entity creation failed", name));
+        }
     }
     Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
-             List<Referenceable> columns, String... traitNames) throws Exception {
+             List<Referenceable> columns, String... traitNames) throws AtlasBaseException {
+        try {
             Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
             referenceable.set("name", name);
             referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
@@ -370,46 +381,61 @@ public class QuickStart {
             referenceable.set("columns", columns);
             return createInstance(referenceable);
+        } catch (Exception e) {
+            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s table entity creation failed", name));
+        }
     }
     Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
                    String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
-            throws Exception {
+            throws AtlasBaseException {
+        try {
             Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
             // super type attributes
             referenceable.set(AtlasClient.NAME, name);
             referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
             referenceable.set("description", description);
             referenceable.set(INPUTS_ATTRIBUTE, inputTables);
             referenceable.set(OUTPUTS_ATTRIBUTE, outputTables);
             referenceable.set("user", user);
             referenceable.set("startTime", System.currentTimeMillis());
             referenceable.set("endTime", System.currentTimeMillis() + 10000);
             referenceable.set("queryText", queryText);
             referenceable.set("queryPlan", queryPlan);
             referenceable.set("queryId", queryId);
             referenceable.set("queryGraph", queryGraph);
             return createInstance(referenceable);
+        } catch (Exception e) {
+            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s process entity creation failed", name));
+        }
     }
-    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
+    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws AtlasBaseException {
+        try {
             Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
             referenceable.set("name", name);
             referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
             referenceable.set("db", dbId);
             referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);
             return createInstance(referenceable);
+        } catch (Exception e) {
+            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s Id creation", name));
+        }
     }
-    private void verifyTypesCreated() throws Exception {
+    private void verifyTypesCreated() throws AtlasBaseException {
+        try {
             List<String> types = metadataServiceClient.listTypes();
             for (String type : TYPES) {
                 assert types.contains(type);
             }
+        } catch (Exception e) {
+            throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, "view creation failed.");
+        }
     }
...@@ -461,14 +487,18 @@ public class QuickStart { ...@@ -461,14 +487,18 @@ public class QuickStart {
"from DataSet", "from Process",}; "from DataSet", "from Process",};
} }
private void search() throws Exception { private void search() throws AtlasBaseException {
for (String dslQuery : getDSLQueries()) { try {
JSONArray results = metadataServiceClient.search(dslQuery, 10, 0); for (String dslQuery : getDSLQueries()) {
if (results != null) { JSONArray results = metadataServiceClient.search(dslQuery, 10, 0);
System.out.println("query [" + dslQuery + "] returned [" + results.length() + "] rows"); if (results != null) {
} else { System.out.println("query [" + dslQuery + "] returned [" + results.length() + "] rows");
System.out.println("query [" + dslQuery + "] failed, results:" + results); } else {
System.out.println("query [" + dslQuery + "] failed, results:" + results);
}
} }
} catch (Exception e) {
throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, "one or more dsl queries failed");
} }
} }
} }
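Every QuickStart method in this hunk follows the same shape: catch the broad Exception thrown by the underlying client calls and rethrow it as an AtlasBaseException tagged with AtlasErrorCode.QUICK_START and a message naming what failed. Below is a minimal sketch of that pattern, relying only on the AtlasBaseException(AtlasErrorCode, Throwable, String...) constructor already used above; the helper class and its names are illustrative and not part of this patch.

    import org.apache.atlas.AtlasErrorCode;
    import org.apache.atlas.exception.AtlasBaseException;

    public class QuickStartExceptionSketch {
        /** Any action that may fail with a broad, checked Exception. */
        interface ThrowingSupplier<T> {
            T get() throws Exception;
        }

        /**
         * Runs the action and converts any failure into an AtlasBaseException
         * carrying AtlasErrorCode.QUICK_START and a message naming what failed.
         */
        static <T> T wrapQuickStartFailure(String what, ThrowingSupplier<T> action) throws AtlasBaseException {
            try {
                return action.get();
            } catch (Exception e) {
                throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, String.format("%s failed", what));
            }
        }
    }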
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package org.apache.atlas.web.params; package org.apache.atlas.web.params;
import org.apache.atlas.exception.AtlasBaseException;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
...@@ -100,7 +102,7 @@ public abstract class AbstractParam<T> { ...@@ -100,7 +102,7 @@ public abstract class AbstractParam<T> {
* @return {@code input}, parsed as an instance of {@code T} * @return {@code input}, parsed as an instance of {@code T}
* @throws Exception if there is an error parsing the input * @throws AtlasBaseException if there is an error parsing the input
*/ */
protected abstract T parse(String input) throws Exception; protected abstract T parse(String input) throws AtlasBaseException;
/** /**
* Returns the underlying value. * Returns the underlying value.
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
package org.apache.atlas.web.params; package org.apache.atlas.web.params;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
/** /**
* A parameter encapsulating boolean values. If the query parameter value is {@code "true"}, * A parameter encapsulating boolean values. If the query parameter value is {@code "true"},
* regardless of case, the returned value is {@link Boolean#TRUE}. If the query parameter value is * regardless of case, the returned value is {@link Boolean#TRUE}. If the query parameter value is
...@@ -36,13 +39,13 @@ public class BooleanParam extends AbstractParam<Boolean> { ...@@ -36,13 +39,13 @@ public class BooleanParam extends AbstractParam<Boolean> {
} }
@Override @Override
protected Boolean parse(String input) throws Exception { protected Boolean parse(String input) throws AtlasBaseException {
if ("true".equalsIgnoreCase(input)) { if ("true".equalsIgnoreCase(input)) {
return Boolean.TRUE; return Boolean.TRUE;
} }
if ("false".equalsIgnoreCase(input)) { if ("false".equalsIgnoreCase(input)) {
return Boolean.FALSE; return Boolean.FALSE;
} }
throw new Exception(); throw new AtlasBaseException(AtlasErrorCode.PARAMETER_PARSING_FAILED, "Boolean.parse: input=" + input);
} }
} }
\ No newline at end of file
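With parse now declared to throw AtlasBaseException, any new AbstractParam subclass reports bad input through the same typed exception. A hedged sketch of such a subclass follows; PortParam is hypothetical, and the AbstractParam(String) constructor is assumed from the existing subclasses rather than shown in this diff.

    package org.apache.atlas.web.params;

    import org.apache.atlas.AtlasErrorCode;
    import org.apache.atlas.exception.AtlasBaseException;

    // Hypothetical subclass, not part of this patch: parses a TCP port number.
    public class PortParam extends AbstractParam<Integer> {
        public PortParam(String input) {
            super(input); // assumed constructor, mirroring BooleanParam/DateTimeParam
        }

        @Override
        protected Integer parse(String input) throws AtlasBaseException {
            try {
                int port = Integer.parseInt(input);
                if (port < 1 || port > 65535) {
                    throw new IllegalArgumentException("port out of range: " + port);
                }
                return port;
            } catch (Exception e) {
                // Same error code this patch introduces for BooleanParam
                throw new AtlasBaseException(AtlasErrorCode.PARAMETER_PARSING_FAILED, e, "PortParam.parse: input=" + input);
            }
        }
    }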
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas.web.params; package org.apache.atlas.web.params;
import org.apache.atlas.exception.AtlasBaseException;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
...@@ -32,7 +33,7 @@ public class DateTimeParam extends AbstractParam<DateTime> { ...@@ -32,7 +33,7 @@ public class DateTimeParam extends AbstractParam<DateTime> {
} }
@Override @Override
protected DateTime parse(String input) throws Exception { protected DateTime parse(String input) throws AtlasBaseException {
return new DateTime(input, DateTimeZone.UTC); return new DateTime(input, DateTimeZone.UTC);
} }
} }
\ No newline at end of file
...@@ -18,15 +18,11 @@ ...@@ -18,15 +18,11 @@
package org.apache.atlas.web.security; package org.apache.atlas.web.security;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.web.model.User; import org.apache.atlas.web.model.User;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter; import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.ldap.core.support.LdapContextSource; import org.springframework.ldap.core.support.LdapContextSource;
...@@ -41,7 +37,10 @@ import org.springframework.security.ldap.authentication.LdapAuthenticationProvid ...@@ -41,7 +37,10 @@ import org.springframework.security.ldap.authentication.LdapAuthenticationProvid
import org.springframework.security.ldap.search.FilterBasedLdapUserSearch; import org.springframework.security.ldap.search.FilterBasedLdapUserSearch;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator; import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.apache.commons.lang.StringUtils;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Properties;
@Component @Component
public class AtlasLdapAuthenticationProvider extends public class AtlasLdapAuthenticationProvider extends
...@@ -87,7 +86,7 @@ public class AtlasLdapAuthenticationProvider extends ...@@ -87,7 +86,7 @@ public class AtlasLdapAuthenticationProvider extends
} }
private Authentication getLdapBindAuthentication( private Authentication getLdapBindAuthentication(
Authentication authentication) throws Exception { Authentication authentication) {
try { try {
if (isDebugEnabled) { if (isDebugEnabled) {
LOG.debug("==> AtlasLdapAuthenticationProvider getLdapBindAuthentication"); LOG.debug("==> AtlasLdapAuthenticationProvider getLdapBindAuthentication");
......
...@@ -20,7 +20,9 @@ package org.apache.atlas.web.service; ...@@ -20,7 +20,9 @@ package org.apache.atlas.web.service;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
...@@ -81,23 +83,27 @@ public class ActiveInstanceState { ...@@ -81,23 +83,27 @@ public class ActiveInstanceState {
* @throws Exception * @throws Exception
* @param serverId ID of this server instance * @param serverId ID of this server instance
*/ */
public void update(String serverId) throws Exception { public void update(String serverId) throws AtlasBaseException {
CuratorFramework client = curatorFactory.clientInstance(); try {
String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId); CuratorFramework client = curatorFactory.clientInstance();
HAConfiguration.ZookeeperProperties zookeeperProperties = HAConfiguration.ZookeeperProperties zookeeperProperties =
HAConfiguration.getZookeeperProperties(configuration); HAConfiguration.getZookeeperProperties(configuration);
List<ACL> acls = Arrays.asList( String atlasServerAddress = HAConfiguration.getBoundAddressForId(configuration, serverId);
new ACL[]{AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl(), List<ACL> acls = Arrays.asList(
ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0))}); new ACL[]{AtlasZookeeperSecurityProperties.parseAcl(zookeeperProperties.getAcl(),
Stat serverInfo = client.checkExists().forPath(getZnodePath(zookeeperProperties)); ZooDefs.Ids.OPEN_ACL_UNSAFE.get(0))});
if (serverInfo == null) { Stat serverInfo = client.checkExists().forPath(getZnodePath(zookeeperProperties));
client.create(). if (serverInfo == null) {
withMode(CreateMode.EPHEMERAL). client.create().
withACL(acls). withMode(CreateMode.EPHEMERAL).
forPath(getZnodePath(zookeeperProperties)); withACL(acls).
forPath(getZnodePath(zookeeperProperties));
}
client.setData().forPath(getZnodePath(zookeeperProperties),
atlasServerAddress.getBytes(Charset.forName("UTF-8")));
} catch (Exception e) {
throw new AtlasBaseException(AtlasErrorCode.CURATOR_FRAMEWORK_UPDATE, e, "ActiveInstanceState.update: failed to update active server znode");
} }
client.setData().forPath(getZnodePath(zookeeperProperties),
atlasServerAddress.getBytes(Charset.forName("UTF-8")));
} }
private String getZnodePath(HAConfiguration.ZookeeperProperties zookeeperProperties) { private String getZnodePath(HAConfiguration.ZookeeperProperties zookeeperProperties) {
......
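Callers of update(serverId) now see a typed AtlasBaseException (CURATOR_FRAMEWORK_UPDATE) instead of a bare Exception when the Curator/ZooKeeper calls fail. The caller-side sketch below is written under that assumption; the class and method names are illustrative and not the actual Atlas elector code.

    import org.apache.atlas.exception.AtlasBaseException;
    import org.apache.atlas.web.service.ActiveInstanceState;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ActiveInstanceStateCallerSketch {
        private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceStateCallerSketch.class);

        // Records this server as active; failures are logged rather than propagated as a generic Exception.
        static void advertiseActiveServer(ActiveInstanceState state, String serverId) {
            try {
                state.update(serverId);
            } catch (AtlasBaseException e) {
                LOG.error("Could not record server {} as active in ZooKeeper", serverId, e);
            }
        }
    }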
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
package org.apache.atlas.web.service; package org.apache.atlas.web.service;
import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
...@@ -88,9 +90,13 @@ public class EmbeddedServer { ...@@ -88,9 +90,13 @@ public class EmbeddedServer {
return connector; return connector;
} }
public void start() throws Exception { public void start() throws AtlasBaseException {
server.start(); try {
server.join(); server.start();
server.join();
} catch (Exception e) {
throw new AtlasBaseException(AtlasErrorCode.EMBEDDED_SERVER_START, e);
}
} }
public void stop() { public void stop() {
......
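start() now declares the checked AtlasBaseException rather than Exception, so callers catch the narrower type. A hedged launcher-style sketch is shown below; the class is illustrative and not the actual Atlas entry point.

    import org.apache.atlas.exception.AtlasBaseException;
    import org.apache.atlas.web.service.EmbeddedServer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class EmbeddedServerLauncherSketch {
        private static final Logger LOG = LoggerFactory.getLogger(EmbeddedServerLauncherSketch.class);

        // Starts the embedded Jetty server and exits the JVM if startup fails.
        static void startOrExit(EmbeddedServer server) {
            try {
                server.start(); // wraps any Jetty start/join failure in AtlasBaseException
            } catch (AtlasBaseException e) {
                LOG.error("Embedded server failed to start", e);
                System.exit(-1);
            }
        }
    }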
...@@ -119,11 +119,15 @@ public class SetupSteps { ...@@ -119,11 +119,15 @@ public class SetupSteps {
} }
private void clearSetupInProgress(HAConfiguration.ZookeeperProperties zookeeperProperties) private void clearSetupInProgress(HAConfiguration.ZookeeperProperties zookeeperProperties)
throws Exception { throws SetupException {
CuratorFramework client = curatorFactory.clientInstance(); CuratorFramework client = curatorFactory.clientInstance();
String path = lockPath(zookeeperProperties); String path = lockPath(zookeeperProperties);
client.delete().forPath(path); try {
LOG.info("Deleted lock path after completing setup {}", path); client.delete().forPath(path);
LOG.info("Deleted lock path after completing setup {}", path);
} catch (Exception e) {
throw new SetupException(String.format("SetupSteps.clearSetupInProgress: Failed to delete Zookeeper node at path: %s", path), e);
}
} }
private String lockPath(HAConfiguration.ZookeeperProperties zookeeperProperties) { private String lockPath(HAConfiguration.ZookeeperProperties zookeeperProperties) {
......