Commit 5a0be805 by Suma Shivaprasad

ATLAS-525 Drop support for partitions, select query lineage, roles, principals,…

ATLAS-525 Drop support for partitions, select query lineage, roles, principals, resource, hive_type...(sumasai via shwethags)
parent faad323e
......@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
......@@ -262,7 +263,7 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
private Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
......
......@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
......@@ -29,7 +30,6 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
......@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONArray;
......@@ -163,7 +162,7 @@ public class HiveMetaStoreBridge {
String dbName = hiveDB.getName().toLowerCase();
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
dbRef.set("locationUri", hiveDB.getLocationUri());
dbRef.set(HiveDataModelGenerator.PARAMETERS, hiveDB.getParameters());
......@@ -209,7 +208,7 @@ public class HiveMetaStoreBridge {
static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
databaseName.toLowerCase(), AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
}
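For reference, the helper above produces a DSL query of the following shape (illustrative values only; this assumes AtlasConstants.CLUSTER_NAME_ATTRIBUTE resolves to "clusterName", the same attribute name as the HiveDataModelGenerator.CLUSTER_NAME constant it replaces):

    hive_db where name = 'sales_db' and clusterName = 'primary'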
private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
......@@ -251,10 +250,6 @@ public class HiveMetaStoreBridge {
for (String tableName : hiveTables) {
Table table = hiveClient.getTable(databaseName, tableName);
Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
// Import Partitions
Referenceable sdReferenceable = getSDForTable(databaseName, tableName);
registerPartitions(tableReferenceable, sdReferenceable, table);
}
}
......@@ -387,35 +382,6 @@ public class HiveMetaStoreBridge {
return new Referenceable(guid, typeName, null);
}
private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception {
String valuesStr = joinPartitionValues(values);
LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
//todo replace gremlin with DSL
// String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
// + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr,
// tableName,
// dbName, clusterName);
String tableEntityName = getTableQualifiedName(clusterName, dbName, tableName);
String gremlinQuery = getPartitionGremlinQuery(valuesStr, tableEntityName);
return getEntityReferenceFromGremlin(HiveDataTypes.HIVE_PARTITION.getName(), gremlinQuery);
}
static String joinPartitionValues(List<String> values) {
return "['" + StringUtils.join(values, "', '") + "']";
}
static String getPartitionGremlinQuery(String valuesStr, String tableEntityName) {
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
return String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+ "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
typeName, datasetType, tableEntityName);
}
private Referenceable getSDForTable(String dbName, String tableName) throws Exception {
Referenceable tableRef = getTableReference(dbName, tableName);
if (tableRef == null) {
......@@ -428,85 +394,6 @@ public class HiveMetaStoreBridge {
return new Referenceable(sd.getId().id, sd.getTypeName(), null);
}
private void registerPartitions(Referenceable tableReferenceable, Referenceable sdReferenceable,
Table table) throws Exception {
String dbName = table.getDbName();
String tableName = table.getTableName();
LOG.info("Registering partitions for {}.{}", dbName, tableName);
List<Partition> tableParts = hiveClient.getPartitions(table);
for (Partition hivePart : tableParts) {
if (hivePart.getValues() != null && hivePart.getValues().size() > 0) {
registerPartition(tableReferenceable, sdReferenceable, hivePart);
} else {
LOG.info("Skipping partition for table {} since partition values are {}", getTableQualifiedName(clusterName, table.getDbName(), table.getTableName()), StringUtils.join(hivePart.getValues(), ","));
}
}
}
private Referenceable registerPartition(Referenceable tableReferenceable, Referenceable sdReferenceable,
Partition hivePart) throws Exception {
LOG.info("Registering partition for {} with values {}", tableReferenceable,
StringUtils.join(hivePart.getValues(), ","));
String dbName = hivePart.getTable().getDbName();
String tableName = hivePart.getTable().getTableName();
Referenceable partRef = getPartitionReference(dbName, tableName, hivePart.getValues());
if (partRef == null) {
partRef = createPartitionReferenceable(tableReferenceable, sdReferenceable, hivePart);
partRef = registerInstance(partRef);
} else {
LOG.info("Partition {}.{} with values {} is already registered with id {}. Updating entity",
dbName, tableName,
StringUtils.join(hivePart.getValues(), ","), partRef.getId().id);
partRef =
createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, partRef);
updateInstance(partRef);
}
return partRef;
}
private Referenceable createOrUpdatePartitionReferenceable(Referenceable tableReferenceable,
Referenceable sdReferenceable,
Partition hivePart, Referenceable partRef) {
if (partRef == null) {
partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
}
partRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getPartitionQualifiedName(hivePart));
partRef.set("values", hivePart.getValues());
partRef.set(HiveDataModelGenerator.TABLE, tableReferenceable);
//todo fix
partRef.set("createTime", hivePart.getLastAccessTime());
partRef.set(LAST_ACCESS_TIME_ATTR, hivePart.getLastAccessTime());
// sdStruct = fillStorageDescStruct(hivePart.getSd());
// Instead of creating copies of the sdstruct for partitions we are reusing existing
// ones will fix to identify partitions with differing schema.
partRef.set("sd", sdReferenceable);
partRef.set(HiveDataModelGenerator.PARAMETERS, hivePart.getParameters());
return partRef;
}
/**
* Create a Hive partition instance in Atlas
* @param tableReferenceable The Hive Table {@link Referenceable} to which this partition belongs.
* @param sdReferenceable The Storage descriptor {@link Referenceable} for this table.
* @param hivePart The Hive {@link Partition} object being created
* @return Newly created Hive partition instance
*/
public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
Partition hivePart) {
return createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, null);
}
private String getPartitionQualifiedName(Partition partition) {
return String.format("%s.%s.%s@%s", partition.getTable().getDbName(),
partition.getTable().getTableName(), StringUtils.join(partition.getValues(), "-"), clusterName);
}
public Referenceable fillStorageDescStruct(StorageDescriptor storageDesc, String tableQualifiedName,
String sdQualifiedName) throws Exception {
LOG.debug("Filling storage descriptor information for " + storageDesc);
......
......@@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -262,7 +264,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
assert event.outputs != null && event.outputs.size() > 0;
for (WriteEntity writeEntity : event.outputs) {
//Below check should filter out partition related
//Below check should filter out partition related ddls
if (writeEntity.getType() == Entity.Type.TABLE) {
//Create/update table entity
createOrUpdateEntities(dgiBridge, event.user, writeEntity);
......@@ -313,20 +315,20 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
List<Referenceable> entities = new ArrayList<>();
switch (entity.getType()) {
case DATABASE:
db = entity.getDatabase();
break;
case TABLE:
table = entity.getTable();
db = dgiBridge.hiveClient.getDatabase(table.getDbName());
break;
case PARTITION:
partition = entity.getPartition();
table = partition.getTable();
db = dgiBridge.hiveClient.getDatabase(table.getDbName());
break;
case DATABASE:
db = entity.getDatabase();
break;
case TABLE:
table = entity.getTable();
db = dgiBridge.hiveClient.getDatabase(table.getDbName());
break;
case PARTITION:
partition = entity.getPartition();
table = partition.getTable();
db = dgiBridge.hiveClient.getDatabase(table.getDbName());
break;
}
db = dgiBridge.hiveClient.getDatabase(db.getName());
......@@ -340,12 +342,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
entities.add(tableEntity);
}
if (partition != null) {
Referenceable partitionEntity = dgiBridge.createPartitionReferenceable(tableEntity,
(Referenceable) tableEntity.get("sd"), partition);
entities.add(partitionEntity);
}
messages.add(new HookNotification.EntityUpdateRequest(user, entities));
return tableEntity;
}
......@@ -372,50 +368,70 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Even explain CTAS has operation name as CREATETABLE_AS_SELECT
if (inputs.isEmpty() && outputs.isEmpty()) {
LOG.info("Explain statement. Skipping...");
return;
}
if (event.queryId == null) {
LOG.info("Query plan is missing. Skipping...");
LOG.info("Query id/plan is missing for {}" , event.queryStr);
}
String queryStr = normalize(event.queryStr);
LOG.debug("Registering query: {}", queryStr);
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
processReferenceable.set("name", queryStr);
processReferenceable.set("operationType", event.operation.getOperationName());
processReferenceable.set("startTime", event.queryStartTime);
processReferenceable.set("userName", event.user);
List<Referenceable> source = new ArrayList<>();
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
source.add(inTable);
Map<String, Referenceable> source = new LinkedHashMap<>();
Map<String, Referenceable> target = new LinkedHashMap<>();
boolean isSelectQuery = isSelectQuery(event);
// Also filter out select queries which do not modify data
if (!isSelectQuery) {
for (ReadEntity readEntity : inputs) {
if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),readEntity.getTable().getDbName(), readEntity.getTable().getTableName());
if (!source.containsKey(tblQFName)) {
Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
source.put(tblQFName, inTable);
}
}
}
}
processReferenceable.set("inputs", source);
List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
target.add(outTable);
for (WriteEntity writeEntity : outputs) {
if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), writeEntity.getTable().getDbName(), writeEntity.getTable().getTableName());
if (!target.containsKey(tblQFName)) {
target.put(tblQFName, outTable);
}
}
}
if (source.size() > 0 || target.size() > 0) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
List<Referenceable> sourceList = new ArrayList<>(source.values());
List<Referenceable> targetList = new ArrayList<>(target.values());
//The serialization code expects a list
processReferenceable.set("inputs", sourceList);
processReferenceable.set("outputs", targetList);
processReferenceable.set("name", queryStr);
processReferenceable.set("operationType", event.operation.getOperationName());
processReferenceable.set("startTime", event.queryStartTime);
processReferenceable.set("userName", event.user);
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", event.queryId);
processReferenceable.set("queryPlan", event.jsonPlan.toString());
processReferenceable.set("endTime", System.currentTimeMillis());
//TODO set queryGraph
messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
} else {
LOG.info("Skipped query {} since it has no inputs or resulting outputs", queryStr);
}
} else {
LOG.info("Skipped query {} for processing since it is a select query ", queryStr);
}
processReferenceable.set("outputs", target);
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", event.queryId);
processReferenceable.set("queryPlan", event.jsonPlan.toString());
processReferenceable.set("endTime", System.currentTimeMillis());
//TODO set
processReferenceable.set("queryGraph", "queryGraph");
messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
}
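With partition entities no longer registered, every TABLE or PARTITION read/write entity above resolves to its parent table, so a statement touching several partitions of one table would otherwise add the same table to the lineage more than once. Keying the two maps on the table's qualified name collapses those repeats (LinkedHashMap preserves first-seen order), and the values are copied into ArrayLists at the end only because the serialization code expects lists. A sketch of the effect, with hypothetical entities:

    // reads: t (TABLE), t dt=2015-01-01 (PARTITION), t dt=2015-01-02 (PARTITION)
    // source after the input loop: { "default.t@primary" -> hive_table Referenceable }  => a single input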
private JSONObject getQueryPlan(HiveConf hiveConf, QueryPlan queryPlan) throws Exception {
try {
ExplainTask explain = new ExplainTask();
......@@ -427,4 +443,26 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return new JSONObject();
}
}
private boolean isSelectQuery(HiveEvent event) {
if (event.operation == HiveOperation.QUERY) {
Set<WriteEntity> outputs = event.outputs;
//Select query has only one output
if (outputs.size() == 1) {
WriteEntity output = outputs.iterator().next();
/* Strangely, select queries have DFS_DIR as the output type, which seems like a bug in Hive. Filter these out by checking whether the path is a temporary URI.
 * Insert into/overwrite queries onto local or DFS paths have DFS_DIR or LOCAL_DIR as the type, with WriteType.PATH_WRITE and isTempURI() == false.
 * An insert into a temporary table also reports isTempURI() == false, so it will not be skipped, as expected.
 */
if (output.getType() == Type.DFS_DIR || output.getType() == Type.LOCAL_DIR) {
if (output.getWriteType() == WriteEntity.WriteType.PATH_WRITE &&
output.isTempURI()) {
return true;
}
}
}
}
return false;
}
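In effect (summarising the cases above against the new integration tests, not an authoritative statement of Hive's hook behaviour):

    // select * from t                                 -> single temporary DFS_DIR output      -> skipped    (testIgnoreSelect)
    // insert overwrite [local] directory '...' select -> path output with isTempURI() == false -> registered (testInsertIntoLocalDir, testInsertIntoDFSDir)
    // insert into <temporary table> select            -> isTempURI() == false                  -> registered (testInsertIntoTempTable)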
}
......@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
import org.apache.atlas.addons.ModelDefinitionDump;
import org.apache.atlas.typesystem.TypesDef;
......@@ -69,7 +70,6 @@ public class HiveDataModelGenerator {
public static final String NAME = "name";
public static final String TABLE_NAME = "tableName";
public static final String CLUSTER_NAME = "clusterName";
public static final String TABLE = "table";
public static final String DB = "db";
......@@ -88,24 +88,16 @@ public class HiveDataModelGenerator {
LOG.info("Generating the Hive Data Model....");
// enums
createHiveObjectTypeEnum();
createHivePrincipalTypeEnum();
createResourceTypeEnum();
// structs
createSerDeStruct();
//createSkewedInfoStruct();
createOrderStruct();
createResourceUriStruct();
createStorageDescClass();
// classes
createDBClass();
createTypeClass();
createColumnClass();
createPartitionClass();
createTableClass();
createRoleClass();
// DDL/DML Process
createProcessClass();
......@@ -136,15 +128,6 @@ public class HiveDataModelGenerator {
return ImmutableList.of();
}
private void createHiveObjectTypeEnum() throws AtlasException {
EnumValue values[] = {new EnumValue("GLOBAL", 1), new EnumValue("DATABASE", 2), new EnumValue("TABLE", 3),
new EnumValue("PARTITION", 4), new EnumValue("COLUMN", 5),};
EnumTypeDefinition definition = new EnumTypeDefinition(HiveDataTypes.HIVE_OBJECT_TYPE.getName(), values);
enumTypeDefinitionMap.put(HiveDataTypes.HIVE_OBJECT_TYPE.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_OBJECT_TYPE.getName());
}
private void createHivePrincipalTypeEnum() throws AtlasException {
EnumValue values[] = {new EnumValue("USER", 1), new EnumValue("ROLE", 2), new EnumValue("GROUP", 3),};
......@@ -154,13 +137,6 @@ public class HiveDataModelGenerator {
LOG.debug("Created definition for " + HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName());
}
private void createResourceTypeEnum() throws AtlasException {
EnumValue values[] = {new EnumValue("JAR", 1), new EnumValue("FILE", 2), new EnumValue("ARCHIVE", 3),};
EnumTypeDefinition definition = new EnumTypeDefinition(HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), values);
enumTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCE_TYPE.getName());
}
private void createSerDeStruct() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
......@@ -217,21 +193,10 @@ public class HiveDataModelGenerator {
/** Revisit later after nested array types are handled by the typesystem **/
private void createResourceUriStruct() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("resourceType", HiveDataTypes.HIVE_RESOURCE_TYPE.getName(),
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("uri", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),};
StructTypeDefinition definition =
new StructTypeDefinition(HiveDataTypes.HIVE_RESOURCEURI.getName(), attributeDefinitions);
structTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCEURI.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCEURI.getName());
}
private void createDBClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(CLUSTER_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
......@@ -250,21 +215,6 @@ public class HiveDataModelGenerator {
LOG.debug("Created definition for " + HiveDataTypes.HIVE_DB.getName());
}
private void createTypeClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("type1", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("type2", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("fields", String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TYPE.getName(), null, null,
attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_TYPE.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_TYPE.getName());
}
private void createColumnClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
......@@ -277,27 +227,6 @@ public class HiveDataModelGenerator {
LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN.getName());
}
private void createPartitionClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(TABLE, HiveDataTypes.HIVE_TABLE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(STORAGE_DESC, HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.REQUIRED, true,
null),
new AttributeDefinition("columns", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.OPTIONAL, true, null),
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_PARTITION.getName(), null,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_PARTITION.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_PARTITION.getName());
}
private void createTableClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TABLE_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
......@@ -332,22 +261,6 @@ public class HiveDataModelGenerator {
LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName());
}
private void createRoleClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("roleName", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_ROLE.getName(), null, null,
attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_ROLE.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_ROLE.getName());
}
private void createProcessClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("startTime", DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, null),
......
......@@ -129,42 +129,6 @@ public class HiveMetaStoreBridgeTest {
}
@Test
public void testImportThatUpdatesRegisteredPartition() throws Exception {
setupDB(hiveClient, TEST_DB_NAME);
Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
TEST_TABLE_NAME,
HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
Partition partition = mock(Partition.class);
when(partition.getTable()).thenReturn(hiveTable);
List partitionValues = Arrays.asList(new String[]{"name", "location"});
when(partition.getValues()).thenReturn(partitionValues);
int lastAccessTime = 1234512345;
when(partition.getLastAccessTime()).thenReturn(lastAccessTime);
when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition}));
when(atlasClient.searchByGremlin(
HiveMetaStoreBridge.getPartitionGremlinQuery(
HiveMetaStoreBridge.joinPartitionValues(partitionValues),
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))).
thenReturn(getPartitionReference("9ae06b34-9151-3043-aa9d-b82103a50e99"));
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
bridge.importHiveMetadata();
verify(atlasClient).updateEntity(eq("9ae06b34-9151-3043-aa9d-b82103a50e99"),
(Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.LAST_ACCESS_TIME_ATTR,
new Integer(lastAccessTime))));
}
@Test
public void testImportWhenPartitionKeysAreNull() throws Exception {
setupDB(hiveClient, TEST_DB_NAME);
Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
......@@ -192,14 +156,6 @@ public class HiveMetaStoreBridgeTest {
}
}
private JSONArray getPartitionReference(String id) throws JSONException {
JSONObject resultEntry = new JSONObject();
resultEntry.put(HiveMetaStoreBridge.SEARCH_ENTRY_GUID_ATTR, id);
JSONArray results = new JSONArray();
results.put(resultEntry);
return results;
}
private JSONArray getEntityReference(String id) throws JSONException {
return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
}
......
......@@ -62,6 +62,11 @@ public class HiveHookIT {
private AtlasClient dgiCLient;
private SessionState ss;
private enum QUERY_TYPE {
GREMLIN,
DSL
}
@BeforeClass
public void setUp() throws Exception {
//Set-up hive session
......@@ -130,12 +135,19 @@ public class HiveHookIT {
}
private String createTable() throws Exception {
return createTable(true);
return createTable(false);
}
private String createTable(boolean isPartitioned) throws Exception {
String tableName = tableName();
runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
" partitioned by(dt string)" : ""));
return tableName;
}
private String createTable(boolean partition) throws Exception {
private String createTable(boolean isPartitioned, boolean isTemporary) throws Exception {
String tableName = tableName();
runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (partition ?
runCommand("create " + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
" partitioned by(dt string)" : ""));
return tableName;
}
......@@ -181,7 +193,7 @@ public class HiveHookIT {
LOG.debug("Searching for column {}", colName);
String query =
String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase());
assertEntityIsNotRegistered(query);
assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
}
@Test
......@@ -266,19 +278,99 @@ public class HiveHookIT {
}
@Test
public void testInsert() throws Exception {
public void testLoadDataIntoPartition() throws Exception {
String tableName = createTable(true);
String loadFile = file("load");
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertNull(process.get("inputs"));
System.out.println(" Ref Ops : " + process.get("outputs"));
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
}
@Test
public void testInsertIntoTable() throws Exception {
String tableName = createTable();
String insertTableName = createTable();
String query =
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
assertProcessIsRegistered(query);
String partId = assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01");
Referenceable partitionEntity = dgiCLient.getEntity(partId);
Assert.assertEquals(partitionEntity.get("qualifiedName"),
String.format("%s.%s.%s@%s", "default", insertTableName.toLowerCase(), "2015-01-01", CLUSTER_NAME));
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
}
@Test
public void testInsertIntoLocalDir() throws Exception {
String tableName = createTable();
File randomLocalPath = File.createTempFile("hiverandom", ".tmp");
String query =
"insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertNull(process.get("outputs"));
assertTableIsRegistered(DEFAULT_DB, tableName);
}
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
String pFile = "pfile://" + mkdir("somedfspath");
String query =
"insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertNull(process.get("outputs"));
assertTableIsRegistered(DEFAULT_DB, tableName);
}
@Test
public void testInsertIntoTempTable() throws Exception {
String tableName = createTable();
String insertTableName = createTable(false, true);
String query =
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
}
@Test
public void testInsertIntoPartition() throws Exception {
String tableName = createTable(true);
String insertTableName = createTable(true);
String query =
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
runCommand(query);
String processId = assertProcessIsRegistered(query);
Referenceable process = dgiCLient.getEntity(processId);
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
}
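A further test along the same lines could cover the de-duplication in HiveHook's process registration: a query reading two partitions of one source table should still yield a single input Referenceable, because both partitions resolve to the same qualified table name. The sketch below is hypothetical and only reuses helpers already defined in this class:

    @Test
    public void testInsertSelectingMultiplePartitions() throws Exception {
        String tableName = createTable(true);
        String insertTableName = createTable(true);
        String query = "insert into " + insertTableName + " partition(dt = '2015-03-01') select id, name from "
                + tableName + " where dt = '2015-01-01' or dt = '2015-01-02'";
        runCommand(query);

        String processId = assertProcessIsRegistered(query);
        Referenceable process = dgiCLient.getEntity(processId);
        //two partition reads of the same table collapse to one input
        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
    }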
private String random() {
......@@ -316,18 +408,16 @@ public class HiveHookIT {
}
@Test
public void testSelect() throws Exception {
public void testIgnoreSelect() throws Exception {
String tableName = createTable();
String query = "select * from " + tableName;
runCommand(query);
String pid = assertProcessIsRegistered(query);
Referenceable processEntity = dgiCLient.getEntity(pid);
Assert.assertEquals(processEntity.get("name"), query.toLowerCase());
assertProcessIsNotRegistered(query);
//single entity per query
//check with uppercase table name
query = "SELECT * from " + tableName.toUpperCase();
runCommand(query);
assertProcessIsRegistered(query);
assertProcessIsNotRegistered(query);
}
@Test
......@@ -727,6 +817,18 @@ public class HiveHookIT {
return assertEntityIsRegistered(gremlinQuery);
}
private void assertProcessIsNotRegistered(String queryStr) throws Exception {
// String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
// normalize(queryStr));
// assertEntityIsRegistered(dslQuery, true);
//todo replace with DSL
String typeName = HiveDataTypes.HIVE_PROCESS.getName();
String gremlinQuery =
String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName,
normalize(queryStr));
assertEntityIsNotRegistered(QUERY_TYPE.GREMLIN, gremlinQuery);
}
private String normalize(String str) {
if (StringUtils.isEmpty(str)) {
return null;
......@@ -739,7 +841,7 @@ public class HiveHookIT {
String query = String.format(
"%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
assertEntityIsNotRegistered(query);
assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
}
private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
......@@ -765,17 +867,6 @@ public class HiveHookIT {
return assertEntityIsRegistered(query);
}
private String assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
String typeName = HiveDataTypes.HIVE_PARTITION.getName();
LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
+ "db where name = '%s' and clusterName = '%s' select p", typeName, value,
tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
return assertEntityIsRegistered(dslQuery, "p");
}
private String assertEntityIsRegistered(final String query, String... arg) throws Exception {
waitFor(60000, new Predicate() {
@Override
......@@ -798,8 +889,16 @@ public class HiveHookIT {
}
}
private void assertEntityIsNotRegistered(String dslQuery) throws Exception {
JSONArray results = dgiCLient.searchByDSL(dslQuery);
private void assertEntityIsNotRegistered(QUERY_TYPE queryType, String query) throws Exception {
JSONArray results = null;
switch(queryType) {
case DSL :
results = dgiCLient.searchByDSL(query);
break;
case GREMLIN :
results = dgiCLient.searchByGremlin(query);
break;
}
Assert.assertEquals(results.length(), 0);
}
......
......@@ -21,6 +21,7 @@ package org.apache.atlas.sqoop.hook;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
......@@ -61,7 +62,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
public Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
......
......@@ -215,7 +215,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
? config.get("HdfsBolt.fileNameFormat.path")
: config.get("HdfsBolt.rotationActions");
final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
dataSetReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf));
dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
dataSetReferenceable.set("path", hdfsPathStr);
dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
......@@ -230,7 +230,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
dbReferenceable.set(HiveDataModelGenerator.NAME, databaseName);
dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
dbReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf));
dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
dependentEntities.add(dbReferenceable);
clusterName = extractComponentClusterName(new HiveConf(), stormConf);
final String hiveTableName = config.get("HiveBolt.options.tableName");
......
......@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-525 Drop support for partitions, select query lineage, roles, principals, resource, hive_type...(sumasai via shwethags)
ATLAS-599 HDFS Path Model (sumasai via yhemanth)
ATLAS-553 Entity mutation - Fix issue with reordering of elements in array<class> with composite references (sumasai via shwethags)
ATLAS-513 Admin support for HA (yhemanth via sumasai)
......