diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index c1ab384..8fced05 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.falcon.model.FalconDataModelGenerator;
 import org.apache.atlas.falcon.model.FalconDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
@@ -262,7 +263,7 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
     private Referenceable createHiveDatabaseInstance(String clusterName, String dbName) throws Exception {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(HiveDataModelGenerator.NAME, dbName);
         dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 0680e65..50a5311 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
@@ -29,7 +30,6 @@ import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
@@ -163,7 +162,7 @@ public class HiveMetaStoreBridge {
         String dbName = hiveDB.getName().toLowerCase();
         dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
         dbRef.set(HiveDataModelGenerator.NAME, dbName);
-        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
         dbRef.set("locationUri", hiveDB.getLocationUri());
         dbRef.set(HiveDataModelGenerator.PARAMETERS, hiveDB.getParameters());
@@ -209,7 +208,7 @@ public class HiveMetaStoreBridge {
     static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
         return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
-                databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+                databaseName.toLowerCase(), AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
     }
 
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
@@ -251,10 +250,6 @@ public class HiveMetaStoreBridge {
         for (String tableName : hiveTables) {
             Table table = hiveClient.getTable(databaseName, tableName);
             Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
-
-            // Import Partitions
-            Referenceable sdReferenceable = getSDForTable(databaseName, tableName);
-            registerPartitions(tableReferenceable, sdReferenceable, table);
         }
     }
@@ -387,35 +382,6 @@ public class HiveMetaStoreBridge {
         return new Referenceable(guid, typeName, null);
     }
 
-    private Referenceable getPartitionReference(String dbName, String tableName, List<String> values) throws Exception {
-        String valuesStr = joinPartitionValues(values);
-        LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
-
-        //todo replace gremlin with DSL
-        // String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
-        //     + "dbName where name = '%s' and clusterName = '%s' select p", typeName, valuesStr,
-        //     tableName,
-        //     dbName, clusterName);
-
-        String tableEntityName = getTableQualifiedName(clusterName, dbName, tableName);
-
-        String gremlinQuery = getPartitionGremlinQuery(valuesStr, tableEntityName);
-
-        return getEntityReferenceFromGremlin(HiveDataTypes.HIVE_PARTITION.getName(), gremlinQuery);
-    }
-
-    static String joinPartitionValues(List<String> values) {
-        return "['" + StringUtils.join(values, "', '") + "']";
-    }
-
-    static String getPartitionGremlinQuery(String valuesStr, String tableEntityName) {
-        String typeName = HiveDataTypes.HIVE_PARTITION.getName();
-        String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
-        return String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
-                + "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
-                typeName, datasetType, tableEntityName);
-    }
-
     private Referenceable getSDForTable(String dbName, String tableName) throws Exception {
         Referenceable tableRef = getTableReference(dbName, tableName);
         if (tableRef == null) {
@@ -428,85 +394,6 @@ public class HiveMetaStoreBridge {
         return new Referenceable(sd.getId().id, sd.getTypeName(), null);
     }
 
-    private void registerPartitions(Referenceable tableReferenceable, Referenceable sdReferenceable,
-                                    Table table) throws Exception {
-        String dbName = table.getDbName();
-        String tableName = table.getTableName();
-        LOG.info("Registering partitions for {}.{}", dbName, tableName);
-        List<Partition> tableParts = hiveClient.getPartitions(table);
-
-        for (Partition hivePart : tableParts) {
-            if (hivePart.getValues() != null && hivePart.getValues().size() > 0) {
-                registerPartition(tableReferenceable, sdReferenceable, hivePart);
-            } else {
-                LOG.info("Skipping partition for table {} since partition values are {}", getTableQualifiedName(clusterName, table.getDbName(), table.getTableName()), StringUtils.join(hivePart.getValues(), ","));
-            }
-        }
-    }
-
-    private Referenceable registerPartition(Referenceable tableReferenceable, Referenceable sdReferenceable,
-                                            Partition hivePart) throws Exception {
-        LOG.info("Registering partition for {} with values {}", tableReferenceable,
-                StringUtils.join(hivePart.getValues(), ","));
-        String dbName = hivePart.getTable().getDbName();
-        String tableName = hivePart.getTable().getTableName();
-
-        Referenceable partRef = getPartitionReference(dbName, tableName, hivePart.getValues());
-        if (partRef == null) {
-            partRef = createPartitionReferenceable(tableReferenceable, sdReferenceable, hivePart);
-            partRef = registerInstance(partRef);
-        } else {
-            LOG.info("Partition {}.{} with values {} is already registered with id {}. Updating entity",
-                    dbName, tableName,
-                    StringUtils.join(hivePart.getValues(), ","), partRef.getId().id);
-            partRef =
-                    createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, partRef);
-            updateInstance(partRef);
-        }
-        return partRef;
-    }
-
-    private Referenceable createOrUpdatePartitionReferenceable(Referenceable tableReferenceable,
-                                                               Referenceable sdReferenceable,
-                                                               Partition hivePart, Referenceable partRef) {
-        if (partRef == null) {
-            partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
-        }
-        partRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getPartitionQualifiedName(hivePart));
-        partRef.set("values", hivePart.getValues());
-
-        partRef.set(HiveDataModelGenerator.TABLE, tableReferenceable);
-
-        //todo fix
-        partRef.set("createTime", hivePart.getLastAccessTime());
-        partRef.set(LAST_ACCESS_TIME_ATTR, hivePart.getLastAccessTime());
-
-        // sdStruct = fillStorageDescStruct(hivePart.getSd());
-        // Instead of creating copies of the sdstruct for partitions we are reusing existing
-        // ones will fix to identify partitions with differing schema.
-        partRef.set("sd", sdReferenceable);
-
-        partRef.set(HiveDataModelGenerator.PARAMETERS, hivePart.getParameters());
-        return partRef;
-    }
-
-    /**
-     * Create a Hive partition instance in Atlas
-     * @param tableReferenceable The Hive Table {@link Referenceable} to which this partition belongs.
-     * @param sdReferenceable The Storage descriptor {@link Referenceable} for this table.
-     * @param hivePart The Hive {@link Partition} object being created
-     * @return Newly created Hive partition instance
-     */
-    public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
-                                                      Partition hivePart) {
-        return createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, null);
-    }
-
-    private String getPartitionQualifiedName(Partition partition) {
-        return String.format("%s.%s.%s@%s", partition.getTable().getDbName(),
-                partition.getTable().getTableName(), StringUtils.join(partition.getValues(), "-"), clusterName);
-    }
-
     public Referenceable fillStorageDescStruct(StorageDescriptor storageDesc, String tableQualifiedName,
                                                String sdQualifiedName) throws Exception {
         LOG.debug("Filling storage descriptor information for " + storageDesc);
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index f313f2e..68e32ff 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -262,7 +264,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         assert event.outputs != null && event.outputs.size() > 0;
 
         for (WriteEntity writeEntity : event.outputs) {
-            //Below check should filter out partition related
+            //Below check should filter out partition related ddls
             if (writeEntity.getType() == Entity.Type.TABLE) {
                 //Create/update table entity
                 createOrUpdateEntities(dgiBridge, event.user, writeEntity);
@@ -313,20 +315,20 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         List<Referenceable> entities = new ArrayList<>();
 
         switch (entity.getType()) {
-        case DATABASE:
-            db = entity.getDatabase();
-            break;
-
-        case TABLE:
-            table = entity.getTable();
-            db = dgiBridge.hiveClient.getDatabase(table.getDbName());
-            break;
-
-        case PARTITION:
-            partition = entity.getPartition();
-            table = partition.getTable();
-            db = dgiBridge.hiveClient.getDatabase(table.getDbName());
-            break;
+            case DATABASE:
+                db = entity.getDatabase();
+                break;
+
+            case TABLE:
+                table = entity.getTable();
+                db = dgiBridge.hiveClient.getDatabase(table.getDbName());
+                break;
+
+            case PARTITION:
+                partition = entity.getPartition();
+                table = partition.getTable();
+                db = dgiBridge.hiveClient.getDatabase(table.getDbName());
+                break;
         }
 
         db = dgiBridge.hiveClient.getDatabase(db.getName());
@@ -340,12 +342,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             entities.add(tableEntity);
         }
 
-        if (partition != null) {
-            Referenceable partitionEntity = dgiBridge.createPartitionReferenceable(tableEntity,
-                    (Referenceable) tableEntity.get("sd"), partition);
-            entities.add(partitionEntity);
-        }
-
         messages.add(new HookNotification.EntityUpdateRequest(user, entities));
         return tableEntity;
     }
@@ -372,50 +368,70 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         //Even explain CTAS has operation name as CREATETABLE_AS_SELECT
         if (inputs.isEmpty() && outputs.isEmpty()) {
             LOG.info("Explain statement. Skipping...");
+            return;
         }
 
         if (event.queryId == null) {
-            LOG.info("Query plan is missing. Skipping...");
+            LOG.info("Query id/plan is missing for {}" , event.queryStr);
         }
 
         String queryStr = normalize(event.queryStr);
-        LOG.debug("Registering query: {}", queryStr);
-        Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-        processReferenceable.set("name", queryStr);
-        processReferenceable.set("operationType", event.operation.getOperationName());
-        processReferenceable.set("startTime", event.queryStartTime);
-        processReferenceable.set("userName", event.user);
-
-        List<Referenceable> source = new ArrayList<>();
-        for (ReadEntity readEntity : inputs) {
-            if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
-                Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
-                source.add(inTable);
+        Map<String, Referenceable> source = new LinkedHashMap<>();
+        Map<String, Referenceable> target = new LinkedHashMap<>();
+
+        boolean isSelectQuery = isSelectQuery(event);
+
+        // Also filter out select queries which do not modify data
+        if (!isSelectQuery) {
+            for (ReadEntity readEntity : inputs) {
+                if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
+                    final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),readEntity.getTable().getDbName(), readEntity.getTable().getTableName());
+                    if (!source.containsKey(tblQFName)) {
+                        Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
+                        source.put(tblQFName, inTable);
+                    }
+                }
             }
-        }
-        processReferenceable.set("inputs", source);
-        List<Referenceable> target = new ArrayList<>();
-        for (WriteEntity writeEntity : outputs) {
-            if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
-                Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
-                target.add(outTable);
+            for (WriteEntity writeEntity : outputs) {
+                if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
+                    Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
+                    final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), writeEntity.getTable().getDbName(), writeEntity.getTable().getTableName());
+                    if (!target.containsKey(tblQFName)) {
+                        target.put(tblQFName, outTable);
+                    }
+                }
+            }
+
+            if (source.size() > 0 || target.size() > 0) {
+                Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+
+                List<Referenceable> sourceList = new ArrayList<>(source.values());
+                List<Referenceable> targetList = new ArrayList<>(target.values());
+
+                //The serialization code expected a list
+                processReferenceable.set("inputs", sourceList);
+                processReferenceable.set("outputs", targetList);
+                processReferenceable.set("name", queryStr);
+                processReferenceable.set("operationType", event.operation.getOperationName());
+                processReferenceable.set("startTime", event.queryStartTime);
+                processReferenceable.set("userName", event.user);
+                processReferenceable.set("queryText", queryStr);
+                processReferenceable.set("queryId", event.queryId);
+                processReferenceable.set("queryPlan", event.jsonPlan.toString());
+                processReferenceable.set("endTime", System.currentTimeMillis());
+                //TODO set queryGraph
+                messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
+            } else {
+                LOG.info("Skipped query {} since it has no inputs or resulting outputs", queryStr);
             }
+        } else {
+            LOG.info("Skipped query {} for processing since it is a select query ", queryStr);
         }
-        processReferenceable.set("outputs", target);
-        processReferenceable.set("queryText", queryStr);
-        processReferenceable.set("queryId", event.queryId);
-        processReferenceable.set("queryPlan", event.jsonPlan.toString());
-        processReferenceable.set("endTime", System.currentTimeMillis());
-
-        //TODO set
-        processReferenceable.set("queryGraph", "queryGraph");
-        messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
     }
-
     private JSONObject getQueryPlan(HiveConf hiveConf, QueryPlan queryPlan) throws Exception {
         try {
             ExplainTask explain = new ExplainTask();
@@ -427,4 +443,26 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             return new JSONObject();
         }
     }
+
+    private boolean isSelectQuery(HiveEvent event) {
+        if (event.operation == HiveOperation.QUERY) {
+            Set<WriteEntity> outputs = event.outputs;
+
+            //Select query has only one output
+            if (outputs.size() == 1) {
+                WriteEntity output = outputs.iterator().next();
+                /* Strangely select queries have DFS_DIR as the type which seems like a bug in hive. Filter out by checking if the path is a temporary URI
+                 * Insert into/overwrite queries onto local or dfs paths have DFS_DIR or LOCAL_DIR as the type and WriteType.PATH_WRITE and tempUri = false
+                 * Insert into a temporary table has isTempURI = false. So will not skip as expected
+                 */
+                if (output.getType() == Type.DFS_DIR || output.getType() == Type.LOCAL_DIR) {
+                    if (output.getWriteType() == WriteEntity.WriteType.PATH_WRITE &&
+                            output.isTempURI()) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
 }
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
index 2ca5953..ebeabb6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.addons.ModelDefinitionDump;
 import org.apache.atlas.typesystem.TypesDef;
@@ -69,7 +70,6 @@ public class HiveDataModelGenerator {
     public static final String NAME = "name";
     public static final String TABLE_NAME = "tableName";
-    public static final String CLUSTER_NAME = "clusterName";
     public static final String TABLE = "table";
     public static final String DB = "db";
@@ -88,24 +88,16 @@ public class HiveDataModelGenerator {
         LOG.info("Generating the Hive Data Model....");
 
         // enums
-        createHiveObjectTypeEnum();
         createHivePrincipalTypeEnum();
-        createResourceTypeEnum();
-
         // structs
         createSerDeStruct();
-        //createSkewedInfoStruct();
         createOrderStruct();
-        createResourceUriStruct();
         createStorageDescClass();
 
         // classes
         createDBClass();
-        createTypeClass();
         createColumnClass();
-        createPartitionClass();
         createTableClass();
-        createRoleClass();
 
         // DDL/DML Process
         createProcessClass();
@@ -136,15 +128,6 @@ public class HiveDataModelGenerator {
         return ImmutableList.of();
     }
 
-    private void createHiveObjectTypeEnum() throws AtlasException {
-        EnumValue values[] = {new EnumValue("GLOBAL", 1), new EnumValue("DATABASE", 2), new EnumValue("TABLE", 3),
-                new EnumValue("PARTITION", 4), new EnumValue("COLUMN", 5),};
-
-        EnumTypeDefinition definition = new EnumTypeDefinition(HiveDataTypes.HIVE_OBJECT_TYPE.getName(), values);
-        enumTypeDefinitionMap.put(HiveDataTypes.HIVE_OBJECT_TYPE.getName(), definition);
-        LOG.debug("Created definition for " + HiveDataTypes.HIVE_OBJECT_TYPE.getName());
-    }
-
     private void createHivePrincipalTypeEnum() throws AtlasException {
         EnumValue values[] = {new EnumValue("USER", 1), new EnumValue("ROLE", 2), new EnumValue("GROUP", 3),};
@@ -154,13 +137,6 @@ public class HiveDataModelGenerator {
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName());
     }
 
-    private void createResourceTypeEnum() throws AtlasException {
-        EnumValue values[] = {new EnumValue("JAR", 1), new EnumValue("FILE", 2), new EnumValue("ARCHIVE", 3),};
-        EnumTypeDefinition definition = new EnumTypeDefinition(HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), values);
-        enumTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCE_TYPE.getName(), definition);
-        LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCE_TYPE.getName());
-    }
-
     private void createSerDeStruct() throws AtlasException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
@@ -217,21 +193,10 @@ public class HiveDataModelGenerator {
     /** Revisit later after nested array types are handled by the typesystem **/
-    private void createResourceUriStruct() throws AtlasException {
-        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("resourceType", HiveDataTypes.HIVE_RESOURCE_TYPE.getName(),
-                        Multiplicity.REQUIRED, false, null),
-                new AttributeDefinition("uri", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),};
-        StructTypeDefinition definition =
-                new StructTypeDefinition(HiveDataTypes.HIVE_RESOURCEURI.getName(), attributeDefinitions);
-        structTypeDefinitionMap.put(HiveDataTypes.HIVE_RESOURCEURI.getName(), definition);
-        LOG.debug("Created definition for " + HiveDataTypes.HIVE_RESOURCEURI.getName());
-    }
-
     private void createDBClass() throws AtlasException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
-                new AttributeDefinition(CLUSTER_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
+                new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
                         null),
                 new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
                         null),
@@ -250,21 +215,6 @@ public class HiveDataModelGenerator {
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_DB.getName());
     }
 
-    private void createTypeClass() throws AtlasException {
-        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
-                new AttributeDefinition("type1", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
-                new AttributeDefinition("type2", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
-                new AttributeDefinition("fields", String.format("array<%s>", HiveDataTypes.HIVE_COLUMN.getName()),
-                        Multiplicity.OPTIONAL, false, null),};
-        HierarchicalTypeDefinition<ClassType> definition =
-                new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TYPE.getName(), null, null,
-                        attributeDefinitions);
-        classTypeDefinitions.put(HiveDataTypes.HIVE_TYPE.getName(), definition);
-        LOG.debug("Created definition for " + HiveDataTypes.HIVE_TYPE.getName());
-    }
-
     private void createColumnClass() throws AtlasException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
@@ -277,27 +227,6 @@ public class HiveDataModelGenerator {
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN.getName());
     }
 
-    private void createPartitionClass() throws AtlasException {
-        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
-                        Multiplicity.OPTIONAL, false, null),
-                new AttributeDefinition(TABLE, HiveDataTypes.HIVE_TABLE.getName(), Multiplicity.REQUIRED, false, null),
-                new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), Multiplicity.OPTIONAL, false,
-                        null),
-                new AttributeDefinition("lastAccessTime", DataTypes.LONG_TYPE.getName(), Multiplicity.OPTIONAL, false,
-                        null),
-                new AttributeDefinition(STORAGE_DESC, HiveDataTypes.HIVE_STORAGEDESC.getName(), Multiplicity.REQUIRED, true,
-                        null),
-                new AttributeDefinition("columns", DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
-                        Multiplicity.OPTIONAL, true, null),
-                new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
-        HierarchicalTypeDefinition<ClassType> definition =
-                new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_PARTITION.getName(), null,
-                        ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions);
-        classTypeDefinitions.put(HiveDataTypes.HIVE_PARTITION.getName(), definition);
-        LOG.debug("Created definition for " + HiveDataTypes.HIVE_PARTITION.getName());
-    }
-
     private void createTableClass() throws AtlasException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition(TABLE_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
@@ -332,22 +261,6 @@ public class HiveDataModelGenerator {
         LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName());
     }
 
-    private void createRoleClass() throws AtlasException {
-        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-                new AttributeDefinition("roleName", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
-                        null),
-                new AttributeDefinition("createTime", DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false,
-                        null),
-                new AttributeDefinition(OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
-                        null),};
-        HierarchicalTypeDefinition<ClassType> definition =
-                new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_ROLE.getName(), null, null,
-                        attributeDefinitions);
-
-        classTypeDefinitions.put(HiveDataTypes.HIVE_ROLE.getName(), definition);
-        LOG.debug("Created definition for " + HiveDataTypes.HIVE_ROLE.getName());
-    }
-
     private void createProcessClass() throws AtlasException {
         AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
                 new AttributeDefinition("startTime", DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, null),
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index c717c0f..0ae69dd 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -129,42 +129,6 @@ public class HiveMetaStoreBridgeTest {
     }
 
     @Test
-    public void testImportThatUpdatesRegisteredPartition() throws Exception {
-        setupDB(hiveClient, TEST_DB_NAME);
-        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
-
-        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
-
-        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
-                TEST_TABLE_NAME,
-                HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
-                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
-        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
-
-        Partition partition = mock(Partition.class);
-        when(partition.getTable()).thenReturn(hiveTable);
-        List partitionValues = Arrays.asList(new String[]{"name", "location"});
-        when(partition.getValues()).thenReturn(partitionValues);
-        int lastAccessTime = 1234512345;
-        when(partition.getLastAccessTime()).thenReturn(lastAccessTime);
-
-        when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition}));
-
-        when(atlasClient.searchByGremlin(
-                HiveMetaStoreBridge.getPartitionGremlinQuery(
-                        HiveMetaStoreBridge.joinPartitionValues(partitionValues),
-                        HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))).
-                thenReturn(getPartitionReference("9ae06b34-9151-3043-aa9d-b82103a50e99"));
-
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-        bridge.importHiveMetadata();
-
-        verify(atlasClient).updateEntity(eq("9ae06b34-9151-3043-aa9d-b82103a50e99"),
-                (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.LAST_ACCESS_TIME_ATTR,
-                        new Integer(lastAccessTime))));
-    }
-
-    @Test
     public void testImportWhenPartitionKeysAreNull() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
         Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
@@ -192,14 +156,6 @@ public class HiveMetaStoreBridgeTest {
         }
     }
 
-    private JSONArray getPartitionReference(String id) throws JSONException {
-        JSONObject resultEntry = new JSONObject();
-        resultEntry.put(HiveMetaStoreBridge.SEARCH_ENTRY_GUID_ATTR, id);
-        JSONArray results = new JSONArray();
-        results.put(resultEntry);
-        return results;
-    }
-
     private JSONArray getEntityReference(String id) throws JSONException {
         return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
     }
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index baa427b..f7290ee 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -62,6 +62,11 @@ public class HiveHookIT {
     private AtlasClient dgiCLient;
     private SessionState ss;
 
+    private enum QUERY_TYPE {
+        GREMLIN,
+        DSL
+    }
+
     @BeforeClass
     public void setUp() throws Exception {
         //Set-up hive session
@@ -130,12 +135,19 @@ public class HiveHookIT {
     }
 
     private String createTable() throws Exception {
-        return createTable(true);
+        return createTable(false);
+    }
+
+    private String createTable(boolean isPartitioned) throws Exception {
+        String tableName = tableName();
+        runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
+                " partitioned by(dt string)" : ""));
+        return tableName;
     }
 
-    private String createTable(boolean partition) throws Exception {
+    private String createTable(boolean isPartitioned, boolean isTemporary) throws Exception {
         String tableName = tableName();
-        runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (partition ?
+        runCommand("create " + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
                 " partitioned by(dt string)" : ""));
         return tableName;
     }
@@ -181,7 +193,7 @@ public class HiveHookIT {
         LOG.debug("Searching for column {}", colName);
         String query = String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(),
                 colName.toLowerCase());
-        assertEntityIsNotRegistered(query);
+        assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
     }
 
     @Test
@@ -266,19 +278,99 @@ public class HiveHookIT {
     }
 
     @Test
-    public void testInsert() throws Exception {
+    public void testLoadDataIntoPartition() throws Exception {
+        String tableName = createTable(true);
+
+        String loadFile = file("load");
+        String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
+        runCommand(query);
+
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        Assert.assertNull(process.get("inputs"));
+
+        System.out.println(" Ref Ops : " + process.get("outputs"));
+        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
+    }
+
+    @Test
+    public void testInsertIntoTable() throws Exception {
         String tableName = createTable();
         String insertTableName = createTable();
         String query =
-                "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
-                        + " where dt = '2015-01-01'";
+                "insert into " + insertTableName + " select id, name from " + tableName;
         runCommand(query);
-        assertProcessIsRegistered(query);
-        String partId = assertPartitionIsRegistered(DEFAULT_DB, insertTableName, "2015-01-01");
-        Referenceable partitionEntity = dgiCLient.getEntity(partId);
-        Assert.assertEquals(partitionEntity.get("qualifiedName"),
-                String.format("%s.%s.%s@%s", "default", insertTableName.toLowerCase(), "2015-01-01", CLUSTER_NAME));
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
+        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
+
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+    }
+
+    @Test
+    public void testInsertIntoLocalDir() throws Exception {
+        String tableName = createTable();
+        File randomLocalPath = File.createTempFile("hiverandom", ".tmp");
+        String query =
+                "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
+
+        runCommand(query);
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
+        Assert.assertNull(process.get("outputs"));
+
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+    }
+
+    @Test
+    public void testInsertIntoDFSDir() throws Exception {
+        String tableName = createTable();
+        String pFile = "pfile://" + mkdir("somedfspath");
+        String query =
+                "insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName;
+
+        runCommand(query);
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
+        Assert.assertNull(process.get("outputs"));
+
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+    }
+
+    @Test
+    public void testInsertIntoTempTable() throws Exception {
+        String tableName = createTable();
+        String insertTableName = createTable(false, true);
+        String query =
+                "insert into " + insertTableName + " select id, name from " + tableName;
+
+        runCommand(query);
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
+        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
+
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+    }
+
+    @Test
+    public void testInsertIntoPartition() throws Exception {
+        String tableName = createTable(true);
+        String insertTableName = createTable(true);
+        String query =
+                "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+                        + " where dt = '2015-01-01'";
+        runCommand(query);
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
+        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
     }
 
     private String random() {
@@ -316,18 +408,16 @@ public class HiveHookIT {
     }
 
     @Test
-    public void testSelect() throws Exception {
+    public void testIgnoreSelect() throws Exception {
         String tableName = createTable();
         String query = "select * from " + tableName;
         runCommand(query);
-        String pid = assertProcessIsRegistered(query);
-        Referenceable processEntity = dgiCLient.getEntity(pid);
-        Assert.assertEquals(processEntity.get("name"), query.toLowerCase());
+        assertProcessIsNotRegistered(query);
 
-        //single entity per query
+        //check with uppercase table name
         query = "SELECT * from " + tableName.toUpperCase();
         runCommand(query);
-        assertProcessIsRegistered(query);
+        assertProcessIsNotRegistered(query);
     }
 
     @Test
@@ -727,6 +817,18 @@ public class HiveHookIT {
         return assertEntityIsRegistered(gremlinQuery);
     }
 
+    private void assertProcessIsNotRegistered(String queryStr) throws Exception {
+        // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
+        //     normalize(queryStr));
+        // assertEntityIsRegistered(dslQuery, true);
+        //todo replace with DSL
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        String gremlinQuery =
+                String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName,
+                        normalize(queryStr));
+        assertEntityIsNotRegistered(QUERY_TYPE.GREMLIN, gremlinQuery);
+    }
+
     private String normalize(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;
@@ -739,7 +841,7 @@ public class HiveHookIT {
         String query = String.format(
                 "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
                 HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
-        assertEntityIsNotRegistered(query);
+        assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
     }
 
     private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
@@ -765,17 +867,6 @@ public class HiveHookIT {
         return assertEntityIsRegistered(query);
     }
 
-    private String assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
-        String typeName = HiveDataTypes.HIVE_PARTITION.getName();
-
-        LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value);
-        String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', "
-                + "db where name = '%s' and clusterName = '%s' select p", typeName, value,
-                tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
-
-        return assertEntityIsRegistered(dslQuery, "p");
-    }
-
     private String assertEntityIsRegistered(final String query, String... arg) throws Exception {
         waitFor(60000, new Predicate() {
             @Override
@@ -798,8 +889,16 @@ public class HiveHookIT {
         }
     }
 
-    private void assertEntityIsNotRegistered(String dslQuery) throws Exception {
-        JSONArray results = dgiCLient.searchByDSL(dslQuery);
+    private void assertEntityIsNotRegistered(QUERY_TYPE queryType, String query) throws Exception {
+        JSONArray results = null;
+        switch(queryType) {
+        case DSL :
+            results = dgiCLient.searchByDSL(query);
+            break;
+        case GREMLIN :
+            results = dgiCLient.searchByGremlin(query);
+            break;
+        }
         Assert.assertEquals(results.length(), 0);
     }
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 924e467..ab7e6ee 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -21,6 +21,7 @@ package org.apache.atlas.sqoop.hook;
 
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
@@ -61,7 +62,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
     public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) throws Exception {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(HiveDataModelGenerator.NAME, dbName);
         dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 15a42dc..5665856 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -215,7 +215,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                 ? config.get("HdfsBolt.fileNameFormat.path")
                 : config.get("HdfsBolt.rotationActions");
         final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
-        dataSetReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf));
+        dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
         dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
         dataSetReferenceable.set("path", hdfsPathStr);
         dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
@@ -230,7 +230,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         dbReferenceable.set(HiveDataModelGenerator.NAME, databaseName);
         dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
-        dbReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf));
+        dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
         dependentEntities.add(dbReferenceable);
         clusterName = extractComponentClusterName(new HiveConf(), stormConf);
         final String hiveTableName = config.get("HiveBolt.options.tableName");
diff --git a/release-log.txt b/release-log.txt
index 1519989..231f0ce 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-525 Drop support for partitions, select query lineage, roles, principals, resource, hive_type...(sumasai via shwethags)
 ATLAS-599 HDFS Path Model (sumasai via yhemanth)
 ATLAS-553 Entity mutation - Fix issue with reordering of elements in array<class> with composite references (sumasai via shwethags)
 ATLAS-513 Admin support for HA (yhemanth via sumasai)
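
Reviewer note, appended after the patch and not part of it: every hook touched above now reads the cluster-name attribute from the shared AtlasConstants class instead of the removed HiveDataModelGenerator.CLUSTER_NAME field. Below is a minimal sketch of how an external hook would populate a hive_db reference after this change, assuming the typesystem Referenceable API already used in the diff and that AtlasConstants.CLUSTER_NAME_ATTRIBUTE still resolves to the hive_db cluster attribute; the helper class and method names are hypothetical.

    import org.apache.atlas.AtlasConstants;
    import org.apache.atlas.typesystem.Referenceable;

    public class ClusterNameSketch {
        // Hypothetical helper mirroring createHiveDatabaseInstance() in FalconHook/SqoopHook above.
        public static Referenceable hiveDbReference(String clusterName, String dbName) {
            Referenceable dbRef = new Referenceable("hive_db");
            // The shared constant replaces the per-model CLUSTER_NAME field removed by ATLAS-525.
            dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
            dbRef.set("name", dbName.toLowerCase());
            return dbRef;
        }
    }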