Commit c37656cc by Sarath Subramanian

ATLAS-3157: Add Integration tests for Hive metastore hook

parent e050bc06
......@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.bridge.ColumnLineageUtils;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.hook.HiveHookIT;
......@@ -67,7 +68,10 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
import static org.apache.atlas.hive.bridge.HiveMetaStoreBridge.HDFS_PATH;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.ATTRIBUTE_QUALIFIED_NAME;
import static org.apache.atlas.hive.model.HiveDataTypes.HIVE_DB;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
......@@ -103,6 +107,11 @@ public class HiveITBase {
//Set-up hive session
conf = new HiveConf();
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
conf.set("hive.metastore.event.listeners", "");
// 'driver' using this configuration will be used for tests in HiveHookIT
// HiveHookIT will use this driver to test post-execution hooks in HiveServer2.
// initialize 'driver' with HMS hook disabled.
driver = new Driver(conf);
ss = new SessionState(conf);
ss = SessionState.start(ss);
......@@ -112,6 +121,7 @@ public class HiveITBase {
Configuration configuration = ApplicationProperties.get();
String[] atlasEndPoint = configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT);
if (atlasEndPoint == null || atlasEndPoint.length == 0) {
atlasEndPoint = new String[] { DGI_URL };
}
......@@ -122,16 +132,21 @@ public class HiveITBase {
} else {
atlasClientV2 = new AtlasClientV2(atlasEndPoint);
atlasClient = new AtlasClient(atlasEndPoint);
}
hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClientV2);
HiveConf conf = new HiveConf();
conf.set("hive.exec.post.hooks", "");
SessionState ss = new SessionState(conf);
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
// 'driverWithoutContext' using this configuration will be used for tests in HiveMetastoreHookIT
// HiveMetastoreHookIT will use this driver to test event listeners in HiveMetastore.
// initialize 'driverWithoutContext' with HiveServer2 post execution hook disabled.
driverWithoutContext = new Driver(conf);
}
......@@ -149,8 +164,11 @@ public class HiveITBase {
protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
LOG.debug("Running command '{}'", cmd);
CommandProcessorResponse response = driver.run(cmd);
assertEquals(response.getResponseCode(), 0);
if (sleepMs != 0) {
Thread.sleep(sleepMs);
}
......@@ -182,11 +200,15 @@ public class HiveITBase {
}
/**
 * Generates a random 10-character alphanumeric identifier fragment.
 *
 * Lower-cased because Hive normalizes database/table names to lower case;
 * mixed-case names would not round-trip through metastore lookups.
 */
protected String random() {
    // Diff residue removed: the old non-lowercased return was left in as
    // unreachable dead code above this statement.
    return RandomStringUtils.randomAlphanumeric(10).toLowerCase();
}
/**
 * Generates a unique table name for test isolation.
 *
 * @return a name of the form "table_" + random lowercase suffix
 */
protected String tableName() {
    // Diff residue removed: the old "table" + random() return was left in as
    // an unreachable duplicate statement above this one.
    return "table_" + random();
}
/**
 * Generates a unique database name for test isolation.
 *
 * @return a name of the form "db_" + random lowercase suffix
 */
protected String dbName() {
    return "db_".concat(random());
}
protected String assertTableIsRegistered(String dbName, String tableName) throws Exception {
......@@ -336,12 +358,35 @@ public class HiveITBase {
}
/**
 * Asserts that a hive_db entity is registered in Atlas for the given database,
 * optionally validating it with the supplied predicate.
 *
 * @param dbName          database name to look up (qualified with CLUSTER_NAME)
 * @param assertPredicate extra validation to run against the entity; may be null
 * @return the guid of the registered entity
 * @throws Exception if the entity never shows up or the predicate fails
 */
protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
    LOG.debug("Searching for database: {}", dbName);

    return assertEntityIsRegistered(HIVE_DB.getName(), REFERENCEABLE_ATTRIBUTE_NAME,
                                    HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName), assertPredicate);
}
/**
 * Asserts that no hive_db entity is registered in Atlas for the given database.
 *
 * @param dbName database name to look up (qualified with CLUSTER_NAME)
 * @throws Exception if the entity unexpectedly exists or the lookup errors
 */
public void assertDatabaseIsNotRegistered(String dbName) throws Exception {
    LOG.debug("Searching for database {}", dbName);

    String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);

    // Diff residue removed: a stale "return assertEntityIsRegistered(...)" from the
    // replaced implementation was left in the body (a value return in a void method,
    // referencing an undefined 'assertPredicate').
    assertEntityIsNotRegistered(HIVE_DB.getName(), ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
}
/**
 * Asserts that no entity of the given type exists in Atlas with the given
 * unique-attribute value.
 *
 * Sleeps briefly first because hook notifications are processed asynchronously;
 * checking immediately could pass before a pending registration lands.
 *
 * @param typeName Atlas type name (e.g. hive_db)
 * @param property unique attribute name to search by
 * @param value    expected attribute value
 * @throws AtlasServiceException if the lookup fails for any reason other than NOT_FOUND
 */
protected void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
    // wait for sufficient time before checking if entity is not available.
    long waitTime = 2000;

    LOG.debug("Waiting for {} msecs, before asserting entity is not registered.", waitTime);

    Thread.sleep(waitTime);

    try {
        atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value));

        fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, attributeValue = %s", typeName, property, value));
    } catch (AtlasServiceException e) {
        if (e.getStatus() != NOT_FOUND) {
            // Any status other than NOT_FOUND (auth failure, server error, ...) must not
            // be swallowed — doing so would make this assertion pass vacuously.
            throw e;
        }
    }
}
protected AtlasEntity getAtlasEntityByType(String type, String id) throws Exception {
AtlasEntity atlasEntity = null;
......
......@@ -28,12 +28,8 @@ import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
......@@ -41,18 +37,22 @@ import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.text.ParseException;
......@@ -60,17 +60,32 @@ import java.util.*;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.Assert.*;
public class HiveHookIT extends HiveITBase {
private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class);
private static final String PART_FILE = "2015-01-01";
private Driver driverWithNoHook;
@BeforeClass
public void setUp() throws Exception {
    // Build a dedicated driver whose configuration disables both the
    // HiveServer2 post-execution hook and the HiveMetastore event listener,
    // so tests can run DDL that must not be reported to Atlas.
    HiveConf hiveConf = new HiveConf();

    hiveConf.set("hive.exec.post.hooks", "");
    hiveConf.set("hive.metastore.event.listeners", "");

    SessionState sessionState = SessionState.start(new SessionState(hiveConf));

    SessionState.setCurrentSessionState(sessionState);

    driverWithNoHook = new Driver(hiveConf);

    super.setUp();
}
@Test
public void testCreateDatabase() throws Exception {
String dbName = "db" + random();
......@@ -87,7 +102,7 @@ public class HiveHookIT extends HiveITBase {
//There should be just one entity per dbname
runCommand("drop database " + dbName);
assertDBIsNotRegistered(dbName);
assertDatabaseIsNotRegistered(dbName);
runCommand("create database " + dbName);
dbId = assertDatabaseIsRegistered(dbName);
......@@ -1557,7 +1572,7 @@ public class HiveHookIT extends HiveITBase {
String tableName = tableName();
String createCommand = "create table " + tableName + " (id int, name string)";
driverWithoutContext.run(createCommand);
driverWithNoHook.run(createCommand);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
......@@ -1802,7 +1817,7 @@ public class HiveHookIT extends HiveITBase {
assertTableIsNotRegistered(dbName, tableNames[i]);
}
assertDBIsNotRegistered(dbName);
assertDatabaseIsNotRegistered(dbName);
}
@Test
......@@ -1849,14 +1864,14 @@ public class HiveHookIT extends HiveITBase {
//Test Deletion of a non existing DB
String dbName = "nonexistingdb";
assertDBIsNotRegistered(dbName);
assertDatabaseIsNotRegistered(dbName);
String query = String.format("drop database if exists %s cascade", dbName);
runCommand(query);
//Should have no effect
assertDBIsNotRegistered(dbName);
assertDatabaseIsNotRegistered(dbName);
}
@Test
......@@ -2209,33 +2224,10 @@ public class HiveHookIT extends HiveITBase {
assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(), ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
}
/**
 * Asserts that no hive_db entity exists in Atlas for the given database name.
 *
 * @param dbName database name to look up (qualified with CLUSTER_NAME)
 */
private void assertDBIsNotRegistered(String dbName) throws Exception {
    LOG.debug("Searching for database {}", dbName);

    assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(), ATTRIBUTE_QUALIFIED_NAME,
                                HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName));
}
// Convenience overload: delegates with the trailing boolean defaulted to false.
// NOTE(review): the flag's meaning isn't visible here — presumably "isTemporary";
// confirm against the four-arg overload before relying on it.
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception {
return assertTableIsRegistered(dbName, tableName, assertPredicate, false);
}
/**
 * Asserts that no entity of the given type exists in Atlas with the given
 * unique-attribute value, polling for up to 80 seconds.
 *
 * @param typeName Atlas type name
 * @param property unique attribute name to search by
 * @param value    expected attribute value
 */
private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
    waitFor(80000, new Predicate() {
        @Override
        public void evaluate() throws Exception {
            boolean entityFound = true;

            try {
                atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
            } catch (AtlasServiceException e) {
                if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
                    entityFound = false;
                }
            }

            // Mirrors the fall-through behavior: a successful lookup, or a failure
            // other than NOT_FOUND, both count as "entity still present".
            if (entityFound) {
                fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, "
                        + "attributeValue = %s", typeName, property, value));
            }
        }
    });
}
@Test
public void testLineage() throws Exception {
String table1 = createTable(false);
......@@ -2267,10 +2259,6 @@ public class HiveHookIT extends HiveITBase {
runCommand("show transactions");
}
/** Generates a random database name of the form "db" + random suffix. */
private String dbName() {
    return "db".concat(random());
}
private String createDatabase() throws Exception {
String dbName = dbName();
......
......@@ -37,6 +37,11 @@
</property>
<property>
<name>hive.metastore.event.listeners</name>
<value>org.apache.atlas.hive.hook.HiveMetastoreHookImpl</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>false</value>
</property>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment