Commit 15534f23 by Ashutosh Mestry

ATLAS-2470 - JanusGraph Cassandra . Updates to unit test.

parent bf5f8ef0
...@@ -18,13 +18,15 @@ ...@@ -18,13 +18,15 @@
package org.apache.atlas.repository.audit; package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties; import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.MapConfiguration;
import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportException;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import java.io.IOException; import java.io.IOException;
...@@ -32,22 +34,53 @@ import java.util.HashMap; ...@@ -32,22 +34,53 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class CassandraAuditRepositoryTest extends AuditRepositoryTestBase { public class CassandraAuditRepositoryTest extends AuditRepositoryTestBase {
private static final int MAX_RETRIES = 9;
private final String CLUSTER_HOST = "localhost";
private final String CLUSTER_NAME_TEST = "Test Cluster";
private final int CLUSTER_PORT = 9042;
@BeforeClass @BeforeClass
public void setup() throws InterruptedException, TTransportException, ConfigurationException, IOException, public void setup() throws InterruptedException, TTransportException, ConfigurationException, IOException,
AtlasException { AtlasException {
EmbeddedCassandraServerHelper.startEmbeddedCassandra("cassandra_test.yml"); EmbeddedCassandraServerHelper.startEmbeddedCassandra("cassandra_test.yml");
eventRepository = new CassandraBasedAuditRepository(); eventRepository = new CassandraBasedAuditRepository();
Map<String, Object> props = new HashMap<>(); Configuration atlasConf = new MapConfiguration(getClusterProperties());
props.put(CassandraBasedAuditRepository.MANAGE_EMBEDDED_CASSANDRA, Boolean.TRUE); ((CassandraBasedAuditRepository) eventRepository).setApplicationProperties(atlasConf);
props.put(CassandraBasedAuditRepository.CASSANDRA_CLUSTERNAME_PROPERTY, "Test Cluster"); ((CassandraBasedAuditRepository) eventRepository).start();
props.put(CassandraBasedAuditRepository.CASSANDRA_HOSTNAME_PROPERTY, "localhost");
props.put(CassandraBasedAuditRepository.CASSANDRA_PORT_PROPERTY, 9042);
Configuration atlasConf = new MapConfiguration(props);
((CassandraBasedAuditRepository)eventRepository).setApplicationProperties(atlasConf);
((CassandraBasedAuditRepository)eventRepository).start();
// Pause for a second to ensure that the embedded cluster has started
Thread.sleep(1000);
}
ensureClusterCreation();
}
private Map<String, Object> getClusterProperties() {
Map<String, Object> props = new HashMap<>();
props.put(CassandraBasedAuditRepository.MANAGE_EMBEDDED_CASSANDRA, Boolean.TRUE);
props.put(CassandraBasedAuditRepository.CASSANDRA_CLUSTERNAME_PROPERTY, CLUSTER_NAME_TEST);
props.put(CassandraBasedAuditRepository.CASSANDRA_HOSTNAME_PROPERTY, CLUSTER_HOST);
props.put(CassandraBasedAuditRepository.CASSANDRA_PORT_PROPERTY, CLUSTER_PORT);
return props;
}
private void ensureClusterCreation() throws InterruptedException {
// Retry the connection until we either connect or timeout
Cluster.Builder cassandraClusterBuilder = Cluster.builder();
Cluster cluster =
cassandraClusterBuilder.addContactPoint(CLUSTER_HOST).withClusterName(CLUSTER_NAME_TEST).withPort(CLUSTER_PORT)
.build();
int retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
Session cassSession = cluster.connect();
if (cassSession.getState().getConnectedHosts().size() > 0) {
cassSession.close();
return;
}
} catch (Exception e) {
Thread.sleep(1000);
}
retryCount++;
}
throw new SkipException("Unable to connect to embedded Cassandra after " + MAX_RETRIES + " seconds.");
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment