Commit a02be15d by Madhan Neethiraj

ATLAS-2421: updated Atlas notificaiton module to support V2 data structures;…

ATLAS-2421: updated Atlas notificaiton module to support V2 data structures; updated HBase hook to use V2 notifications (cherry picked from commit 86c9b19316245c47301e5212552c35d362479fc7)
parent dcdd3d68
...@@ -23,11 +23,15 @@ import org.apache.atlas.AtlasConstants; ...@@ -23,11 +23,15 @@ import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hbase.model.HBaseOperationContext; import org.apache.atlas.hbase.model.HBaseOperationContext;
import org.apache.atlas.hbase.model.HBaseDataTypes; import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
...@@ -43,6 +47,7 @@ import org.slf4j.LoggerFactory; ...@@ -43,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -216,113 +221,122 @@ public class HBaseAtlasHook extends AtlasHook { ...@@ -216,113 +221,122 @@ public class HBaseAtlasHook extends AtlasHook {
} }
private void createOrUpdateNamespaceInstance(HBaseOperationContext hbaseOperationContext) { private void createOrUpdateNamespaceInstance(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext); AtlasEntity nameSpace = buildNameSpace(hbaseOperationContext);
switch (hbaseOperationContext.getOperation()) { switch (hbaseOperationContext.getOperation()) {
case CREATE_NAMESPACE: case CREATE_NAMESPACE:
LOG.info("Create NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME)); LOG.info("Create NameSpace {}", nameSpace.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef)); hbaseOperationContext.addMessage(new EntityCreateRequestV2(hbaseOperationContext.getUser(), new AtlasEntitiesWithExtInfo(nameSpace)));
break; break;
case ALTER_NAMESPACE: case ALTER_NAMESPACE:
LOG.info("Modify NameSpace {}", nameSpaceRef.get(REFERENCEABLE_ATTRIBUTE_NAME)); LOG.info("Modify NameSpace {}", nameSpace.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef)); hbaseOperationContext.addMessage(new EntityUpdateRequestV2(hbaseOperationContext.getUser(), new AtlasEntitiesWithExtInfo(nameSpace)));
break; break;
} }
} }
private void deleteNameSpaceInstance(HBaseOperationContext hbaseOperationContext) { private void deleteNameSpaceInstance(HBaseOperationContext hbaseOperationContext) {
String nameSpaceQualifiedName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace()); String nameSpaceQName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace());
AtlasObjectId nameSpaceId = new AtlasObjectId(HBaseDataTypes.HBASE_NAMESPACE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, nameSpaceQName);
LOG.info("Delete NameSpace {}", nameSpaceQualifiedName); LOG.info("Delete NameSpace {}", nameSpaceQName);
hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(), hbaseOperationContext.addMessage(new EntityDeleteRequestV2(hbaseOperationContext.getUser(), Collections.singletonList(nameSpaceId)));
HBaseDataTypes.HBASE_NAMESPACE.getName(),
REFERENCEABLE_ATTRIBUTE_NAME,
nameSpaceQualifiedName));
} }
private void createOrUpdateTableInstance(HBaseOperationContext hbaseOperationContext) { private void createOrUpdateTableInstance(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext); AtlasEntity nameSpace = buildNameSpace(hbaseOperationContext);
Referenceable tableRef = buildTableRef(hbaseOperationContext, nameSpaceRef); AtlasEntity table = buildTable(hbaseOperationContext, nameSpace);
List<Referenceable> columnFamilyRef = buildColumnFamiliesRef(hbaseOperationContext, nameSpaceRef, tableRef); List<AtlasEntity> columnFamilies = buildColumnFamilies(hbaseOperationContext, nameSpace, table);
tableRef.set(ATTR_COLUMNFAMILIES, columnFamilyRef); table.setAttribute(ATTR_COLUMNFAMILIES, AtlasTypeUtil.getAtlasObjectIds(columnFamilies));
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(table);
entities.addReferredEntity(nameSpace);
if (CollectionUtils.isNotEmpty(columnFamilies)) {
for (AtlasEntity columnFamily : columnFamilies) {
entities.addReferredEntity(columnFamily);
}
}
switch (hbaseOperationContext.getOperation()) { switch (hbaseOperationContext.getOperation()) {
case CREATE_TABLE: case CREATE_TABLE:
LOG.info("Create Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME)); LOG.info("Create Table {}", table.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef)); hbaseOperationContext.addMessage(new EntityCreateRequestV2(hbaseOperationContext.getUser(), entities));
break; break;
case ALTER_TABLE: case ALTER_TABLE:
LOG.info("Modify Table {}", tableRef.get(REFERENCEABLE_ATTRIBUTE_NAME)); LOG.info("Modify Table {}", table.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef)); hbaseOperationContext.addMessage(new EntityUpdateRequestV2(hbaseOperationContext.getUser(), entities));
break; break;
} }
} }
private void deleteTableInstance(HBaseOperationContext hbaseOperationContext) { private void deleteTableInstance(HBaseOperationContext hbaseOperationContext) {
TableName tableName = hbaseOperationContext.getTableName(); TableName tableName = hbaseOperationContext.getTableName();
String tableNameSpace = tableName.getNamespaceAsString(); String nameSpaceName = tableName.getNamespaceAsString();
if (tableNameSpace == null) { if (nameSpaceName == null) {
tableNameSpace = tableName.getNameWithNamespaceInclAsString(); nameSpaceName = tableName.getNameWithNamespaceInclAsString();
} }
String tableNameStr = tableName.getNameAsString(); String tableNameStr = tableName.getNameAsString();
String tableQualifiedName = getTableQualifiedName(clusterName, tableNameSpace, tableNameStr); String tableQName = getTableQualifiedName(clusterName, nameSpaceName, tableNameStr);
AtlasObjectId tableId = new AtlasObjectId(HBaseDataTypes.HBASE_TABLE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, tableQName);
LOG.info("Delete Table {}", tableQualifiedName); LOG.info("Delete Table {}", tableQName);
hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(), hbaseOperationContext.addMessage(new EntityDeleteRequestV2(hbaseOperationContext.getUser(), Collections.singletonList(tableId)));
HBaseDataTypes.HBASE_TABLE.getName(),
REFERENCEABLE_ATTRIBUTE_NAME,
tableQualifiedName));
} }
private void createOrUpdateColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) { private void createOrUpdateColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = buildNameSpaceRef(hbaseOperationContext); AtlasEntity nameSpace = buildNameSpace(hbaseOperationContext);
Referenceable tableRef = buildTableRef(hbaseOperationContext, nameSpaceRef); AtlasEntity table = buildTable(hbaseOperationContext, nameSpace);
Referenceable columnFamilyRef = buildColumnFamilyRef(hbaseOperationContext, hbaseOperationContext.gethColumnDescriptor(), nameSpaceRef, tableRef); AtlasEntity columnFamily = buildColumnFamily(hbaseOperationContext, hbaseOperationContext.gethColumnDescriptor(), nameSpace, table);
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(columnFamily);
entities.addReferredEntity(nameSpace);
entities.addReferredEntity(table);
switch (hbaseOperationContext.getOperation()) { switch (hbaseOperationContext.getOperation()) {
case CREATE_COLUMN_FAMILY: case CREATE_COLUMN_FAMILY:
LOG.info("Create ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME)); LOG.info("Create ColumnFamily {}", columnFamily.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new EntityCreateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef)); hbaseOperationContext.addMessage(new EntityCreateRequestV2(hbaseOperationContext.getUser(), entities));
break; break;
case ALTER_COLUMN_FAMILY: case ALTER_COLUMN_FAMILY:
LOG.info("Alter ColumnFamily {}", columnFamilyRef.get(REFERENCEABLE_ATTRIBUTE_NAME)); LOG.info("Alter ColumnFamily {}", columnFamily.getAttribute(REFERENCEABLE_ATTRIBUTE_NAME));
hbaseOperationContext.addMessage(new EntityUpdateRequest(hbaseOperationContext.getUser(), nameSpaceRef, tableRef, columnFamilyRef)); hbaseOperationContext.addMessage(new EntityUpdateRequestV2(hbaseOperationContext.getUser(), entities));
break; break;
} }
} }
private void deleteColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) { private void deleteColumnFamilyInstance(HBaseOperationContext hbaseOperationContext) {
TableName tableName = hbaseOperationContext.getTableName(); TableName tableName = hbaseOperationContext.getTableName();
String tableNameSpace = tableName.getNamespaceAsString(); String nameSpaceName = tableName.getNamespaceAsString();
if (tableNameSpace == null) { if (nameSpaceName == null) {
tableNameSpace = tableName.getNameWithNamespaceInclAsString(); nameSpaceName = tableName.getNameWithNamespaceInclAsString();
} }
String tableNameStr = tableName.getNameAsString(); String tableNameStr = tableName.getNameAsString();
String columnFamilyName = hbaseOperationContext.getColummFamily(); String columnFamilyName = hbaseOperationContext.getColummFamily();
String columnFamilyQualifiedName = getColumnFamilyQualifiedName(clusterName, tableNameSpace, tableNameStr, columnFamilyName); String columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableNameStr, columnFamilyName);
AtlasObjectId columnFamilyId = new AtlasObjectId(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(), REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName);
LOG.info("Delete ColumnFamily {}", columnFamilyQualifiedName); LOG.info("Delete ColumnFamily {}", columnFamilyQName);
hbaseOperationContext.addMessage(new EntityDeleteRequest(hbaseOperationContext.getUser(), hbaseOperationContext.addMessage(new EntityDeleteRequestV2(hbaseOperationContext.getUser(), Collections.singletonList(columnFamilyId)));
HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(),
REFERENCEABLE_ATTRIBUTE_NAME,
columnFamilyQualifiedName));
} }
...@@ -366,127 +380,123 @@ public class HBaseAtlasHook extends AtlasHook { ...@@ -366,127 +380,123 @@ public class HBaseAtlasHook extends AtlasHook {
return tableName.substring(tableName.indexOf(":") + 1); return tableName.substring(tableName.indexOf(":") + 1);
} }
private Referenceable buildNameSpaceRef(HBaseOperationContext hbaseOperationContext) { private AtlasEntity buildNameSpace(HBaseOperationContext hbaseOperationContext) {
Referenceable nameSpaceRef = new Referenceable(HBaseDataTypes.HBASE_NAMESPACE.getName()); AtlasEntity nameSpace = new AtlasEntity(HBaseDataTypes.HBASE_NAMESPACE.getName());
String nameSpace = null;
NamespaceDescriptor nameSpaceDesc = hbaseOperationContext.getNamespaceDescriptor(); NamespaceDescriptor nameSpaceDesc = hbaseOperationContext.getNamespaceDescriptor();
String nameSpaceName = nameSpaceDesc == null ? null : hbaseOperationContext.getNamespaceDescriptor().getName();
if (nameSpaceDesc != null) { if (nameSpaceName == null) {
nameSpace = hbaseOperationContext.getNamespaceDescriptor().getName(); nameSpaceName = hbaseOperationContext.getNameSpace();
}
if (nameSpace == null) {
nameSpace = hbaseOperationContext.getNameSpace();
} }
nameSpaceRef.set(ATTR_NAME, nameSpace);
nameSpaceRef.set(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpace));
nameSpaceRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
nameSpaceRef.set(ATTR_DESCRIPTION, nameSpace);
nameSpaceRef.set(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
nameSpaceRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
Date now = new Date(System.currentTimeMillis()); Date now = new Date(System.currentTimeMillis());
nameSpace.setAttribute(ATTR_NAME, nameSpaceName);
nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpaceName));
nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
nameSpace.setAttribute(ATTR_DESCRIPTION, nameSpaceName);
nameSpace.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
nameSpace.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
nameSpace.setAttribute(ATTR_MODIFIED_TIME, now);
if (OPERATION.CREATE_NAMESPACE.equals(hbaseOperationContext.getOperation())) { if (OPERATION.CREATE_NAMESPACE.equals(hbaseOperationContext.getOperation())) {
nameSpaceRef.set(ATTR_CREATE_TIME, now); nameSpace.setAttribute(ATTR_CREATE_TIME, now);
nameSpaceRef.set(ATTR_MODIFIED_TIME, now);
} else {
nameSpaceRef.set(ATTR_MODIFIED_TIME, now);
} }
return nameSpaceRef; return nameSpace;
} }
private Referenceable buildTableRef(HBaseOperationContext hbaseOperationContext, Referenceable nameSpaceRef) { private AtlasEntity buildTable(HBaseOperationContext hbaseOperationContext, AtlasEntity nameSpace) {
Referenceable tableRef = new Referenceable(HBaseDataTypes.HBASE_TABLE.getName()); AtlasEntity table = new AtlasEntity(HBaseDataTypes.HBASE_TABLE.getName());
String tableName = getTableName(hbaseOperationContext); String tableName = getTableName(hbaseOperationContext);
String tableNameSpace = hbaseOperationContext.getNameSpace(); String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME);
String tableQualifiedName = getTableQualifiedName(clusterName, tableNameSpace, tableName); String tableQName = getTableQualifiedName(clusterName, nameSpaceName, tableName);
OPERATION operation = hbaseOperationContext.getOperation(); OPERATION operation = hbaseOperationContext.getOperation();
Date now = new Date(System.currentTimeMillis()); Date now = new Date(System.currentTimeMillis());
tableRef.set(REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName); table.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, tableQName);
tableRef.set(ATTR_NAME, tableName); table.setAttribute(ATTR_NAME, tableName);
tableRef.set(ATTR_URI, tableName); table.setAttribute(ATTR_URI, tableName);
tableRef.set(ATTR_OWNER, hbaseOperationContext.getOwner()); table.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
tableRef.set(ATTR_DESCRIPTION, tableName); table.setAttribute(ATTR_DESCRIPTION, tableName);
tableRef.set(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf()); table.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
table.setAttribute(ATTR_NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpace));
switch (operation) { switch (operation) {
case CREATE_TABLE: case CREATE_TABLE:
tableRef.set(ATTR_NAMESPACE, nameSpaceRef); table.setAttribute(ATTR_CREATE_TIME, now);
tableRef.set(ATTR_CREATE_TIME, now); table.setAttribute(ATTR_MODIFIED_TIME, now);
tableRef.set(ATTR_MODIFIED_TIME, now);
break; break;
case ALTER_TABLE: case ALTER_TABLE:
tableRef.set(ATTR_NAMESPACE, nameSpaceRef); table.setAttribute(ATTR_MODIFIED_TIME, now);
tableRef.set(ATTR_MODIFIED_TIME, now);
break; break;
default: default:
tableRef.set(ATTR_NAMESPACE, nameSpaceRef.getId());
break; break;
} }
return tableRef; return table;
} }
private List<Referenceable> buildColumnFamiliesRef(HBaseOperationContext hbaseOperationContext, Referenceable nameSpaceRef, Referenceable tableRef) { private List<AtlasEntity> buildColumnFamilies(HBaseOperationContext hbaseOperationContext, AtlasEntity nameSpace, AtlasEntity table) {
List<Referenceable> entities = new ArrayList<>(); List<AtlasEntity> columnFamilies = new ArrayList<>();
HColumnDescriptor[] hColumnDescriptors = hbaseOperationContext.gethColumnDescriptors(); HColumnDescriptor[] hColumnDescriptors = hbaseOperationContext.gethColumnDescriptors();
if (hColumnDescriptors != null) { if (hColumnDescriptors != null) {
for (HColumnDescriptor hColumnDescriptor : hColumnDescriptors) { for (HColumnDescriptor hColumnDescriptor : hColumnDescriptors) {
Referenceable columnFamilyRef = buildColumnFamilyRef(hbaseOperationContext, hColumnDescriptor, nameSpaceRef, tableRef); AtlasEntity columnFamily = buildColumnFamily(hbaseOperationContext, hColumnDescriptor, nameSpace, table);
entities.add(columnFamilyRef); columnFamilies.add(columnFamily);
} }
} }
return entities; return columnFamilies;
} }
private Referenceable buildColumnFamilyRef(HBaseOperationContext hbaseOperationContext, HColumnDescriptor hColumnDescriptor, Referenceable nameSpaceRef, Referenceable tableReference) { private AtlasEntity buildColumnFamily(HBaseOperationContext hbaseOperationContext, HColumnDescriptor hColumnDescriptor, AtlasEntity nameSpace, AtlasEntity table) {
Referenceable columnFamilyRef = new Referenceable(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName()); AtlasEntity columnFamily = new AtlasEntity(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName());
String columnFamilyName = hColumnDescriptor.getNameAsString(); String columnFamilyName = hColumnDescriptor.getNameAsString();
String tableName = (String) tableReference.get(ATTR_NAME); String tableName = (String) table.getAttribute(ATTR_NAME);
String namespace = (String) nameSpaceRef.get(ATTR_NAME); String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME);
String columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableName, columnFamilyName);
Date now = new Date(System.currentTimeMillis());
String columnFamilyQualifiedName = getColumnFamilyQualifiedName(clusterName, namespace, tableName, columnFamilyName); columnFamily.setAttribute(ATTR_NAME, columnFamilyName);
columnFamily.setAttribute(ATTR_DESCRIPTION, columnFamilyName);
columnFamilyRef.set(ATTR_NAME, columnFamilyName); columnFamily.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName);
columnFamilyRef.set(ATTR_DESCRIPTION, columnFamilyName); columnFamily.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
columnFamilyRef.set(REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQualifiedName); columnFamily.setAttribute(ATTR_TABLE, AtlasTypeUtil.getAtlasObjectId(table));
columnFamilyRef.set(ATTR_OWNER, hbaseOperationContext.getOwner());
Date now = new Date(System.currentTimeMillis());
switch (hbaseOperationContext.getOperation()) { switch (hbaseOperationContext.getOperation()) {
case CREATE_COLUMN_FAMILY: case CREATE_COLUMN_FAMILY:
columnFamilyRef.set(ATTR_TABLE, tableReference); columnFamily.setAttribute(ATTR_CREATE_TIME, now);
columnFamilyRef.set(ATTR_CREATE_TIME, now); columnFamily.setAttribute(ATTR_MODIFIED_TIME, now);
columnFamilyRef.set(ATTR_MODIFIED_TIME, now);
break; break;
case ALTER_COLUMN_FAMILY: case ALTER_COLUMN_FAMILY:
columnFamilyRef.set(ATTR_TABLE, tableReference); columnFamily.setAttribute(ATTR_MODIFIED_TIME, now);
columnFamilyRef.set(ATTR_MODIFIED_TIME, now);
break; break;
default: default:
columnFamilyRef.set(ATTR_TABLE, tableReference.getId()); break;
} }
return columnFamilyRef; return columnFamily;
} }
private String getTableName(HBaseOperationContext hbaseOperationContext) { private String getTableName(HBaseOperationContext hbaseOperationContext) {
HTableDescriptor tableDescriptor = hbaseOperationContext.gethTableDescriptor(); final String ret;
return (tableDescriptor != null) ? tableDescriptor.getNameAsString() : null; TableName tableName = hbaseOperationContext.getTableName();
if (tableName != null) {
ret = tableName.getNameAsString();
} else {
HTableDescriptor tableDescriptor = hbaseOperationContext.gethTableDescriptor();
ret = (tableDescriptor != null) ? tableDescriptor.getNameAsString() : null;
}
return ret;
} }
private void notifyAsPrivilegedAction(final HBaseOperationContext hbaseOperationContext) { private void notifyAsPrivilegedAction(final HBaseOperationContext hbaseOperationContext) {
...@@ -496,7 +506,6 @@ public class HBaseAtlasHook extends AtlasHook { ...@@ -496,7 +506,6 @@ public class HBaseAtlasHook extends AtlasHook {
final List<HookNotification> messages = hbaseOperationContext.getMessages(); final List<HookNotification> messages = hbaseOperationContext.getMessages();
try { try {
PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() { PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
@Override @Override
......
...@@ -20,9 +20,11 @@ package org.apache.atlas.hbase; ...@@ -20,9 +20,11 @@ package org.apache.atlas.hbase;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.hbase.bridge.HBaseAtlasHook; import org.apache.atlas.hbase.bridge.HBaseAtlasHook;
import org.apache.atlas.hbase.model.HBaseDataTypes; import org.apache.atlas.hbase.model.HBaseDataTypes;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.utils.ParamChecker;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
...@@ -40,6 +42,7 @@ import org.testng.annotations.Test; ...@@ -40,6 +42,7 @@ import org.testng.annotations.Test;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotNull;
...@@ -47,12 +50,13 @@ import static org.testng.Assert.fail; ...@@ -47,12 +50,13 @@ import static org.testng.Assert.fail;
public class HBaseAtlasHookIT { public class HBaseAtlasHookIT {
private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHookIT.class); private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHookIT.class);
protected static final String DGI_URL = "http://localhost:31000/"; protected static final String ATLAS_URL = "http://localhost:31000/";
protected static final String CLUSTER_NAME = "primary"; protected static final String CLUSTER_NAME = "primary";
private static HBaseTestingUtility utility;
private static int port; private HBaseTestingUtility utility;
private static AtlasClient atlasClient; private int port;
private AtlasClientV2 atlasClient;
@BeforeClass @BeforeClass
...@@ -65,36 +69,42 @@ public class HBaseAtlasHookIT { ...@@ -65,36 +69,42 @@ public class HBaseAtlasHookIT {
} }
} }
@AfterClass @AfterClass
public static void cleanup() throws Exception { public void cleanup() throws Exception {
LOG.info(" Stopping mini cluster.. "); LOG.info("Stopping mini cluster.. ");
utility.shutdownMiniCluster(); utility.shutdownMiniCluster();
} }
@Test @Test
public void testCreateNamesapce() throws Exception { public void testCreateNamesapce() throws Exception {
final Configuration conf = HBaseConfiguration.create(); final Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost"); conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port)); conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
conf.set("zookeeper.znode.parent", "/hbase-unsecure"); conf.set("zookeeper.znode.parent", "/hbase-unsecure");
Connection conn = ConnectionFactory.createConnection(conf); Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin(); Admin admin = conn.getAdmin();
NamespaceDescriptor ns = NamespaceDescriptor.create("test_namespace").build(); NamespaceDescriptor ns = NamespaceDescriptor.create("test_namespace").build();
admin.createNamespace(ns); admin.createNamespace(ns);
String nameSpace = assertNameSpaceIsRegistered(ns.getName());
//assert on qualified name //assert on qualified name
Referenceable nameSpaceRef = getAtlasClient().getEntity(nameSpace); String nameSpace = assertNameSpaceIsRegistered(ns.getName());
String nameSpaceQualifiedName = HBaseAtlasHook.getNameSpaceQualifiedName(CLUSTER_NAME, ns.getName()); AtlasEntityWithExtInfo nameSpaceRef = getAtlasClient().getEntityByGuid(nameSpace);
Assert.assertEquals(nameSpaceRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), nameSpaceQualifiedName); String nameSpaceQualifiedName = HBaseAtlasHook.getNameSpaceQualifiedName(CLUSTER_NAME, ns.getName());
Assert.assertEquals(nameSpaceRef.getEntity().getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), nameSpaceQualifiedName);
} }
@Test @Test
public void testCreateTable() throws Exception { public void testCreateTable() throws Exception {
final Configuration conf = HBaseConfiguration.create(); final Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost"); conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port)); conf.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
conf.set("zookeeper.znode.parent", "/hbase-unsecure"); conf.set("zookeeper.znode.parent", "/hbase-unsecure");
Connection conn = ConnectionFactory.createConnection(conf); Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin(); Admin admin = conn.getAdmin();
String namespace = "test_namespace1"; String namespace = "test_namespace1";
...@@ -103,27 +113,35 @@ public class HBaseAtlasHookIT { ...@@ -103,27 +113,35 @@ public class HBaseAtlasHookIT {
// Create a table // Create a table
if (!admin.tableExists(TableName.valueOf(namespace, tablename))) { if (!admin.tableExists(TableName.valueOf(namespace, tablename))) {
NamespaceDescriptor ns = NamespaceDescriptor.create(namespace).build(); NamespaceDescriptor ns = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(ns); admin.createNamespace(ns);
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(namespace, tablename)); HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(namespace, tablename));
tableDescriptor.addFamily(new HColumnDescriptor("colfam1")); tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
admin.createTable(tableDescriptor); admin.createTable(tableDescriptor);
} }
String table = assertTableIsRegistered(namespace, tablename);
//assert on qualified name //assert on qualified name
Referenceable tableRef = getAtlasClient().getEntity(table); String table = assertTableIsRegistered(namespace, tablename);
String entityName = HBaseAtlasHook.getTableQualifiedName(CLUSTER_NAME, namespace, tablename); AtlasEntityWithExtInfo tableRef = getAtlasClient().getEntityByGuid(table);
Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName); String entityName = HBaseAtlasHook.getTableQualifiedName(CLUSTER_NAME, namespace, tablename);
Assert.assertEquals(tableRef.getEntity().getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName);
} }
// Methods for creating HBase // Methods for creating HBase
public static void createAtlasClient() { private void createAtlasClient() {
try { try {
org.apache.commons.configuration.Configuration configuration = ApplicationProperties.get(); org.apache.commons.configuration.Configuration configuration = ApplicationProperties.get();
String[] atlasEndPoint = configuration.getStringArray(HBaseAtlasHook.ATTR_ATLAS_ENDPOINT); String[] atlasEndPoint = configuration.getStringArray(HBaseAtlasHook.ATTR_ATLAS_ENDPOINT);
configuration.setProperty("atlas.cluster.name", CLUSTER_NAME); configuration.setProperty("atlas.cluster.name", CLUSTER_NAME);
if (atlasEndPoint == null || atlasEndPoint.length == 0) { if (atlasEndPoint == null || atlasEndPoint.length == 0) {
atlasEndPoint = new String[]{DGI_URL}; atlasEndPoint = new String[]{ATLAS_URL};
} }
Iterator<String> keys = configuration.getKeys(); Iterator<String> keys = configuration.getKeys();
...@@ -133,12 +151,10 @@ public class HBaseAtlasHookIT { ...@@ -133,12 +151,10 @@ public class HBaseAtlasHookIT {
} }
if (AuthenticationUtil.isKerberosAuthenticationEnabled()) { if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClient = new AtlasClient(configuration, atlasEndPoint); atlasClient = new AtlasClientV2(configuration, atlasEndPoint, null);
} else { } else {
atlasClient = new AtlasClient(configuration, atlasEndPoint, new String[]{"admin", "admin"}); atlasClient = new AtlasClientV2(configuration, atlasEndPoint, new String[]{"admin", "admin"});
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Unable to create AtlasClient for Testing ", e); LOG.error("Unable to create AtlasClient for Testing ", e);
} }
...@@ -147,15 +163,18 @@ public class HBaseAtlasHookIT { ...@@ -147,15 +163,18 @@ public class HBaseAtlasHookIT {
private static int getFreePort() throws IOException { private static int getFreePort() throws IOException {
ServerSocket serverSocket = new ServerSocket(0); ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort(); int port = serverSocket.getLocalPort();
serverSocket.close(); serverSocket.close();
return port; return port;
} }
public static void createHBaseCluster() throws Exception { private void createHBaseCluster() throws Exception {
LOG.info("Creating Hbase Admin..."); LOG.info("Creating Hbase Admin...");
port = getFreePort();
port = getFreePort();
utility = new HBaseTestingUtility(); utility = new HBaseTestingUtility();
utility.getConfiguration().set("test.hbase.zookeeper.property.clientPort", String.valueOf(port)); utility.getConfiguration().set("test.hbase.zookeeper.property.clientPort", String.valueOf(port));
utility.getConfiguration().set("hbase.master.port", String.valueOf(getFreePort())); utility.getConfiguration().set("hbase.master.port", String.valueOf(getFreePort()));
utility.getConfiguration().set("hbase.master.info.port", String.valueOf(getFreePort())); utility.getConfiguration().set("hbase.master.info.port", String.valueOf(getFreePort()));
...@@ -170,8 +189,8 @@ public class HBaseAtlasHookIT { ...@@ -170,8 +189,8 @@ public class HBaseAtlasHookIT {
} }
public AtlasClient getAtlasClient() { public AtlasClientV2 getAtlasClient() {
AtlasClient ret = null; AtlasClientV2 ret = null;
if (atlasClient != null) { if (atlasClient != null) {
ret = atlasClient; ret = atlasClient;
} }
...@@ -205,7 +224,7 @@ public class HBaseAtlasHookIT { ...@@ -205,7 +224,7 @@ public class HBaseAtlasHookIT {
} }
public interface AssertPredicate { public interface AssertPredicate {
void assertOnEntity(Referenceable entity) throws Exception; void assertOnEntity(AtlasEntity entity) throws Exception;
} }
public interface Predicate { public interface Predicate {
...@@ -224,15 +243,19 @@ public class HBaseAtlasHookIT { ...@@ -224,15 +243,19 @@ public class HBaseAtlasHookIT {
waitFor(80000, new HBaseAtlasHookIT.Predicate() { waitFor(80000, new HBaseAtlasHookIT.Predicate() {
@Override @Override
public void evaluate() throws Exception { public void evaluate() throws Exception {
Referenceable entity = atlasClient.getEntity(typeName, property, value); AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
assertNotNull(entity); assertNotNull(entity);
if (assertPredicate != null) { if (assertPredicate != null) {
assertPredicate.assertOnEntity(entity); assertPredicate.assertOnEntity(entity.getEntity());
} }
} }
}); });
Referenceable entity = atlasClient.getEntity(typeName, property, value);
return entity.getId()._getId(); AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
return entity.getEntity().getGuid();
} }
/** /**
......
...@@ -106,6 +106,10 @@ public class AtlasClientV2 extends AtlasBaseClient { ...@@ -106,6 +106,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
super(baseUrl, cookie); super(baseUrl, cookie);
} }
public AtlasClientV2(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) {
super(configuration, baseUrl, basicAuthUserNamePassword);
}
@VisibleForTesting @VisibleForTesting
AtlasClientV2(WebResource service, Configuration configuration) { AtlasClientV2(WebResource service, Configuration configuration) {
super(service, configuration); super(service, configuration);
......
...@@ -21,12 +21,18 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; ...@@ -21,12 +21,18 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
...@@ -48,7 +54,8 @@ public class HookNotification implements Serializable { ...@@ -48,7 +54,8 @@ public class HookNotification implements Serializable {
* Type of the hook message. * Type of the hook message.
*/ */
public enum HookNotificationType { public enum HookNotificationType {
TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE,
ENTITY_CREATE_V2, ENTITY_PARTIAL_UPDATE_V2, ENTITY_FULL_UPDATE_V2, ENTITY_DELETE_V2
} }
protected HookNotificationType type; protected HookNotificationType type;
...@@ -101,4 +108,118 @@ public class HookNotification implements Serializable { ...@@ -101,4 +108,118 @@ public class HookNotification implements Serializable {
return sb; return sb;
} }
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public static class EntityCreateRequestV2 extends HookNotification implements Serializable {
private AtlasEntitiesWithExtInfo entities;
private EntityCreateRequestV2() {
}
public EntityCreateRequestV2(String user, AtlasEntitiesWithExtInfo entities) {
super(HookNotificationType.ENTITY_CREATE_V2, user);
this.entities = entities;
}
public AtlasEntitiesWithExtInfo getEntities() {
return entities;
}
@Override
public String toString() {
return entities == null ? "null" : entities.toString();
}
}
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public static class EntityUpdateRequestV2 extends HookNotification implements Serializable {
private AtlasEntitiesWithExtInfo entities;
private EntityUpdateRequestV2() {
}
public EntityUpdateRequestV2(String user, AtlasEntitiesWithExtInfo entities) {
super(HookNotificationType.ENTITY_FULL_UPDATE_V2, user);
this.entities = entities;
}
public AtlasEntitiesWithExtInfo getEntities() {
return entities;
}
@Override
public String toString() {
return entities == null ? "null" : entities.toString();
}
}
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public static class EntityPartialUpdateRequestV2 extends HookNotification implements Serializable {
private AtlasObjectId entityId;
private AtlasEntityWithExtInfo entity;
private EntityPartialUpdateRequestV2() {
}
public EntityPartialUpdateRequestV2(String user, AtlasObjectId entityId, AtlasEntityWithExtInfo entity) {
super(HookNotificationType.ENTITY_PARTIAL_UPDATE_V2, user);
this.entityId = entityId;
this.entity = entity;
}
public AtlasObjectId getEntityId() {
return entityId;
}
public AtlasEntityWithExtInfo getEntity() {
return entity;
}
@Override
public String toString() {
return "entityId=" + entityId + "; entity=" + entity;
}
}
@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public static class EntityDeleteRequestV2 extends HookNotification implements Serializable {
private List<AtlasObjectId> entities;
private EntityDeleteRequestV2() {
}
public EntityDeleteRequestV2(String user, List<AtlasObjectId> entities) {
super(HookNotificationType.ENTITY_DELETE_V2, user);
this.entities = entities;
}
public List<AtlasObjectId> getEntities() {
return entities;
}
@Override
public String toString() {
return entities == null ? "null" : entities.toString();
}
}
} }
...@@ -347,6 +347,22 @@ public class AtlasTypeUtil { ...@@ -347,6 +347,22 @@ public class AtlasTypeUtil {
return new AtlasObjectId(header.getGuid(), header.getTypeName()); return new AtlasObjectId(header.getGuid(), header.getTypeName());
} }
public static List<AtlasObjectId> getAtlasObjectIds(List<AtlasEntity> entities) {
final List<AtlasObjectId> ret;
if (CollectionUtils.isNotEmpty(entities)) {
ret = new ArrayList<>(entities.size());
for (AtlasEntity entity : entities) {
ret.add(getAtlasObjectId(entity));
}
} else {
ret = new ArrayList<>();
}
return ret;
}
public static boolean isValidGuid(AtlasObjectId objId) { public static boolean isValidGuid(AtlasObjectId objId) {
return isValidGuid(objId.getGuid()); return isValidGuid(objId.getGuid());
} }
......
...@@ -29,6 +29,10 @@ import org.apache.atlas.model.notification.EntityNotification; ...@@ -29,6 +29,10 @@ import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.model.notification.EntityNotification.EntityNotificationType; import org.apache.atlas.model.notification.EntityNotification.EntityNotificationType;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.v1.model.instance.AtlasSystemAttributes; import org.apache.atlas.v1.model.instance.AtlasSystemAttributes;
import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Id;
...@@ -65,12 +69,23 @@ public class AtlasJson { ...@@ -65,12 +69,23 @@ public class AtlasJson {
static { static {
SimpleModule atlasSerDeModule = new SimpleModule("AtlasSerDe", new Version(1, 0, 0, null)); SimpleModule atlasSerDeModule = new SimpleModule("AtlasSerDe", new Version(1, 0, 0, null));
atlasSerDeModule.addSerializer(Date.class, new DateSerializer()); atlasSerDeModule.addSerializer(Referenceable.class, new ReferenceableSerializer());
atlasSerDeModule.addDeserializer(Date.class, new DateDeserializer()); atlasSerDeModule.addDeserializer(Referenceable.class, new ReferenceableDeserializer());
atlasSerDeModule.addSerializer(Struct.class, new StructSerializer());
atlasSerDeModule.addDeserializer(Struct.class, new StructDeserializer());
atlasSerDeModule.addSerializer(Id.class, new IdSerializer());
atlasSerDeModule.addDeserializer(Id.class, new IdDeserializer());
atlasSerDeModule.addDeserializer(HookNotification.class, new HookNotificationDeserializer()); atlasSerDeModule.addDeserializer(HookNotification.class, new HookNotificationDeserializer());
atlasSerDeModule.addDeserializer(EntityNotification.class, new EntityNotificationDeserializer()); atlasSerDeModule.addDeserializer(EntityNotification.class, new EntityNotificationDeserializer());
mapperV1.registerModule(atlasSerDeModule); mapper.registerModule(atlasSerDeModule);
SimpleModule atlasSerDeV1Module = new SimpleModule("AtlasSerDeV1", new Version(1, 0, 0, null));
atlasSerDeV1Module.addSerializer(Date.class, new DateSerializer());
atlasSerDeV1Module.addDeserializer(Date.class, new DateDeserializer());
mapperV1.registerModule(atlasSerDeV1Module);
SimpleModule searchResultV1SerDeModule = new SimpleModule("SearchResultV1SerDe", new Version(1, 0, 0, null)); SimpleModule searchResultV1SerDeModule = new SimpleModule("SearchResultV1SerDe", new Version(1, 0, 0, null));
...@@ -90,7 +105,7 @@ public class AtlasJson { ...@@ -90,7 +105,7 @@ public class AtlasJson {
if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) { if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) {
ret = ((JsonNode) obj).textValue(); ret = ((JsonNode) obj).textValue();
} else { } else {
ret = mapperV1.writeValueAsString(obj); ret = mapper.writeValueAsString(obj);
} }
}catch (IOException e){ }catch (IOException e){
LOG.error("AtlasJson.toJson()", e); LOG.error("AtlasJson.toJson()", e);
...@@ -106,6 +121,10 @@ public class AtlasJson { ...@@ -106,6 +121,10 @@ public class AtlasJson {
if (jsonStr != null) { if (jsonStr != null) {
try { try {
ret = mapper.readValue(jsonStr, type); ret = mapper.readValue(jsonStr, type);
if (ret instanceof Struct) {
((Struct) ret).normalize();
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("AtlasType.fromJson()", e); LOG.error("AtlasType.fromJson()", e);
...@@ -116,34 +135,18 @@ public class AtlasJson { ...@@ -116,34 +135,18 @@ public class AtlasJson {
return ret; return ret;
} }
public static String toV1Json(Object obj) { public static <T> T fromJson(String jsonStr, TypeReference<T> type) {
String ret;
try {
if (obj instanceof JsonNode && ((JsonNode) obj).isTextual()) {
ret = ((JsonNode) obj).textValue();
} else {
ret = mapperV1.writeValueAsString(obj);
}
} catch (IOException e) {
LOG.error("AtlasType.toV1Json()", e);
ret = null;
}
return ret;
}
public static <T> T fromV1Json(String jsonStr, Class<T> type) {
T ret = null; T ret = null;
if (jsonStr != null) { if (jsonStr != null) {
try { try {
ret = mapperV1.readValue(jsonStr, type); ret = mapper.readValue(jsonStr, type);
if (ret instanceof Struct) { if (ret instanceof Struct) {
((Struct) ret).normalize(); ((Struct) ret).normalize();
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("AtlasType.fromV1Json()", e); LOG.error("AtlasType.fromJson()", e);
ret = null; ret = null;
} }
...@@ -152,20 +155,16 @@ public class AtlasJson { ...@@ -152,20 +155,16 @@ public class AtlasJson {
return ret; return ret;
} }
public static <T> T fromV1Json(String jsonStr, TypeReference<T> type) { public static String toV1Json(Object obj) {
T ret = null; return toJson(obj);
}
if (jsonStr != null) {
try {
ret = mapperV1.readValue(jsonStr, type);
} catch (IOException e) {
LOG.error("AtlasType.toV1Json()", e);
ret = null; public static <T> T fromV1Json(String jsonStr, Class<T> type) {
} return fromJson(jsonStr, type);
} }
return ret; public static <T> T fromV1Json(String jsonStr, TypeReference<T> type) {
return fromJson(jsonStr, type);
} }
public static String toV1SearchJson(Object obj) { public static String toV1SearchJson(Object obj) {
...@@ -198,7 +197,7 @@ public class AtlasJson { ...@@ -198,7 +197,7 @@ public class AtlasJson {
} }
public static ArrayNode createV1ArrayNode(Collection<?> array) { public static ArrayNode createV1ArrayNode(Collection<?> array) {
ArrayNode ret = mapperV1.createArrayNode(); ArrayNode ret = mapper.createArrayNode();
for (Object elem : array) { for (Object elem : array) {
ret.addPOJO(elem); ret.addPOJO(elem);
...@@ -209,13 +208,13 @@ public class AtlasJson { ...@@ -209,13 +208,13 @@ public class AtlasJson {
public static JsonNode parseToV1JsonNode(String json) throws IOException { public static JsonNode parseToV1JsonNode(String json) throws IOException {
JsonNode jsonNode = mapperV1.readTree(json); JsonNode jsonNode = mapper.readTree(json);
return jsonNode; return jsonNode;
} }
public static ArrayNode parseToV1ArrayNode(String json) throws IOException { public static ArrayNode parseToV1ArrayNode(String json) throws IOException {
JsonNode jsonNode = mapperV1.readTree(json); JsonNode jsonNode = mapper.readTree(json);
if (jsonNode instanceof ArrayNode) { if (jsonNode instanceof ArrayNode) {
return (ArrayNode)jsonNode; return (ArrayNode)jsonNode;
...@@ -228,7 +227,7 @@ public class AtlasJson { ...@@ -228,7 +227,7 @@ public class AtlasJson {
ArrayNode ret = createV1ArrayNode(); ArrayNode ret = createV1ArrayNode();
for (String json : jsonStrings) { for (String json : jsonStrings) {
JsonNode jsonNode = mapperV1.readTree(json); JsonNode jsonNode = mapper.readTree(json);
ret.add(jsonNode); ret.add(jsonNode);
} }
...@@ -263,6 +262,60 @@ public class AtlasJson { ...@@ -263,6 +262,60 @@ public class AtlasJson {
} }
} }
static class ReferenceableSerializer extends JsonSerializer<Referenceable> {
@Override
public void serialize(Referenceable value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
if (value != null) {
mapperV1.writeValue(jgen, value);
}
}
}
static class ReferenceableDeserializer extends JsonDeserializer<Referenceable> {
@Override
public Referenceable deserialize(JsonParser parser, DeserializationContext context) throws IOException {
Referenceable ret = mapperV1.readValue(parser, Referenceable.class);
return ret;
}
}
static class StructSerializer extends JsonSerializer<Struct> {
@Override
public void serialize(Struct value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
if (value != null) {
mapperV1.writeValue(jgen, value);
}
}
}
static class StructDeserializer extends JsonDeserializer<Struct> {
@Override
public Struct deserialize(JsonParser parser, DeserializationContext context) throws IOException {
Struct ret = mapperV1.readValue(parser, Struct.class);
return ret;
}
}
static class IdSerializer extends JsonSerializer<Id> {
@Override
public void serialize(Id value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
if (value != null) {
mapperV1.writeValue(jgen, value);
}
}
}
static class IdDeserializer extends JsonDeserializer<Id> {
@Override
public Id deserialize(JsonParser parser, DeserializationContext context) throws IOException {
Id ret = mapperV1.readValue(parser, Id.class);
return ret;
}
}
static class HookNotificationDeserializer extends JsonDeserializer<HookNotification> { static class HookNotificationDeserializer extends JsonDeserializer<HookNotification> {
@Override @Override
public HookNotification deserialize(JsonParser parser, DeserializationContext context) throws IOException { public HookNotification deserialize(JsonParser parser, DeserializationContext context) throws IOException {
...@@ -295,6 +348,22 @@ public class AtlasJson { ...@@ -295,6 +348,22 @@ public class AtlasJson {
case ENTITY_DELETE: case ENTITY_DELETE:
ret = mapper.treeToValue(root, EntityDeleteRequest.class); ret = mapper.treeToValue(root, EntityDeleteRequest.class);
break; break;
case ENTITY_CREATE_V2:
ret = mapper.treeToValue(root, EntityCreateRequestV2.class);
break;
case ENTITY_PARTIAL_UPDATE_V2:
ret = mapper.treeToValue(root, EntityPartialUpdateRequestV2.class);
break;
case ENTITY_FULL_UPDATE_V2:
ret = mapper.treeToValue(root, EntityUpdateRequestV2.class);
break;
case ENTITY_DELETE_V2:
ret = mapper.treeToValue(root, EntityDeleteRequestV2.class);
break;
} }
} }
......
...@@ -110,7 +110,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message ...@@ -110,7 +110,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class); AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class);
if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
ret = AtlasType.fromV1Json(messageJson, messageType); ret = AtlasType.fromV1Json(messageJson, messageType);
} else { } else {
String msgJson = messageJson; String msgJson = messageJson;
......
...@@ -17,13 +17,29 @@ ...@@ -17,13 +17,29 @@
*/ */
package org.apache.atlas.notification.hook; package org.apache.atlas.notification.hook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
...@@ -31,12 +47,19 @@ import static org.testng.Assert.assertTrue; ...@@ -31,12 +47,19 @@ import static org.testng.Assert.assertTrue;
public class HookNotificationTest { public class HookNotificationTest {
private HookMessageDeserializer deserializer = new HookMessageDeserializer(); private HookMessageDeserializer deserializer = new HookMessageDeserializer();
private static final String ATTR_VALUE_STRING = "strValue";
private static final Integer ATTR_VALUE_INTEGER = 10;
private static final Boolean ATTR_VALUE_BOOLEAN = Boolean.TRUE;
private static final Date ATTR_VALUE_DATE = new Date();
@Test @Test
public void testNewMessageSerDe() throws Exception { public void testNewMessageSerDe() throws Exception {
Referenceable entity1 = new Referenceable("sometype"); Referenceable entity1 = new Referenceable("sometype");
Referenceable entity2 = new Referenceable("newtype");
entity1.set("attr", "value"); entity1.set("attr", "value");
entity1.set("complex", new Referenceable("othertype")); entity1.set("complex", new Referenceable("othertype"));
Referenceable entity2 = new Referenceable("newtype");
String user = "user"; String user = "user";
EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2); EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2);
...@@ -98,4 +121,174 @@ public class HookNotificationTest { ...@@ -98,4 +121,174 @@ public class HookNotificationTest {
assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE); assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
assertEquals(actualNotification.getUser(), HookNotification.UNKNOW_USER); assertEquals(actualNotification.getUser(), HookNotification.UNKNOW_USER);
} }
@Test
public void testEntityCreateV2SerDe() throws Exception {
AtlasEntity entity1 = new AtlasEntity("sometype");
AtlasEntity entity2 = new AtlasEntity("newtype");
AtlasEntity entity3 = new AtlasEntity("othertype");
setAttributes(entity1);
entity1.setAttribute("complex", new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo();
entities.addEntity(entity1);
entities.addEntity(entity2);
entities.addReferredEntity(entity3);
String user = "user";
EntityCreateRequestV2 request = new EntityCreateRequestV2(user, entities);
String notificationJson = AtlasJson.toJson(request);
HookNotification actualNotification = deserializer.deserialize(notificationJson);
assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE_V2);
assertEquals(actualNotification.getUser(), user);
EntityCreateRequestV2 createRequest = (EntityCreateRequestV2) actualNotification;
assertEquals(createRequest.getEntities().getEntities().size(), 2);
AtlasEntity actualEntity1 = createRequest.getEntities().getEntities().get(0);
AtlasEntity actualEntity2 = createRequest.getEntities().getEntities().get(1);
AtlasEntity actualEntity3 = createRequest.getEntities().getReferredEntity(entity3.getGuid());
Map actualComplexAttr = (Map)actualEntity1.getAttribute("complex");
assertEquals(actualEntity1.getGuid(), entity1.getGuid());
assertEquals(actualEntity1.getTypeName(), entity1.getTypeName());
assertAttributes(actualEntity1);
assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_GUID), entity3.getGuid());
assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_TYPENAME), entity3.getTypeName());
assertEquals(actualEntity2.getGuid(), entity2.getGuid());
assertEquals(actualEntity2.getTypeName(), entity2.getTypeName());
assertEquals(actualEntity3.getGuid(), entity3.getGuid());
assertEquals(actualEntity3.getTypeName(), entity3.getTypeName());
}
@Test
public void testEntityUpdateV2SerDe() throws Exception {
AtlasEntity entity1 = new AtlasEntity("sometype");
AtlasEntity entity2 = new AtlasEntity("newtype");
AtlasEntity entity3 = new AtlasEntity("othertype");
setAttributes(entity1);
entity1.setAttribute("complex", new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo();
entities.addEntity(entity1);
entities.addEntity(entity2);
entities.addReferredEntity(entity3);
String user = "user";
EntityUpdateRequestV2 request = new EntityUpdateRequestV2(user, entities);
String notificationJson = AtlasJson.toJson(request);
HookNotification actualNotification = deserializer.deserialize(notificationJson);
assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_FULL_UPDATE_V2);
assertEquals(actualNotification.getUser(), user);
EntityUpdateRequestV2 updateRequest = (EntityUpdateRequestV2) actualNotification;
assertEquals(updateRequest.getEntities().getEntities().size(), 2);
AtlasEntity actualEntity1 = updateRequest.getEntities().getEntities().get(0);
AtlasEntity actualEntity2 = updateRequest.getEntities().getEntities().get(1);
AtlasEntity actualEntity3 = updateRequest.getEntities().getReferredEntity(entity3.getGuid());
Map actualComplexAttr = (Map)actualEntity1.getAttribute("complex");
assertEquals(actualEntity1.getGuid(), entity1.getGuid());
assertEquals(actualEntity1.getTypeName(), entity1.getTypeName());
assertAttributes(actualEntity1);
assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_GUID), entity3.getGuid());
assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_TYPENAME), entity3.getTypeName());
assertEquals(actualEntity2.getGuid(), entity2.getGuid());
assertEquals(actualEntity2.getTypeName(), entity2.getTypeName());
assertEquals(actualEntity3.getGuid(), entity3.getGuid());
assertEquals(actualEntity3.getTypeName(), entity3.getTypeName());
}
@Test
public void testEntityPartialUpdateV2SerDe() throws Exception {
AtlasEntity entity1 = new AtlasEntity("sometype");
AtlasEntity entity2 = new AtlasEntity("newtype");
AtlasEntity entity3 = new AtlasEntity("othertype");
setAttributes(entity1);
entity1.setAttribute("complex", new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
AtlasEntityWithExtInfo entity = new AtlasEntityWithExtInfo(entity1);
entity.addReferredEntity(entity2);
entity.addReferredEntity(entity3);
String user = "user";
EntityPartialUpdateRequestV2 request = new EntityPartialUpdateRequestV2(user, AtlasTypeUtil.getAtlasObjectId(entity1), entity);
String notificationJson = AtlasJson.toJson(request);
HookNotification actualNotification = deserializer.deserialize(notificationJson);
assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_PARTIAL_UPDATE_V2);
assertEquals(actualNotification.getUser(), user);
EntityPartialUpdateRequestV2 updateRequest = (EntityPartialUpdateRequestV2) actualNotification;
assertEquals(updateRequest.getEntity().getReferredEntities().size(), 2);
AtlasEntity actualEntity1 = updateRequest.getEntity().getEntity();
AtlasEntity actualEntity2 = updateRequest.getEntity().getReferredEntity(entity2.getGuid());
AtlasEntity actualEntity3 = updateRequest.getEntity().getReferredEntity(entity3.getGuid());
Map actualComplexAttr = (Map)actualEntity1.getAttribute("complex");
assertEquals(actualEntity1.getGuid(), entity1.getGuid());
assertEquals(actualEntity1.getTypeName(), entity1.getTypeName());
assertAttributes(actualEntity1);
assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_GUID), entity3.getGuid());
assertEquals(actualComplexAttr.get(AtlasObjectId.KEY_TYPENAME), entity3.getTypeName());
assertEquals(actualEntity2.getGuid(), entity2.getGuid());
assertEquals(actualEntity2.getTypeName(), entity2.getTypeName());
assertEquals(actualEntity3.getGuid(), entity3.getGuid());
assertEquals(actualEntity3.getTypeName(), entity3.getTypeName());
}
@Test
public void testEntityDeleteV2SerDe() throws Exception {
AtlasEntity entity1 = new AtlasEntity("sometype");
AtlasEntity entity2 = new AtlasEntity("newtype");
AtlasEntity entity3 = new AtlasEntity("othertype");
List<AtlasObjectId> objectsToDelete = new ArrayList<>();
objectsToDelete.add(new AtlasObjectId(entity1.getGuid(), entity1.getTypeName()));
objectsToDelete.add(new AtlasObjectId(entity2.getGuid(), entity2.getTypeName()));
objectsToDelete.add(new AtlasObjectId(entity3.getGuid(), entity3.getTypeName()));
String user = "user";
EntityDeleteRequestV2 request = new EntityDeleteRequestV2(user, objectsToDelete);
String notificationJson = AtlasJson.toJson(request);
HookNotification actualNotification = deserializer.deserialize(notificationJson);
assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_DELETE_V2);
assertEquals(actualNotification.getUser(), user);
EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) actualNotification;
assertEquals(deleteRequest.getEntities().size(), objectsToDelete.size());
assertEquals(deleteRequest.getEntities(), objectsToDelete);
}
private void setAttributes(AtlasEntity entity) {
entity.setAttribute("attrStr", ATTR_VALUE_STRING);
entity.setAttribute("attrInt", ATTR_VALUE_INTEGER);
entity.setAttribute("attrBool", ATTR_VALUE_BOOLEAN);
entity.setAttribute("attrDate", ATTR_VALUE_DATE);
}
private void assertAttributes(AtlasEntity entity) {
assertEquals(entity.getAttribute("attrStr"), ATTR_VALUE_STRING);
assertEquals(entity.getAttribute("attrInt"), ATTR_VALUE_INTEGER);
assertEquals(entity.getAttribute("attrBool"), ATTR_VALUE_BOOLEAN);
assertEquals(entity.getAttribute("attrDate"), ATTR_VALUE_DATE.getTime());
}
} }
...@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.EntityStream; import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
...@@ -85,6 +86,16 @@ public interface AtlasEntityStore { ...@@ -85,6 +86,16 @@ public interface AtlasEntityStore {
/** /**
* Update a single entity * Update a single entity
* @param objectId ID of the entity
* @param updatedEntityInfo updated entity information
* @return EntityMutationResponse details of the updates performed by this call
* @throws AtlasBaseException
*
*/
EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException;
/**
* Update a single entity
* @param entityType type of the entity * @param entityType type of the entity
* @param uniqAttributes Attributes that uniquely identify the entity * @param uniqAttributes Attributes that uniquely identify the entity
* @return EntityMutationResponse details of the updates performed by this call * @return EntityMutationResponse details of the updates performed by this call
......
...@@ -226,6 +226,38 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { ...@@ -226,6 +226,38 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
@Override @Override
@GraphTransaction @GraphTransaction
public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate);
}
if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity");
}
final String guid;
if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) {
guid = objectId.getGuid();
} else {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
if (entityType == null) {
throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName());
}
guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(objectId.getTypeName()), objectId.getUniqueAttributes());
}
AtlasEntity entity = updatedEntityInfo.getEntity();
entity.setGuid(guid);
return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false);
}
@Override
@GraphTransaction
public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes, public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException { AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException {
......
...@@ -23,6 +23,7 @@ import kafka.utils.ShutdownableThread; ...@@ -23,6 +23,7 @@ import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasBaseClient; import org.apache.atlas.AtlasBaseClient;
import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException; import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
...@@ -30,7 +31,13 @@ import org.apache.atlas.ha.HAConfiguration; ...@@ -30,7 +31,13 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
...@@ -353,8 +360,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -353,8 +360,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try { try {
// Used for intermediate conversions during create and update // Used for intermediate conversions during create and update
AtlasEntitiesWithExtInfo entities = null;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) { for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
...@@ -366,8 +371,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -366,8 +371,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
requestContext.setUser(messageUser); requestContext.setUser(messageUser);
switch (message.getType()) { switch (message.getType()) {
case ENTITY_CREATE: case ENTITY_CREATE: {
final EntityCreateRequest createRequest = (EntityCreateRequest) message; final EntityCreateRequest createRequest = (EntityCreateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
if (numRetries == 0) { // audit only on the first attempt if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY; AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY;
...@@ -375,24 +381,20 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -375,24 +381,20 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
audit(messageUser, api.getMethod(), api.getNormalizedPath()); audit(messageUser, api.getMethod(), api.getNormalizedPath());
} }
entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break; }
break;
case ENTITY_PARTIAL_UPDATE: case ENTITY_PARTIAL_UPDATE: {
final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message; final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
final Referenceable referenceable = partialUpdateRequest.getEntity();
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntity(referenceable);
if (numRetries == 0) { // audit only on the first attempt if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE; AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE;
audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName())); audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
} }
Referenceable referenceable = partialUpdateRequest.getEntity();
entities = instanceConverter.toAtlasEntity(referenceable);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue())); String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue()));
...@@ -400,14 +402,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -400,14 +402,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
entities.getEntities().get(0).setGuid(guid); entities.getEntities().get(0).setGuid(guid);
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
break; }
break;
case ENTITY_DELETE: case ENTITY_DELETE: {
final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message; final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
if (numRetries == 0) { // audit only on the first attempt if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE; AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE;
audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName())); audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
} }
...@@ -416,12 +418,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -416,12 +418,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
} catch (ClassCastException cle) { } catch (ClassCastException cle) {
LOG.error("Failed to do a partial update on Entity"); LOG.error("Failed to delete entity {}", deleteRequest);
} }
break; }
break;
case ENTITY_FULL_UPDATE: case ENTITY_FULL_UPDATE: {
final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
final AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
if (numRetries == 0) { // audit only on the first attempt if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = UPDATE_ENTITY; AtlasBaseClient.API api = UPDATE_ENTITY;
...@@ -429,10 +433,70 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -429,10 +433,70 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
audit(messageUser, api.getMethod(), api.getNormalizedPath()); audit(messageUser, api.getMethod(), api.getNormalizedPath());
} }
entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
}
break;
case ENTITY_CREATE_V2: {
final EntityCreateRequestV2 createRequestV2 = (EntityCreateRequestV2) message;
final AtlasEntitiesWithExtInfo entities = createRequestV2.getEntities();
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClientV2.API_V2.CREATE_ENTITY;
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
}
break;
case ENTITY_PARTIAL_UPDATE_V2: {
final EntityPartialUpdateRequestV2 partialUpdateRequest = (EntityPartialUpdateRequestV2) message;
final AtlasObjectId entityId = partialUpdateRequest.getEntityId();
final AtlasEntityWithExtInfo entity = partialUpdateRequest.getEntity();
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClientV2.API_V2.UPDATE_ENTITY;
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
atlasEntityStore.updateEntity(entityId, entity, true);
}
break;
case ENTITY_FULL_UPDATE_V2: {
final EntityUpdateRequestV2 updateRequest = (EntityUpdateRequestV2) message;
final AtlasEntitiesWithExtInfo entities = updateRequest.getEntities();
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClientV2.API_V2.UPDATE_ENTITY;
audit(messageUser, api.getMethod(), api.getNormalizedPath());
}
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break; }
break;
case ENTITY_DELETE_V2: {
final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
final List<AtlasObjectId> entities = deleteRequest.getEntities();
try {
for (AtlasObjectId entity : entities) {
if (numRetries == 0) { // audit only on the first attempt
AtlasBaseClient.API api = AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), entity.getTypeName()));
}
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
}
} catch (ClassCastException cle) {
LOG.error("Failed to do delete entities {}", entities);
}
}
break;
default: default:
throw new IllegalStateException("Unknown notification type: " + message.getType().name()); throw new IllegalStateException("Unknown notification type: " + message.getType().name());
...@@ -541,4 +605,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ...@@ -541,4 +605,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date()));
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment