Commit d838cf38 by Shwetha GS

ATLAS-819 All user defined types should have a set of common attributes (shwethags)

parent f4670dd3
......@@ -21,9 +21,7 @@ package org.apache.atlas.falcon.Util;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
......@@ -45,20 +43,19 @@ public final class EventUtil {
String[] tags = keyValueString.split(",");
for (String tag : tags) {
int index = tag.indexOf("=");
String tagKey = tag.substring(0, index);
String tagValue = tag.substring(index + 1, tag.length());
String tagKey = tag.substring(0, index).trim();
String tagValue = tag.substring(index + 1, tag.length()).trim();
keyValueMap.put(tagKey, tagValue);
}
return keyValueMap;
}
public static UserGroupInformation getUgi() throws FalconException {
UserGroupInformation ugi;
public static String getUser() throws FalconException {
try {
ugi = CurrentUser.getAuthenticatedUGI();
} catch (IOException ioe) {
throw new FalconException(ioe);
return CurrentUser.getAuthenticatedUGI().getShortUserName();
} catch (Exception ioe) {
//Ignore is failed to get user, uses login user
}
return ugi;
return null;
}
}
......@@ -19,7 +19,6 @@
package org.apache.atlas.falcon.event;
import org.apache.falcon.entity.v0.Entity;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.Date;
......@@ -28,14 +27,12 @@ import java.util.Date;
*/
public class FalconEvent {
protected String user;
protected UserGroupInformation ugi;
protected OPERATION operation;
protected long timestamp;
protected Entity entity;
public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) {
public FalconEvent(String doAsUser, OPERATION falconOperation, long timestamp, Entity entity) {
this.user = doAsUser;
this.ugi = ugi;
this.operation = falconOperation;
this.timestamp = timestamp;
this.entity = entity;
......@@ -54,10 +51,6 @@ public class FalconEvent {
return user;
}
public UserGroupInformation getUgi() {
return ugi;
}
public OPERATION getOperation() {
return operation;
}
......
......@@ -22,13 +22,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
......@@ -151,7 +151,7 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
Operation op = getOperation(event.getOperation());
String user = getUser(event.getUser());
LOG.info("fireAndForget user:{}, ugi: {}", user, event.getUgi());
LOG.info("fireAndForget user:{}", user);
switch (op) {
case ADD:
messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user)));
......@@ -167,18 +167,15 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
switch (event.getOperation()) {
case ADD_CLUSTER:
entities.add(FalconBridge
.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), user,
event.getTimestamp()));
.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity()));
break;
case ADD_PROCESS:
entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE,
user, event.getTimestamp()));
entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE));
break;
case ADD_FEED:
entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE,
user, event.getTimestamp()));
entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE));
break;
case UPDATE_CLUSTER:
......
......@@ -50,13 +50,11 @@ import java.util.Map;
public class FalconDataModelGenerator {
private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class);
public static final String FREQUENCY = "frequency";
private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions;
public static final String NAME = "name";
public static final String TIMESTAMP = "timestamp";
public static final String COLO = "colo";
public static final String USER = "owner";
public static final String TAGS = "tags";
public static final String GROUPS = "groups";
public static final String PIPELINES = "pipelines";
......@@ -64,10 +62,6 @@ public class FalconDataModelGenerator {
public static final String RUNSON = "runs-on";
public static final String STOREDIN = "stored-in";
// multiple inputs and outputs for process
public static final String INPUTS = "inputs";
public static final String OUTPUTS = "outputs";
public FalconDataModelGenerator() {
classTypeDefinitions = new HashMap<>();
}
......@@ -78,8 +72,8 @@ public class FalconDataModelGenerator {
// classes
createClusterEntityClass();
createProcessEntityClass();
createFeedCreationEntityClass();
createFeedEntityClass();
createFeedDatasetClass();
createReplicationFeedEntityClass();
}
......@@ -102,12 +96,8 @@ public class FalconDataModelGenerator {
private void createClusterEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(COLO, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
// map of tags
new AttributeDefinition(TAGS,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
......@@ -120,14 +110,11 @@ public class FalconDataModelGenerator {
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_CLUSTER.getName());
}
private void createFeedEntityClass() throws AtlasException {
private void createFeedCreationEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
false, null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null)};
false, null)
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_CREATION.getName(), null,
......@@ -136,19 +123,17 @@ public class FalconDataModelGenerator {
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_CREATION.getName());
}
private void createFeedDatasetClass() throws AtlasException {
private void createFeedEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
TypesUtil.createRequiredAttrDef(FREQUENCY, DataTypes.STRING_TYPE),
new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
false, null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(GROUPS, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
// map of tags
new AttributeDefinition(TAGS,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),};
Multiplicity.OPTIONAL, false, null)
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED.getName(), null,
......@@ -159,28 +144,19 @@ public class FalconDataModelGenerator {
private void createReplicationFeedEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null)};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class,
FalconDataTypes.FALCON_FEED_REPLICATION.getName(), null,
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), null);
classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_REPLICATION.getName());
}
private void createProcessEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
TypesUtil.createRequiredAttrDef(FREQUENCY, DataTypes.STRING_TYPE),
new AttributeDefinition(RUNSON, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
false, null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
// map of tags
new AttributeDefinition(TAGS,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
......
......@@ -28,7 +28,6 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -127,15 +126,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName());
try {
String user = entity.getACL() != null ? entity.getACL().getOwner() :
UserGroupInformation.getLoginUser().getShortUserName();
FalconEvent event =
new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
new FalconEvent(EventUtil.getUser(), operation, System.currentTimeMillis(), entity);
FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
publisher.publish(data);
} catch (Exception ex) {
throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
}
}
}
......@@ -181,6 +181,7 @@ public class FalconHookIT {
feedCluster.setName(clusterName);
STORE.publish(EntityType.FEED, feed);
String feedId = assertFeedIsRegistered(feed, clusterName);
assertFeedAttributes(feedId);
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
......@@ -223,15 +224,24 @@ public class FalconHookIT {
STORE.publish(EntityType.FEED, feed);
String feedId = assertFeedIsRegistered(feed, clusterName);
assertFeedAttributes(feedId);
verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName);
if (secondClusterName != null) {
String feedId2 = assertFeedIsRegistered(feed, secondClusterName);
assertFeedAttributes(feedId2);
verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2);
}
return feed;
}
private void assertFeedAttributes(String feedId) throws Exception {
Referenceable feedEntity = atlasClient.getEntity(feedId);
assertEquals(feedEntity.get(AtlasClient.OWNER), "testuser");
assertEquals(feedEntity.get(FalconDataModelGenerator.FREQUENCY), "hours(1)");
assertEquals(feedEntity.get(AtlasClient.DESCRIPTION), "test input");
}
private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName)
throws Exception{
//verify that lineage from hive table to falcon feed is created
......
......@@ -37,7 +37,7 @@ object FSDataModel extends App {
val typesDef : TypesDef = types {
// FS DataSet
_class(FSDataTypes.FS_PATH.toString) {
_class(FSDataTypes.FS_PATH.toString, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
//fully qualified path/URI to the filesystem path is stored in 'qualifiedName' and 'path'.
"path" ~ (string, required, indexed)
"createTime" ~ (date, optional, indexed)
......@@ -48,7 +48,6 @@ object FSDataModel extends App {
"isSymlink" ~ (boolean, optional, indexed)
//Optional and may not be set for a directory
"fileSize" ~ (long, optional, indexed)
"owner" ~ (string, optional, indexed)
"group" ~ (string, optional, indexed)
"posixPermissions" ~ (FSDataTypes.FS_PERMISSIONS.toString, optional, indexed)
}
......@@ -63,7 +62,7 @@ object FSDataModel extends App {
}
//HDFS DataSet
_class(FSDataTypes.HDFS_PATH.toString, List("DataSet", FSDataTypes.FS_PATH.toString)) {
_class(FSDataTypes.HDFS_PATH.toString, List(FSDataTypes.FS_PATH.toString)) {
//Making cluster optional since path is already unique containing the namenode URI
AtlasConstants.CLUSTER_NAME_ATTRIBUTE ~ (string, optional, indexed)
"numberOfReplicas" ~ (int, optional, indexed)
......
......@@ -27,7 +27,6 @@ import org.apache.atlas.fs.model.FSDataModel;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
......@@ -54,6 +53,7 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
......@@ -163,12 +163,12 @@ public class HiveMetaStoreBridge {
}
String dbName = hiveDB.getName().toLowerCase();
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.NAME, dbName);
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
dbRef.set(HiveDataModelGenerator.LOCATION, hiveDB.getLocationUri());
dbRef.set(HiveDataModelGenerator.PARAMETERS, hiveDB.getParameters());
dbRef.set(HiveDataModelGenerator.OWNER, hiveDB.getOwnerName());
dbRef.set(AtlasClient.OWNER, hiveDB.getOwnerName());
if (hiveDB.getOwnerType() != null) {
dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
}
......@@ -209,7 +209,7 @@ public class HiveMetaStoreBridge {
}
static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
return String.format("%s where %s = '%s' and %s = '%s'", typeName, AtlasClient.NAME,
databaseName.toLowerCase(), AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
}
......@@ -398,8 +398,8 @@ public class HiveMetaStoreBridge {
String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable);
tableReference.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
tableReference.set(HiveDataModelGenerator.NAME, hiveTable.getTableName().toLowerCase());
tableReference.set(HiveDataModelGenerator.OWNER, hiveTable.getOwner());
tableReference.set(AtlasClient.NAME, hiveTable.getTableName().toLowerCase());
tableReference.set(AtlasClient.OWNER, hiveTable.getOwner());
Date createDate = new Date();
if (hiveTable.getTTable() != null){
......@@ -442,10 +442,10 @@ public class HiveMetaStoreBridge {
tableReference.set("temporary", hiveTable.isTemporary());
// add reference to the Partition Keys
List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableQualifiedName, tableReference.getId());
List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableReference);
tableReference.set("partitionKeys", partKeys);
tableReference.set(HiveDataModelGenerator.COLUMNS, getColumns(hiveTable.getCols(), tableQualifiedName, tableReference.getId()));
tableReference.set(HiveDataModelGenerator.COLUMNS, getColumns(hiveTable.getCols(), tableReference));
return tableReference;
}
......@@ -507,7 +507,7 @@ public class HiveMetaStoreBridge {
String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName();
Struct serdeInfoStruct = new Struct(serdeInfoName);
serdeInfoStruct.set(HiveDataModelGenerator.NAME, serdeInfo.getName());
serdeInfoStruct.set(AtlasClient.NAME, serdeInfo.getName());
serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib());
serdeInfoStruct.set(HiveDataModelGenerator.PARAMETERS, serdeInfo.getParameters());
......@@ -561,18 +561,19 @@ public class HiveMetaStoreBridge {
return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
}
public List<Referenceable> getColumns(List<FieldSchema> schemaList, String tableQualifiedName, Id tableReference) throws Exception {
public List<Referenceable> getColumns(List<FieldSchema> schemaList, Referenceable tableReference) throws Exception {
List<Referenceable> colList = new ArrayList<>();
for (FieldSchema fs : schemaList) {
LOG.debug("Processing field " + fs);
Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
colReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getColumnQualifiedName(tableQualifiedName, fs.getName()));
colReferenceable.set(HiveDataModelGenerator.NAME, fs.getName());
getColumnQualifiedName((String) tableReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), fs.getName()));
colReferenceable.set(AtlasClient.NAME, fs.getName());
colReferenceable.set(AtlasClient.OWNER, tableReference.get(AtlasClient.OWNER));
colReferenceable.set("type", fs.getType());
colReferenceable.set(HiveDataModelGenerator.COMMENT, fs.getComment());
colReferenceable.set(HiveDataModelGenerator.TABLE, tableReference);
colReferenceable.set(HiveDataModelGenerator.TABLE, tableReference.getId());
colList.add(colReferenceable);
}
......
......@@ -401,12 +401,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private Referenceable replaceTableQFName(HiveEventContext event, Table oldTable, Table newTable, final Referenceable tableEntity, final String oldTableQFName, final String newTableQFName) throws HiveException {
tableEntity.set(HiveDataModelGenerator.NAME, oldTable.getTableName().toLowerCase());
tableEntity.set(AtlasClient.NAME, oldTable.getTableName().toLowerCase());
tableEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldTableQFName);
//Replace table entity with new name
final Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
newEntity.set(HiveDataModelGenerator.NAME, newTable.getTableName().toLowerCase());
newEntity.set(AtlasClient.NAME, newTable.getTableName().toLowerCase());
newEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newTableQFName);
ArrayList<String> alias_list = new ArrayList<>();
......@@ -422,7 +422,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private List<Referenceable> replaceColumnQFName(final HiveEventContext event, final List<Referenceable> cols, final String oldTableQFName, final String newTableQFName) {
List<Referenceable> newColEntities = new ArrayList<>();
for (Referenceable col : cols) {
final String colName = (String) col.get(HiveDataModelGenerator.NAME);
final String colName = (String) col.get(AtlasClient.NAME);
String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(oldTableQFName, colName);
String newColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newTableQFName, colName);
col.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldColumnQFName);
......
......@@ -70,15 +70,12 @@ public class HiveDataModelGenerator {
public static final String STORAGE_NUM_BUCKETS = "numBuckets";
public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories";
public static final String NAME = "name";
public static final String TABLE_NAME = "tableName";
public static final String TABLE = "table";
public static final String DB = "db";
public static final String STORAGE_DESC = "sd";
public static final String STORAGE_DESC_INPUT_FMT = "inputFormat";
public static final String STORAGE_DESC_OUTPUT_FMT = "outputFormat";
public static final String OWNER = "owner";
public static final String LOCATION = "location";
public static final String TABLE_TYPE_ATTR = "tableType";
......@@ -147,7 +144,7 @@ public class HiveDataModelGenerator {
private void createSerDeStruct() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(AtlasClient.NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("serializationLib", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL,
false, null),
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
......@@ -206,29 +203,23 @@ public class HiveDataModelGenerator {
private void createDBClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(LOCATION, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("ownerType", HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), Multiplicity.OPTIONAL,
false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_DB.getName(), null,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions);
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_DB.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_DB.getName());
}
private void createColumnClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("type", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
//Making this optional since this is an incompatible change
......@@ -237,7 +228,7 @@ public class HiveDataModelGenerator {
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_COLUMN.getName(), null,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions);
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN.getName());
}
......@@ -245,7 +236,6 @@ public class HiveDataModelGenerator {
private void createTableClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(DB, HiveDataTypes.HIVE_DB.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(CREATE_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(LAST_ACCESS_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
......@@ -271,7 +261,7 @@ public class HiveDataModelGenerator {
null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TABLE.getName(), null,
ImmutableSet.of("DataSet"), attributeDefinitions);
ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(HiveDataTypes.HIVE_TABLE.getName(), definition);
LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName());
}
......
......@@ -63,9 +63,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.normalize;
import static org.apache.atlas.hive.model.HiveDataModelGenerator.NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
......@@ -192,13 +192,18 @@ public class HiveHookIT {
Assert.assertNotNull(colEntity.get(HiveDataModelGenerator.TABLE));
Assert.assertEquals(((Id) colEntity.get(HiveDataModelGenerator.TABLE))._getId(), tableId);
//assert that column.owner = table.owner
Referenceable tableRef = atlasClient.getEntity(tableId);
assertEquals(tableRef.get(AtlasClient.OWNER), colEntity.get(AtlasClient.OWNER));
//create table where db is not registered
tableName = createTable();
tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
Referenceable tableRef = atlasClient.getEntity(tableId);
tableRef = atlasClient.getEntity(tableId);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.TABLE_TYPE_ATTR), TableType.MANAGED_TABLE.name());
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), tableName.toLowerCase());
Assert.assertEquals(tableRef.get(AtlasClient.NAME), tableName.toLowerCase());
Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName);
Table t = hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, tableName);
......@@ -1351,7 +1356,7 @@ public class HiveHookIT {
assertDatabaseIsRegistered(dbName, new AssertPredicate() {
@Override
public void assertOnEntity(Referenceable entity) {
assertEquals(entity.get(HiveDataModelGenerator.OWNER), owner);
assertEquals(entity.get(AtlasClient.OWNER), owner);
}
});
}
......
......@@ -63,7 +63,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.NAME, dbName);
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
return dbRef;
......@@ -74,7 +74,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
tableRef.set(HiveDataModelGenerator.NAME, tableName.toLowerCase());
tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
tableRef.set(HiveDataModelGenerator.DB, dbRef);
return tableRef;
}
......@@ -92,14 +92,14 @@ public class SqoopHook extends SqoopJobDataPublisher {
String usage = table != null ? "TABLE" : "QUERY";
String source = table != null ? table : query;
String name = getSqoopDBStoreName(data);
storeRef.set(SqoopDataModelGenerator.NAME, name);
storeRef.set(AtlasClient.NAME, name);
storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
storeRef.set(SqoopDataModelGenerator.DB_STORE_TYPE, data.getStoreType());
storeRef.set(SqoopDataModelGenerator.DB_STORE_USAGE, usage);
storeRef.set(SqoopDataModelGenerator.STORE_URI, data.getUrl());
storeRef.set(SqoopDataModelGenerator.SOURCE, source);
storeRef.set(SqoopDataModelGenerator.DESCRIPTION, "");
storeRef.set(SqoopDataModelGenerator.OWNER, data.getUser());
storeRef.set(AtlasClient.OWNER, data.getUser());
return storeRef;
}
......@@ -107,7 +107,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
SqoopJobDataPublisher.Data data, String clusterName) {
Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
final String sqoopProcessName = getSqoopProcessName(data, clusterName);
procRef.set(SqoopDataModelGenerator.NAME, sqoopProcessName);
procRef.set(AtlasClient.NAME, sqoopProcessName);
procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation());
if (isImportOperation(data)) {
......
......@@ -57,8 +57,6 @@ public class SqoopDataModelGenerator {
private static final DataTypes.MapType STRING_MAP_TYPE =
new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE);
public static final String NAME = "name";
public static final String OWNER = "ownerName";
public static final String USER = "userName";
public static final String DB_STORE_TYPE = "dbStoreType";
public static final String DB_STORE_USAGE = "storeUse";
......@@ -127,9 +125,8 @@ public class SqoopDataModelGenerator {
new AttributeDefinition(STORE_URI,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(SOURCE,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(OWNER,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null)
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_DBDATASTORE.getName(), null,
......
......@@ -116,7 +116,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
if (StringUtils.isEmpty(owner)) {
owner = ANONYMOUS_OWNER;
}
topologyReferenceable.set("owner", owner);
topologyReferenceable.set(AtlasClient.OWNER, owner);
topologyReferenceable.set("startTime", System.currentTimeMillis());
topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
......@@ -194,7 +194,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
if (StringUtils.isEmpty(topologyOwner)) {
topologyOwner = ANONYMOUS_OWNER;
}
dataSetReferenceable.set("owner", topologyOwner);
dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
dataSetReferenceable.set(AtlasClient.NAME, topicName);
break;
......@@ -204,7 +204,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
final String hbaseTableName = config.get("HBaseBolt.tableName");
dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir"));
dataSetReferenceable.set(AtlasClient.NAME, hbaseTableName);
dataSetReferenceable.set("owner", stormConf.get("storm.kerberos.principal"));
dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
//TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
......@@ -220,7 +220,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
dataSetReferenceable.set("path", hdfsPathStr);
dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));
final Path hdfsPath = new Path(hdfsPathStr);
dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName());
break;
......@@ -229,7 +229,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
// todo: verify if hive table has everything needed to retrieve existing table
Referenceable dbReferenceable = new Referenceable("hive_db");
String databaseName = config.get("HiveBolt.options.databaseName");
dbReferenceable.set(HiveDataModelGenerator.NAME, databaseName);
dbReferenceable.set(AtlasClient.NAME, databaseName);
dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
......@@ -239,7 +239,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
dataSetReferenceable = new Referenceable("hive_table");
final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName,
databaseName, hiveTableName);
dataSetReferenceable.set(HiveDataModelGenerator.NAME, hiveTableName);
dataSetReferenceable.set(AtlasClient.NAME, hiveTableName);
dataSetReferenceable.set(HiveDataModelGenerator.DB, dbReferenceable);
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
break;
......@@ -291,8 +291,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
private Referenceable createSpoutInstance(String spoutName,
SpoutSpec stormSpout) throws IllegalAccessException {
Referenceable spoutReferenceable = new Referenceable(
StormDataTypes.STORM_SPOUT.getName(), "DataProducer");
Referenceable spoutReferenceable = new Referenceable(StormDataTypes.STORM_SPOUT.getName());
spoutReferenceable.set(AtlasClient.NAME, spoutName);
Serializable instance = Utils.javaDeserialize(
......@@ -315,8 +314,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
private Referenceable createBoltInstance(String boltName,
Bolt stormBolt) throws IllegalAccessException {
Referenceable boltReferenceable = new Referenceable(
StormDataTypes.STORM_BOLT.getName(), "DataProcessor");
Referenceable boltReferenceable = new Referenceable(StormDataTypes.STORM_BOLT.getName());
boltReferenceable.set(AtlasClient.NAME, boltName);
......
......@@ -45,8 +45,6 @@ object StormDataModel extends App {
*/
_class(StormDataTypes.STORM_TOPOLOGY.getName, List(AtlasClient.PROCESS_SUPER_TYPE)) {
"id" ~ (string, required, indexed, unique)
"description" ~ (string, optional, indexed)
"owner" ~ (string, required, indexed)
"startTime" ~ long
"endTime" ~ long
"conf" ~ (map(string, string), optional)
......@@ -81,31 +79,20 @@ object StormDataModel extends App {
}
// Kafka Data Set
_class(StormDataTypes.KAFKA_TOPIC.getName, List("DataSet")) {
_class(StormDataTypes.KAFKA_TOPIC.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
"topic" ~ (string, required, unique, indexed)
"uri" ~ (string, required)
"owner" ~ (string, required, indexed)
}
// JMS Data Set
_class(StormDataTypes.JMS_TOPIC.getName, List("DataSet")) {
_class(StormDataTypes.JMS_TOPIC.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
"topic" ~ (string, required, unique, indexed)
"uri" ~ (string, required)
"owner" ~ (string, required, indexed)
}
// HBase Data Set
_class(StormDataTypes.HBASE_TABLE.getName, List("DataSet")) {
_class(StormDataTypes.HBASE_TABLE.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) {
"uri" ~ (string, required)
"owner" ~ (string, required, indexed)
}
_trait("DataProcessor") {
}
_trait("DataProducer") {
}
// Hive table data set already exists in atlas.
}
......
......@@ -70,7 +70,6 @@ import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
public class AtlasClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class);
public static final String NAME = "name";
public static final String TYPE = "type";
public static final String TYPENAME = "typeName";
public static final String GUID = "GUID";
......@@ -106,16 +105,20 @@ public class AtlasClient {
public static final String ATTRIBUTE_NAME = "property";
public static final String ATTRIBUTE_VALUE = "value";
public static final String ASSET_TYPE = "Asset";
public static final String NAME = "name";
public static final String DESCRIPTION = "description";
public static final String OWNER = "owner";
public static final String INFRASTRUCTURE_SUPER_TYPE = "Infrastructure";
public static final String DATA_SET_SUPER_TYPE = "DataSet";
public static final String PROCESS_SUPER_TYPE = "Process";
public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable";
public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
public static final String PROCESS_ATTRIBUTE_INPUTS = "inputs";
public static final String PROCESS_ATTRIBUTE_OUTPUTS = "outputs";
public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable";
public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
public static final String UNKNOWN_STATUS = "Unknown status";
......
......@@ -49,7 +49,7 @@
</logger>
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="info"/>
<level value="warn"/>
<appender-ref ref="FILE"/>
</logger>
......
......@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-819 All user defined types should have a set of common attributes (shwethags)
ATLAS-915 Fix docs for import-hive changes (svimal2106 via sumasai)
ATLAS-688 import-hive should depend on Hive CLASSPATH jars instead of packaging everything (svimal2106 via sumasai)
ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags)
......
......@@ -126,7 +126,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@GraphTransaction
public List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
EntityExistsException {
LOG.info("adding entities={}", entities);
LOG.debug("adding entities={}", entities);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
......@@ -141,7 +141,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException, EntityNotFoundException {
LOG.info("Retrieving entity with guid={}", guid);
LOG.debug("Retrieving entity with guid={}", guid);
Vertex instanceVertex = graphHelper.getVertexForGUID(guid);
......@@ -156,7 +156,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@GraphTransaction
public ITypedReferenceableInstance getEntityDefinition(String entityType, String attribute, Object value)
throws AtlasException {
LOG.info("Retrieving entity with type={} and {}={}", entityType, attribute, value);
LOG.debug("Retrieving entity with type={} and {}={}", entityType, attribute, value);
IDataType type = typeSystem.getDataType(IDataType.class, entityType);
String propertyKey = getFieldNameInVertex(type, attribute);
Vertex instanceVertex = graphHelper.findVertex(propertyKey, value,
......@@ -170,7 +170,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public List<String> getEntityList(String entityType) throws RepositoryException {
LOG.info("Retrieving entity list for type={}", entityType);
LOG.debug("Retrieving entity list for type={}", entityType);
GraphQuery query = titanGraph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType);
Iterator<Vertex> results = query.vertices().iterator();
if (!results.hasNext()) {
......@@ -196,7 +196,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public List<String> getTraitNames(String guid) throws AtlasException {
LOG.info("Retrieving trait names for entity={}", guid);
LOG.debug("Retrieving trait names for entity={}", guid);
Vertex instanceVertex = graphHelper.getVertexForGUID(guid);
return GraphHelper.getTraitNames(instanceVertex);
}
......@@ -214,7 +214,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
public void addTrait(String guid, ITypedStruct traitInstance) throws RepositoryException {
Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null");
final String traitName = traitInstance.getTypeName();
LOG.info("Adding a new trait={} for entity={}", traitName, guid);
LOG.debug("Adding a new trait={} for entity={}", traitName, guid);
try {
Vertex instanceVertex = graphHelper.getVertexForGUID(guid);
......@@ -249,7 +249,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException {
LOG.info("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid);
Vertex instanceVertex = graphHelper.getVertexForGUID(guid);
......@@ -289,7 +289,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
LOG.info("updating entity {}", entitiesUpdated);
LOG.debug("updating entity {}", entitiesUpdated);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_FULL,
......@@ -305,7 +305,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
public AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
LOG.info("updating entity {}", entity);
LOG.debug("updating entity {}", entity);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler);
instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
......
......@@ -169,7 +169,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass());
try {
addIndexForType(management, dataType);
LOG.info("Index creation for type {} complete", dataType.getName());
LOG.debug("Index creation for type {} complete", dataType.getName());
} catch (Throwable throwable) {
LOG.error("Error creating index for type {}", dataType, throwable);
//Rollback indexes if any failure
......
......@@ -77,6 +77,9 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
/**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
* for listening to changes to the repository.
......@@ -164,11 +167,6 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
typeSystem.commitTypes(typesAdded);
}
private static final AttributeDefinition NAME_ATTRIBUTE =
TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE);
private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
TypesUtil.createOptionalAttrDef("description", DataTypes.STRING_TYPE);
@InterfaceAudience.Private
private void createSuperTypes() throws AtlasException {
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
......@@ -177,23 +175,29 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
DataTypes.STRING_TYPE));
createType(referenceableType);
HierarchicalTypeDefinition<ClassType> assetType = TypesUtil
.createClassTypeDef(AtlasClient.ASSET_TYPE, ImmutableSet.<String>of(),
TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE),
TypesUtil.createOptionalAttrDef(AtlasClient.DESCRIPTION, DataTypes.STRING_TYPE),
TypesUtil.createOptionalAttrDef(AtlasClient.OWNER, DataTypes.STRING_TYPE));
createType(assetType);
HierarchicalTypeDefinition<ClassType> infraType = TypesUtil
.createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE, ImmutableSet.<String>of(AtlasClient.REFERENCEABLE_SUPER_TYPE), NAME_ATTRIBUTE,
DESCRIPTION_ATTRIBUTE);
.createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
createType(infraType);
HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
.createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE, ImmutableSet.<String>of(AtlasClient.REFERENCEABLE_SUPER_TYPE), NAME_ATTRIBUTE,
DESCRIPTION_ATTRIBUTE);
.createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE));
createType(datasetType);
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(AtlasClient.REFERENCEABLE_SUPER_TYPE),
TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE),
DESCRIPTION_ATTRIBUTE,
new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE,
ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE),
new AttributeDefinition(PROCESS_ATTRIBUTE_INPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
new AttributeDefinition(PROCESS_ATTRIBUTE_OUTPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
createType(processType);
}
......
......@@ -28,14 +28,11 @@ public class StructTypeDefinition {
public final String typeDescription;//optional field
public final AttributeDefinition[] attributeDefinitions;
protected StructTypeDefinition(String typeName, boolean validate, AttributeDefinition... attributeDefinitions) {
this(typeName, null, validate, attributeDefinitions);
}
protected StructTypeDefinition(String typeName, String typeDescription, boolean validate, AttributeDefinition... attributeDefinitions) {
protected StructTypeDefinition(String typeName, String typeDescription, boolean validate,
AttributeDefinition... attributeDefinitions) {
this.typeName = ParamChecker.notEmpty(typeName, "Struct type name");
this.typeDescription = typeDescription;
if (attributeDefinitions != null && attributeDefinitions.length != 0) {
if (validate) {
ParamChecker.notNullElements(attributeDefinitions, "Attribute definitions");
}
this.attributeDefinitions = attributeDefinitions;
......
......@@ -35,7 +35,6 @@ import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.StringUtils;
import org.apache.http.protocol.HTTP;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
......@@ -121,9 +120,10 @@ public class EntityResource {
}
entityJson = AtlasClient.toString(new JSONArray(entities));
LOG.debug("submitting entities {} ", entityJson);
LOG.info("submitting entities {} ", entityJson);
final List<String> guids = metadataService.createEntities(entities);
LOG.info("Created entities {}", guids);
JSONObject response = getResponse(new AtlasClient.EntityResult(guids, null, null));
URI locationURI = getLocationURI(guids);
......@@ -189,9 +189,11 @@ public class EntityResource {
final String entities = Servlets.getRequestPayload(request);
entityJson = AtlasClient.toString(new JSONArray(entities));
LOG.debug("updating entities {} ", entityJson);
LOG.info("updating entities {} ", entityJson);
AtlasClient.EntityResult entityResult = metadataService.updateEntities(entities);
LOG.info("Updated entities: {}", entityResult);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch(EntityExistsException e) {
......@@ -253,13 +255,14 @@ public class EntityResource {
try {
entityJson = Servlets.getRequestPayload(request);
LOG.debug("Partially updating entity by unique attribute {} {} {} {} ", entityType, attribute, value, entityJson);
LOG.info("Partially updating entity by unique attribute {} {} {} {} ", entityType, attribute, value, entityJson);
Referenceable updatedEntity =
InstanceSerialization.fromJsonReferenceable(entityJson, true);
AtlasClient.EntityResult entityResult =
metadataService.updateEntityByUniqueAttribute(entityType, attribute, value, updatedEntity);
LOG.info("Updated entities: {}", entityResult);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
......@@ -308,11 +311,14 @@ public class EntityResource {
try {
ParamChecker.notEmpty(guid, "Guid property cannot be null");
entityJson = Servlets.getRequestPayload(request);
LOG.debug("partially updating entity for guid {} : {} ", guid, entityJson);
LOG.info("partially updating entity for guid {} : {} ", guid, entityJson);
Referenceable updatedEntity =
InstanceSerialization.fromJsonReferenceable(entityJson, true);
AtlasClient.EntityResult entityResult = metadataService.updateEntityPartialByGuid(guid, updatedEntity);
LOG.info("Updated entities: {}", entityResult);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
......@@ -344,7 +350,10 @@ public class EntityResource {
value = Servlets.getRequestPayload(request);
Preconditions.checkNotNull(value, "Entity value cannot be null");
LOG.info("Updating entity {} for property {} = {}", guid, property, value);
AtlasClient.EntityResult entityResult = metadataService.updateEntityAttributeByGuid(guid, property, value);
LOG.info("Updated entities: {}", entityResult);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
......@@ -381,10 +390,13 @@ public class EntityResource {
try {
AtlasClient.EntityResult entityResult;
if (guids != null && !guids.isEmpty()) {
LOG.info("Deleting entities {}", guids);
entityResult = metadataService.deleteEntities(guids);
} else {
LOG.info("Deleting entity type={} with property {}={}", entityType, attribute, value);
entityResult = metadataService.deleteEntityByUniqueAttribute(entityType, attribute, value);
}
LOG.info("Deleted entity result: {}", entityResult);
JSONObject response = getResponse(entityResult);
return Response.ok(response).build();
} catch (EntityNotFoundException e) {
......@@ -579,7 +591,7 @@ public class EntityResource {
String traitDefinition = null;
try {
traitDefinition = Servlets.getRequestPayload(request);
LOG.debug("Adding trait={} for entity={} ", traitDefinition, guid);
LOG.info("Adding trait={} for entity={} ", traitDefinition, guid);
metadataService.addTrait(guid, traitDefinition);
URI locationURI = getLocationURI(new ArrayList<String>() {{
......@@ -614,7 +626,7 @@ public class EntityResource {
@Produces(Servlets.JSON_MEDIA_TYPE)
public Response deleteTrait(@Context HttpServletRequest request, @PathParam("guid") String guid,
@PathParam(TRAIT_NAME) String traitName) {
LOG.debug("Deleting trait={} from entity={} ", traitName, guid);
LOG.info("Deleting trait={} from entity={} ", traitName, guid);
try {
metadataService.deleteTrait(guid, traitName);
......
......@@ -82,7 +82,7 @@ public class TypesResource {
public Response submit(@Context HttpServletRequest request) {
try {
final String typeDefinition = Servlets.getRequestPayload(request);
LOG.debug("Creating type with definition {} ", typeDefinition);
LOG.info("Creating type with definition {} ", typeDefinition);
JSONObject typesJson = metadataService.createType(typeDefinition);
final JSONArray typesJsonArray = typesJson.getJSONArray(AtlasClient.TYPES);
......@@ -126,7 +126,7 @@ public class TypesResource {
public Response update(@Context HttpServletRequest request) {
try {
final String typeDefinition = Servlets.getRequestPayload(request);
LOG.debug("Updating type with definition {} ", typeDefinition);
LOG.info("Updating type with definition {} ", typeDefinition);
JSONObject typesJson = metadataService.updateType(typeDefinition);
final JSONArray typesJsonArray = typesJson.getJSONArray(AtlasClient.TYPES);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment