Commit effb7537 by Sarath Subramanian

ATLAS-2521: Remove Titan 0.5.4 support from Atlas

parent 8b65aed0
...@@ -52,10 +52,6 @@ ...@@ -52,10 +52,6 @@
<artifactId>*</artifactId> <artifactId>*</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId> <artifactId>servlet-api</artifactId>
</exclusion> </exclusion>
......
...@@ -67,10 +67,6 @@ ...@@ -67,10 +67,6 @@
<artifactId>*</artifactId> <artifactId>*</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId> <artifactId>servlet-api</artifactId>
</exclusion> </exclusion>
......
...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
......
...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
......
...@@ -43,11 +43,6 @@ ...@@ -43,11 +43,6 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="warn"/>
<appender-ref ref="FILE"/>
</logger>
<!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config --> <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
<logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false"> <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
<level value="error"/> <level value="error"/>
......
...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
......
...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
......
...@@ -41,7 +41,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -41,7 +41,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
......
...@@ -57,7 +57,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -57,7 +57,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -23,7 +23,4 @@ ...@@ -23,7 +23,4 @@
<suppressions> <suppressions>
<suppress checks="JavadocType" files="[/\\]src[/\\]test[/\\]java[/\\]"/> <suppress checks="JavadocType" files="[/\\]src[/\\]test[/\\]java[/\\]"/>
<!-- skip checks on customized titan 0.5.4 files -->
<suppress checks="[a-zA-Z0-9]*" files="[/\\]com[/\\]thinkaurelius[/\\]titan[/\\]"/>
</suppressions> </suppressions>
...@@ -47,7 +47,6 @@ public final class Constants { ...@@ -47,7 +47,6 @@ public final class Constants {
/** /**
* Full-text for the entity for enabling full-text search. * Full-text for the entity for enabling full-text search.
*/ */
//weird issue in TitanDB if __ added to this property key. Not adding it for now
public static final String ENTITY_TEXT_PROPERTY_KEY = "entityText"; public static final String ENTITY_TEXT_PROPERTY_KEY = "entityText";
/** /**
......
...@@ -48,9 +48,9 @@ ...@@ -48,9 +48,9 @@
</logger> </logger>
--> -->
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="console"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="org.springframework" additivity="false"> <logger name="org.springframework" additivity="false">
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
<graph.storage.backend>hbase</graph.storage.backend> <graph.storage.backend>hbase</graph.storage.backend>
<graph.storage.properties>#Hbase <graph.storage.properties>#Hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname= atlas.graph.storage.hostname=
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000 atlas.graph.storage.lock.wait-time=10000
...@@ -173,7 +173,7 @@ atlas.graph.index.search.elasticsearch.create.sleep=2000 ...@@ -173,7 +173,7 @@ atlas.graph.index.search.elasticsearch.create.sleep=2000
<properties> <properties>
<graph.storage.properties>#Hbase <graph.storage.properties>#Hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=localhost atlas.graph.storage.hostname=localhost
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000 atlas.graph.storage.lock.wait-time=10000
......
...@@ -49,7 +49,7 @@ ...@@ -49,7 +49,7 @@
# Where pid files are stored. Default is logs directory under the base install location # Where pid files are stored. Default is logs directory under the base install location
#export ATLAS_PID_DIR= #export ATLAS_PID_DIR=
# where the atlas titan db data is stored. Default is logs/data directory under the base install location # where the atlas janusgraph db data is stored. Default is logs/data directory under the base install location
#export ATLAS_DATA_DIR= #export ATLAS_DATA_DIR=
# Where do you want to expand the war file. By Default it is in /server/webapp dir under the base install dir. # Where do you want to expand the war file. By Default it is in /server/webapp dir under the base install dir.
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="FILE"/> <appender-ref ref="FILE"/>
</logger> </logger>
......
...@@ -519,7 +519,6 @@ ...@@ -519,7 +519,6 @@
--> -->
<fieldType name="currency" class="solr.CurrencyField" precisionStep="8" defaultCurrency="USD" currencyConfig="currency.xml" /> <fieldType name="currency" class="solr.CurrencyField" precisionStep="8" defaultCurrency="USD" currencyConfig="currency.xml" />
<!--Titan specific-->
<fieldType name="uuid" <fieldType name="uuid"
class="solr.UUIDField" class="solr.UUIDField"
indexed="true" /> indexed="true" />
......
...@@ -606,7 +606,6 @@ ...@@ -606,7 +606,6 @@
</admin> </admin>
<!--Titan specific-->
<updateRequestProcessorChain default="true"> <updateRequestProcessorChain default="true">
<processor class="solr.TimestampUpdateProcessorFactory"> <processor class="solr.TimestampUpdateProcessorFactory">
<str name="fieldName">timestamp</str> <str name="fieldName">timestamp</str>
......
...@@ -38,8 +38,7 @@ import java.util.Set; ...@@ -38,8 +38,7 @@ import java.util.Set;
/** /**
* *
* Abstract implementation of AtlasGraphQuery that is used by Titan 0.5.4, * Abstract implementation of AtlasGraphQuery that is used by JanusGraph
* Titan 1.0.0 and JanusGraph
* <p> * <p>
* Represents a graph query as an OrConditions which consists of * Represents a graph query as an OrConditions which consists of
* 1 or more AndConditions. The query is executed by converting * 1 or more AndConditions. The query is executed by converting
......
...@@ -35,8 +35,7 @@ mvn install [-P dist] -DGRAPH-PROVIDER=janus ...@@ -35,8 +35,7 @@ mvn install [-P dist] -DGRAPH-PROVIDER=janus
Some tests in the repository and webapp projects are skipped when running with the janus provider, due to hard Some tests in the repository and webapp projects are skipped when running with the janus provider, due to hard
dependencies on Gremlin2. These components need to be updated. Please refer to "known issues" section below. dependencies on Gremlin2. These components need to be updated. Please refer to "known issues" section below.
This will build Atlas and run all of the tests against Janus. Such a build MUST be used with JanusGraph and This will build Atlas and run all of the tests against Janus.
CANNOT be used with any other graph provider, e.g. Titan 0.5.4 or Titan 1.0.0.
2) Configure the Atlas runtime to use JanusGraph by setting the atlas.graphdb.backend property in 2) Configure the Atlas runtime to use JanusGraph by setting the atlas.graphdb.backend property in
ATLAS_HOME/conf/atlas-application.properties, as follows: ATLAS_HOME/conf/atlas-application.properties, as follows:
......
...@@ -50,7 +50,7 @@ import java.util.ArrayList; ...@@ -50,7 +50,7 @@ import java.util.ArrayList;
import java.util.Map; import java.util.Map;
/** /**
* Default implementation for Graph Provider that doles out Titan Graph. * Default implementation for Graph Provider that doles out JanusGraph.
*/ */
public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> { public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class); private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
<module>api</module> <module>api</module>
<module>common</module> <module>common</module>
<module>graphdb-impls</module> <module>graphdb-impls</module>
<module>titan0</module>
<module>janus</module> <module>janus</module>
</modules> </modules>
......
...@@ -9,17 +9,12 @@ If GRAPH-PROVIDER is not set, the default graph backend is adopted. This is curr ...@@ -9,17 +9,12 @@ If GRAPH-PROVIDER is not set, the default graph backend is adopted. This is curr
In order to build with a specific (non-default) graph backend set the GRAPH-PROVIDER system variable. In order to build with a specific (non-default) graph backend set the GRAPH-PROVIDER system variable.
If GRAPH-PROVIDER is set to titan0, the build will contain Titan 0.5.4
If GRAPH-PROVIDER is set to janus, the build will contain JanusGraph 0.2.0 (i.e. the default above) If GRAPH-PROVIDER is set to janus, the build will contain JanusGraph 0.2.0 (i.e. the default above)
For example, to build Atlas with the janus graph-provider: For example, to build Atlas with the janus graph-provider:
mvn install [-P dist] -DGRAPH-PROVIDER=janus mvn install [-P dist] -DGRAPH-PROVIDER=janus
JanusGraph support Gremlin3 only (and NOT Gremlin2).
Titan 0.5.4 supports Gremlin2 only, whereas JanusGraph support Gremlin3 only (and NOT Gremlin2).
Gremlin2 and Gremlin3 are not compatible. The gremlin used by Atlas is translated into either Gremlin2 or
Gremlin3 depending on which graph backend is used in the build. This is implemented in GremlinExpressionFactory.
REQUIREMENTS REQUIREMENTS
------------ ------------
......
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
/**
 * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course
 * of development from 0.94 to 1.0 and beyond.
 */
public interface AdminMask extends Closeable
{

    /**
     * Remove all data from the named table.
     *
     * @param tableName name of the table to clear
     * @param timestamp timestamp to place on the delete markers; scan-and-delete
     *                  based implementations apply it to each row delete, while
     *                  truncate-based implementations may ignore it
     * @throws IOException if the underlying HBase operation fails
     */
    void clearTable(String tableName, long timestamp) throws IOException;

    /**
     * Return the descriptor of the named table.
     *
     * @throws TableNotFoundException if no table with the given name exists
     * @throws IOException if the underlying HBase operation fails
     */
    HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException;

    /** @return true if a table with the given name exists */
    boolean tableExists(String tableName) throws IOException;

    /** Create a table from the given descriptor. */
    void createTable(HTableDescriptor desc) throws IOException;

    /**
     * Create a pre-split table from the given descriptor with {@code numRegions}
     * regions spanning {@code startKey} to {@code endKey}.
     */
    void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;

    /**
     * Estimate the number of regionservers in the HBase cluster.
     *
     * This is usually implemented by calling
     * {@link HBaseAdmin#getClusterStatus()} and then
     * {@link ClusterStatus#getServers()} and finally {@code size()} on the
     * returned server list.
     *
     * @return the number of servers in the cluster or -1 if it could not be determined
     */
    int getEstimatedRegionServerCount();

    /** Take the named table offline. */
    void disableTable(String tableName) throws IOException;

    /** Bring the named table back online. */
    void enableTable(String tableName) throws IOException;

    /** @return true if the named table is currently disabled */
    boolean isTableDisabled(String tableName) throws IOException;

    /** Add a column family to an existing table. */
    void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException;
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.Closeable;
import java.io.IOException;
/**
 * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course
 * of development from 0.94 to 1.0 and beyond.
 */
public interface ConnectionMask extends Closeable
{

    /**
     * Return a version-independent wrapper around the HBase table with the given name.
     *
     * @throws IOException if the table handle cannot be obtained
     */
    TableMask getTable(String name) throws IOException;

    /**
     * Return a version-independent wrapper around the admin interface of this connection.
     *
     * @throws IOException if the admin handle cannot be obtained
     */
    AdminMask getAdmin() throws IOException;
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.thinkaurelius.titan.util.system.IOUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
/**
 * {@link AdminMask} implementation backed by the pre-1.0 {@link HBaseAdmin}
 * client API (HBase 0.94-0.98). All operations delegate to the wrapped admin.
 */
public class HBaseAdmin0_98 implements AdminMask {

    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class);

    /** Wrapped pre-1.0 admin handle; closed when this mask is closed. */
    private final HBaseAdmin delegate;

    public HBaseAdmin0_98(HBaseAdmin adm) {
        this.delegate = adm;
    }

    /**
     * Clear the table by scanning every row and issuing a per-row delete
     * carrying the supplied timestamp. No-op when the table does not exist.
     */
    @Override
    public void clearTable(String tableName, long timestamp) throws IOException {
        if (!delegate.tableExists(tableName)) {
            log.debug("clearStorage() called before table {} was created, skipping.", tableName);
            return;
        }

        // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than
        // disabling and deleting tables.
        HTable table = null;
        try {
            table = new HTable(delegate.getConfiguration(), tableName);

            Scan fullScan = new Scan();
            fullScan.setBatch(100);
            fullScan.setCacheBlocks(false);
            fullScan.setCaching(2000);
            fullScan.setTimeRange(0, Long.MAX_VALUE);
            fullScan.setMaxVersions(1);

            ResultScanner scanner = null;
            try {
                scanner = table.getScanner(fullScan);
                for (Result row : scanner) {
                    Delete rowDelete = new Delete(row.getRow());
                    rowDelete.setTimestamp(timestamp);
                    table.delete(rowDelete);
                }
            } finally {
                IOUtils.closeQuietly(scanner);
            }
        } finally {
            IOUtils.closeQuietly(table);
        }
    }

    @Override
    public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException {
        return delegate.getTableDescriptor(tableName.getBytes());
    }

    @Override
    public boolean tableExists(String tableName) throws IOException {
        return delegate.tableExists(tableName);
    }

    @Override
    public void createTable(HTableDescriptor desc) throws IOException {
        delegate.createTable(desc);
    }

    @Override
    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
        delegate.createTable(desc, startKey, endKey, numRegions);
    }

    /**
     * Estimate the regionserver count from the cluster status.
     *
     * @return the number of servers, or -1 when the status cannot be fetched
     */
    @Override
    public int getEstimatedRegionServerCount() {
        try {
            int serverCount = delegate.getClusterStatus().getServers().size();
            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
            return serverCount;
        } catch (IOException e) {
            // Best-effort estimate: swallow and report "unknown".
            log.debug("Unable to retrieve HBase cluster status", e);
            return -1;
        }
    }

    @Override
    public void disableTable(String tableName) throws IOException {
        delegate.disableTable(tableName);
    }

    @Override
    public void enableTable(String tableName) throws IOException {
        delegate.enableTable(tableName);
    }

    @Override
    public boolean isTableDisabled(String tableName) throws IOException {
        return delegate.isTableDisabled(tableName);
    }

    @Override
    public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException {
        delegate.addColumn(tableName, columnDescriptor);
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
/**
 * {@link AdminMask} implementation backed by the HBase 1.0+ {@link Admin}
 * client API, which addresses tables via {@link TableName} rather than strings.
 */
public class HBaseAdmin1_0 implements AdminMask {

    private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class);

    /** Wrapped 1.0+ admin handle; closed when this mask is closed. */
    private final Admin delegate;

    public HBaseAdmin1_0(HBaseAdmin adm) {
        this.delegate = adm;
    }

    /**
     * Clear the table by disabling and truncating it (splits preserved).
     * No-op when the table does not exist; the timestamp argument is not
     * used by this implementation.
     */
    @Override
    public void clearTable(String tableString, long timestamp) throws IOException {
        final TableName table = TableName.valueOf(tableString);

        if (!delegate.tableExists(table)) {
            log.debug("Attempted to clear table {} before it exists (noop)", tableString);
            return;
        }

        if (!delegate.isTableDisabled(table)) {
            delegate.disableTable(table);
        }
        if (!delegate.isTableDisabled(table)) {
            throw new RuntimeException("Unable to disable table " + table);
        }

        // This API call appears to both truncate and reenable the table.
        log.info("Truncating table {}", table);
        delegate.truncateTable(table, true /* preserve splits */);

        try {
            delegate.enableTable(table);
        } catch (TableNotDisabledException e) {
            // This triggers seemingly every time in testing with 1.0.2.
            log.debug("Table automatically reenabled by truncation: {}", table, e);
        }
    }

    @Override
    public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException {
        return delegate.getTableDescriptor(TableName.valueOf(tableString));
    }

    @Override
    public boolean tableExists(String tableString) throws IOException {
        return delegate.tableExists(TableName.valueOf(tableString));
    }

    @Override
    public void createTable(HTableDescriptor desc) throws IOException {
        delegate.createTable(desc);
    }

    @Override
    public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
        delegate.createTable(desc, startKey, endKey, numRegions);
    }

    /**
     * Estimate the regionserver count from the cluster status.
     *
     * @return the number of servers, or -1 when the status cannot be fetched
     */
    @Override
    public int getEstimatedRegionServerCount() {
        try {
            int serverCount = delegate.getClusterStatus().getServers().size();
            log.debug("Read {} servers from HBase ClusterStatus", serverCount);
            return serverCount;
        } catch (IOException e) {
            // Best-effort estimate: swallow and report "unknown".
            log.debug("Unable to retrieve HBase cluster status", e);
            return -1;
        }
    }

    @Override
    public void disableTable(String tableString) throws IOException {
        delegate.disableTable(TableName.valueOf(tableString));
    }

    @Override
    public void enableTable(String tableString) throws IOException {
        delegate.enableTable(TableName.valueOf(tableString));
    }

    @Override
    public boolean isTableDisabled(String tableString) throws IOException {
        return delegate.isTableDisabled(TableName.valueOf(tableString));
    }

    @Override
    public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException {
        delegate.addColumn(TableName.valueOf(tableString), columnDescriptor);
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
/**
 * Version-independent facade over the HBase client API calls whose signatures
 * or types have changed between HBase releases.
 */
public interface HBaseCompat {

    /**
     * Configure the compression scheme {@code algo} on a column family
     * descriptor {@code cd}. The {@code algo} parameter is a string value
     * corresponding to one of the values of HBase's Compression enum. The
     * Compression enum has moved between packages as HBase has evolved, which
     * is why this method has a String argument in the signature instead of the
     * enum itself.
     *
     * @param cd
     *            column family to configure
     * @param algo
     *            compression type to use
     */
    void setCompression(HColumnDescriptor cd, String algo);

    /**
     * Create and return a HTableDescriptor instance with the given name. The
     * constructors on this method have remained stable over HBase development
     * so far, but the old HTableDescriptor(String) constructor & byte[] friends
     * are now marked deprecated and may eventually be removed in favor of the
     * HTableDescriptor(TableName) constructor. That constructor (and the
     * TableName type) only exists in newer HBase versions. Hence this method.
     *
     * @param tableName
     *            HBase table name
     * @return a new table descriptor instance
     */
    HTableDescriptor newTableDescriptor(String tableName);

    /**
     * Open a new HBase connection for the given configuration, wrapped in a
     * version-independent {@link ConnectionMask}.
     *
     * @throws IOException if the connection cannot be established
     */
    ConnectionMask createConnection(Configuration conf) throws IOException;

    /** Attach the column family descriptor {@code cdesc} to table descriptor {@code tdesc}. */
    void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc);

    /** Set the timestamp carried by the given {@link Delete} mutation. */
    void setTimestamp(Delete d, long timestamp);
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.io.compress.Compression;
/**
 * {@link HBaseCompat} implementation for the pre-1.0 HBase client API
 * (0.94-0.98), which creates connections through {@code HConnectionManager}.
 */
public class HBaseCompat0_98 implements HBaseCompat {

    /** Map the string compression name onto the 0.98 Compression enum. */
    @Override
    public void setCompression(HColumnDescriptor cd, String algo) {
        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
    }

    /** Build a descriptor via the non-deprecated TableName-based constructor. */
    @Override
    public HTableDescriptor newTableDescriptor(String tableName) {
        return new HTableDescriptor(TableName.valueOf(tableName));
    }

    @Override
    public ConnectionMask createConnection(Configuration conf) throws IOException {
        return new HConnection0_98(HConnectionManager.createConnection(conf));
    }

    @Override
    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) {
        tdesc.addFamily(cdesc);
    }

    @Override
    public void setTimestamp(Delete d, long timestamp) {
        d.setTimestamp(timestamp);
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.compress.Compression;
/**
 * {@link HBaseCompat} implementation for the HBase 1.0 client API, which
 * creates connections through {@code ConnectionFactory}.
 */
public class HBaseCompat1_0 implements HBaseCompat {

    /** Map the string compression name onto the 1.0 Compression enum. */
    @Override
    public void setCompression(HColumnDescriptor cd, String algo) {
        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
    }

    /** Build a descriptor via the non-deprecated TableName-based constructor. */
    @Override
    public HTableDescriptor newTableDescriptor(String tableName) {
        return new HTableDescriptor(TableName.valueOf(tableName));
    }

    @Override
    public ConnectionMask createConnection(Configuration conf) throws IOException {
        return new HConnection1_0(ConnectionFactory.createConnection(conf));
    }

    @Override
    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) {
        tdesc.addFamily(cdesc);
    }

    @Override
    public void setTimestamp(Delete d, long timestamp) {
        d.setTimestamp(timestamp);
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.compress.Compression;
import java.io.IOException;
public class HBaseCompat1_1 implements HBaseCompat {

    /** Applies the named compression algorithm to the column family descriptor. */
    @Override
    public void setCompression(HColumnDescriptor cd, String algo) {
        cd.setCompressionType(Compression.Algorithm.valueOf(algo));
    }

    /** Creates a fresh table descriptor for the given table name. */
    @Override
    public HTableDescriptor newTableDescriptor(String tableName) {
        TableName name = TableName.valueOf(tableName);
        return new HTableDescriptor(name);
    }

    /** Opens a connection via ConnectionFactory; 1.1 reuses the 1.0 connection adapter. */
    @Override
    public ConnectionMask createConnection(Configuration conf) throws IOException {
        return new HConnection1_0(ConnectionFactory.createConnection(conf));
    }

    /** Adds the column family descriptor to the table descriptor in place. */
    @Override
    public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) {
        tdesc.addFamily(cdesc);
    }

    /** Sets the timestamp on the Delete mutation. */
    @Override
    public void setTimestamp(Delete d, long timestamp) {
        d.setTimestamp(timestamp);
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.util.Arrays;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseCompatLoader {

    private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class);

    private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1";

    private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1";

    // Singleton compat instance; guarded by the class lock taken in getCompat().
    private static HBaseCompat cachedCompat;

    /**
     * Returns the HBase compatibility layer, instantiating and caching it on first use.
     * <p>
     * Resolution order: the explicit {@code classOverride} if non-null; otherwise the
     * compat class matching the runtime HBase version reported by {@link VersionInfo};
     * otherwise the most recent supported default ({@code 1.1}).
     *
     * @param classOverride fully-qualified HBaseCompat implementation class name, or null to auto-detect
     * @return the shared compatibility layer instance
     * @throws RuntimeException if the chosen class cannot be loaded or instantiated
     */
    public static synchronized HBaseCompat getCompat(String classOverride) {

        if (null != cachedCompat) {
            log.debug("Returning cached HBase compatibility layer: {}", cachedCompat);
            return cachedCompat;
        }

        HBaseCompat compat;
        String className = null;
        String classNameSource = null;

        if (null != classOverride) {
            className = classOverride;
            classNameSource = "from explicit configuration";
        } else {
            String hbaseVersion = VersionInfo.getVersion();
            for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) {
                // Match on "<major.minor>." so e.g. "1.1.2" selects the 1.1 compat class.
                if (hbaseVersion.startsWith(supportedVersion + ".")) {
                    className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_");
                    classNameSource = "supporting runtime HBase version " + hbaseVersion;
                    break;
                }
            }
            if (null == className) {
                log.info("The HBase version {} is not explicitly supported by Titan. " +
                         "Loading Titan's compatibility layer for its most recent supported HBase version ({})",
                        hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION);
                className = DEFAULT_HBASE_CLASS_NAME;
                // No leading space: the log pattern already supplies the separator.
                classNameSource = "by default";
            }
        }

        final String errTemplate = " when instantiating HBase compatibility class " + className;

        try {
            // getDeclaredConstructor().newInstance() replaces the deprecated
            // Class.newInstance(), which rethrew constructor exceptions unwrapped.
            compat = (HBaseCompat) Class.forName(className).getDeclaredConstructor().newInstance();
            log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e);
        }

        return cachedCompat = compat;
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* This class overrides and adds nothing compared with
* {@link com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific
* to HBase, which lets us check for user errors like passing a Cassandra
* transaction into a HBase method.
*
* @author Dan LaRocque <dalaro@hopcount.org>
*/
public class HBaseTransaction extends AbstractStoreTransaction {

    private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class);

    // Mediator that holds the actual locks; assigned once at construction.
    final LocalLockMediator<StoreTransaction> llm;

    // Locks taken by this transaction; LinkedHashSet preserves acquisition
    // order, so locks are released in the order they were acquired.
    final Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>();

    public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) {
        super(config);
        this.llm = llm;
    }

    @Override
    public synchronized void rollback() throws BackendException {
        super.rollback();
        log.debug("Rolled back transaction");
        deleteAllLocks();
    }

    @Override
    public synchronized void commit() throws BackendException {
        super.commit();
        log.debug("Committed transaction");
        deleteAllLocks();
    }

    /**
     * Records that this transaction holds the given lock.
     *
     * @param lockID        the key/column pair that was locked
     * @param expectedValue unused here; only the lock's identity is tracked
     */
    public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) {
        keyColumnLocks.add(lockID);
    }

    // Releases every recorded lock; invoked on both commit and rollback.
    private void deleteAllLocks() {
        for (KeyColumn kc : keyColumnLocks) {
            log.debug("Removed lock {} ", kc);
            llm.unlock(kc, this);
        }
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
public class HConnection0_98 implements ConnectionMask {

    private final HConnection cnx;

    public HConnection0_98(HConnection cnx) {
        this.cnx = cnx;
    }

    /** Wraps the named table in the 0.98 table adapter. */
    @Override
    public TableMask getTable(String name) throws IOException {
        return new HTable0_98(cnx.getTable(name));
    }

    /** Creates an admin client for this connection and wraps it in the 0.98 adapter. */
    @Override
    public AdminMask getAdmin() throws IOException {
        return new HBaseAdmin0_98(new HBaseAdmin(cnx));
    }

    @Override
    public void close() throws IOException {
        cnx.close();
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class HConnection1_0 implements ConnectionMask {

    private final Connection cnx;

    public HConnection1_0(Connection cnx) {
        this.cnx = cnx;
    }

    /** Wraps the named table in the 1.0 table adapter; names are parsed via TableName. */
    @Override
    public TableMask getTable(String name) throws IOException {
        return new HTable1_0(cnx.getTable(TableName.valueOf(name)));
    }

    /** Creates an admin client for this connection and wraps it in the 1.0 adapter. */
    @Override
    public AdminMask getAdmin() throws IOException {
        return new HBaseAdmin1_0(new HBaseAdmin(cnx));
    }

    @Override
    public void close() throws IOException {
        cnx.close();
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
public class HTable0_98 implements TableMask {

    private final HTableInterface table;

    public HTable0_98(HTableInterface table) {
        this.table = table;
    }

    @Override
    public ResultScanner getScanner(Scan filter) throws IOException {
        return table.getScanner(filter);
    }

    @Override
    public Result[] get(List<Get> gets) throws IOException {
        return table.get(gets);
    }

    @Override
    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException {
        table.batch(writes, results);
        // flushCommits() is part of the 0.98 API; the 1.0 adapter no longer needs it.
        table.flushCommits();
    }

    @Override
    public void close() throws IOException {
        table.close();
    }

    /** Exposes the underlying HTableInterface for version-specific callers. */
    @Override
    public Object getTableObject() {
        return table;
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
public class HTable1_0 implements TableMask {

    private final Table table;

    public HTable1_0(Table table) {
        this.table = table;
    }

    @Override
    public ResultScanner getScanner(Scan filter) throws IOException {
        return table.getScanner(filter);
    }

    @Override
    public Result[] get(List<Get> gets) throws IOException {
        return table.get(gets);
    }

    @Override
    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException {
        // Unlike the 0.98 adapter, no flushCommits() call is needed here.
        table.batch(writes, results);
    }

    @Override
    public void close() throws IOException {
        table.close();
    }

    /** Exposes the underlying Table for version-specific callers. */
    @Override
    public Object getTableObject() {
        return table;
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
/**
 * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course
 * of development from 0.94 to 1.0 and beyond.
 */
public interface TableMask extends Closeable
{

    /** Returns a scanner over the rows selected by the given scan. */
    ResultScanner getScanner(Scan filter) throws IOException;

    /** Delegates to the underlying table's multi-get. */
    Result[] get(List<Get> gets) throws IOException;

    /** Executes the given mutations as a batch, writing outcomes into {@code results}. */
    void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException;

    /** Returns the underlying HBase table object; its concrete type is version-specific. */
    Object getTableObject();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.graphdb.database.idassigner;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.core.attribute.Duration;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.IDAuthority;
import com.thinkaurelius.titan.diskstorage.IDBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Pool that hands out ids one at a time from blocks obtained through an
 * {@link IDAuthority}. A single-threaded executor pre-fetches the next block
 * in the background once consumption of the current block crosses a buffer
 * threshold, so {@link #nextID()} normally does not block on storage.
 *
 * @author Matthias Broecheler (me@matthiasb.com)
 */
public class StandardIDPool implements IDPool {

    private static final Logger log =
            LoggerFactory.getLogger(StandardIDPool.class);

    // Unit applied to renewTimeout when waiting on the background fetch.
    private static final TimeUnit SCHEDULING_TIME_UNIT =
            TimeUnit.MILLISECONDS; // TODO

    // Sentinel stored in nextBlock when the authority reports exhaustion;
    // any attempt to read ids from it fails fast.
    private static final IDBlock ID_POOL_EXHAUSTION = new IDBlock() {
        @Override
        public long numIds() {
            throw new UnsupportedOperationException();
        }

        @Override
        public long getId(long index) {
            throw new ArrayIndexOutOfBoundsException(0);
        }
    };

    // Sentinel used as the initial currentBlock: zero ids, so the first
    // nextID() call immediately triggers a block acquisition.
    private static final IDBlock UNINITIALIZED_BLOCK = new IDBlock() {
        @Override
        public long numIds() {
            return 0;
        }

        @Override
        public long getId(long index) {
            throw new ArrayIndexOutOfBoundsException(0);
        }
    };

    // Minimum number of remaining ids at which background renewal kicks in.
    private static final int RENEW_ID_COUNT = 100;

    private final IDAuthority idAuthority;
    private final long idUpperBound; //exclusive
    private final int partition;
    private final int idNamespace;

    private final Duration renewTimeout;
    private final double renewBufferPercentage;

    private IDBlock currentBlock;   // block ids are currently served from
    private long currentIndex;      // next unused index within currentBlock
    private long renewBlockIndex;   // index at which background renewal is triggered
//    private long nextID;
//    private long currentMaxID;
//    private long renewBufferID;

    private volatile IDBlock nextBlock; // pre-fetched block, written by the renewer thread
    private Future<?> idBlockFuture;    // pending background fetch; null when none in flight
    private final ThreadPoolExecutor exec;

    private volatile boolean initialized;
    private volatile boolean closed;

    /**
     * @param idAuthority           source of id blocks
     * @param partition             partition id (>= 0) this pool draws from
     * @param idNamespace           id namespace (>= 0) this pool draws from
     * @param idUpperBound          exclusive upper bound on returned ids
     * @param renewTimeout          maximum time to wait for a block acquisition
     * @param renewBufferPercentage fraction in (0.0,1.0] of a block to keep in
     *                              reserve before triggering background renewal
     */
    public StandardIDPool(IDAuthority idAuthority, int partition, int idNamespace, long idUpperBound, Duration renewTimeout, double renewBufferPercentage) {
        Preconditions.checkArgument(idUpperBound > 0);
        this.idAuthority = idAuthority;
        Preconditions.checkArgument(partition>=0);
        this.partition = partition;
        Preconditions.checkArgument(idNamespace>=0);
        this.idNamespace = idNamespace;
        this.idUpperBound = idUpperBound;
        Preconditions.checkArgument(!renewTimeout.isZeroLength(), "Renew-timeout must be positive");
        this.renewTimeout = renewTimeout;
        Preconditions.checkArgument(renewBufferPercentage>0.0 && renewBufferPercentage<=1.0,"Renew-buffer percentage must be in (0.0,1.0]");
        this.renewBufferPercentage = renewBufferPercentage;

        currentBlock = UNINITIALIZED_BLOCK;
        currentIndex = 0;
        renewBlockIndex = 0;

        nextBlock = null;

        // Single-threaded executor dedicated to fetching id blocks.
        // daemon=true would probably be fine too
        exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
                .setDaemon(false)
                .setNameFormat("TitanID(" + partition + ")("+idNamespace+")[%d]")
                .build());
        //exec.allowCoreThreadTimeOut(false);
        //exec.prestartCoreThread();
        idBlockFuture = null;

        initialized = false;
        closed = false;
    }

    // Blocks until the in-flight background block acquisition (if any) has
    // completed, failed, timed out, or been cancelled; always clears
    // idBlockFuture so a new acquisition can be started.
    private void waitForIDRenewer() throws InterruptedException {

        long start = swStart();
        if (null != idBlockFuture) {
            try {
                idBlockFuture.get(renewTimeout.getLength(SCHEDULING_TIME_UNIT), SCHEDULING_TIME_UNIT);
            } catch (ExecutionException e) {
                String msg = String.format("ID block allocation on partition(%d)-namespace(%d) failed with an exception in %s",
                        partition, idNamespace, swStop(start));
                throw new TitanException(msg, e);
            } catch (TimeoutException e) {
                // Attempt to cancel the renewer
                idBlockFuture.cancel(true);
                String msg = String.format("ID block allocation on partition(%d)-namespace(%d) timed out in %s",
                        partition, idNamespace, swStop(start));
                throw new TitanException(msg, e);
            } catch (CancellationException e) {
                String msg = String.format("ID block allocation on partition(%d)-namespace(%d) was cancelled after %s",
                        partition, idNamespace, swStop(start));
                throw new TitanException(msg, e);
            } finally {
                idBlockFuture = null;
            }

            // Allow InterruptedException to propagate up the stack
        }
    }

    // Elapsed milliseconds since the given start timestamp ("stopwatch stop").
    private long swStop(long start) {
        return swStart() - start;
    }

    // Swaps the pre-fetched block in as the current block and computes the
    // next renewal trigger index. Called from nextID() under this pool's lock.
    private synchronized void nextBlock() throws InterruptedException {
        assert currentIndex == currentBlock.numIds();
        Preconditions.checkState(!closed,"ID Pool has been closed for partition(%s)-namespace(%s) - cannot apply for new id block",
                partition,idNamespace);

        waitForIDRenewer();
        if (nextBlock == ID_POOL_EXHAUSTION)
            throw new IDPoolExhaustedException("Exhausted ID Pool for partition(" + partition+")-namespace("+idNamespace+")");

        Preconditions.checkArgument(nextBlock!=null);

        currentBlock = nextBlock;
        currentIndex = 0;

        log.debug("ID partition({})-namespace({}) acquired block: [{}]", partition, idNamespace, currentBlock);

        assert currentBlock.numIds()>0;

        nextBlock = null;

        assert RENEW_ID_COUNT>0;
        // Trigger renewal when the larger of RENEW_ID_COUNT ids or
        // renewBufferPercentage of the block remains unconsumed (clamped to 0).
        renewBlockIndex = Math.max(0,currentBlock.numIds()-Math.max(RENEW_ID_COUNT, Math.round(currentBlock.numIds()*renewBufferPercentage)));
        assert renewBlockIndex<currentBlock.numIds() && renewBlockIndex>=currentIndex;
    }

    // Fetches a fresh block from the authority into nextBlock. Runs on the
    // renewer thread; exhaustion is recorded via the ID_POOL_EXHAUSTION sentinel
    // rather than thrown, so the caller thread can raise it from nextBlock().
    private void renewBuffer() {
        Preconditions.checkArgument(nextBlock == null, nextBlock);
        try {
            long start = swStart();
            IDBlock idBlock = idAuthority.getIDBlock(partition, idNamespace, renewTimeout);
            log.debug("Retrieved ID block from authority on partition({})-namespace({}) in {}", partition, idNamespace, swStop(start));
            Preconditions.checkArgument(idBlock!=null && idBlock.numIds()>0);
            nextBlock = idBlock;
        } catch (BackendException e) {
            throw new TitanException("Could not acquire new ID block from storage", e);
        } catch (IDPoolExhaustedException e) {
            nextBlock = ID_POOL_EXHAUSTION;
        }
    }

    // Current wall-clock time in milliseconds ("stopwatch start").
    private long swStart() {
        return System.currentTimeMillis();
    }

    /**
     * Returns the next id from this pool, acquiring a new block when the
     * current one is used up and kicking off background renewal when the
     * buffer threshold is reached.
     *
     * @throws IDPoolExhaustedException if the pool or the id range is exhausted
     * @throws TitanException if block renewal fails or is interrupted
     */
    @Override
    public synchronized long nextID() {
        assert currentIndex <= currentBlock.numIds();

        if (!initialized) {
            // First call: start fetching the very first block.
            startNextIDAcquisition();
            initialized = true;
        }

        if (currentIndex == currentBlock.numIds()) {
            try {
                nextBlock();
            } catch (InterruptedException e) {
                throw new TitanException("Could not renew id block due to interruption", e);
            }
        }

        if (currentIndex == renewBlockIndex) {
            startNextIDAcquisition();
        }

        long returnId = currentBlock.getId(currentIndex);
        currentIndex++;
        if (returnId >= idUpperBound) throw new IDPoolExhaustedException("Reached id upper bound of " + idUpperBound);
        log.trace("partition({})-namespace({}) Returned id: {}", partition, idNamespace, returnId);
        return returnId;
    }

    /** Closes this pool: waits for any pending renewal, then stops the executor. */
    @Override
    public synchronized void close() {
        closed=true;
        //Wait for renewer to finish -- call exec.shutdownNow() instead?
        try {
            waitForIDRenewer();
        } catch (InterruptedException e) {
            throw new TitanException("Interrupted while waiting for id renewer thread to finish", e);
        }
        exec.shutdownNow();
    }

    // Submits the background task that fetches the next block; no-op once closed.
    private void startNextIDAcquisition() {
        Preconditions.checkArgument(idBlockFuture == null, idBlockFuture);
        if (closed) return; //Don't renew anymore if closed

        //Renew buffer
        log.debug("Starting id block renewal thread upon {}", currentIndex);
        idBlockFuture = exec.submit(new IDBlockRunnable());
    }

    // Task executed on the renewer thread.
    private class IDBlockRunnable implements Runnable {

        @Override
        public void run() {
            renewBuffer();
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.graphdb.query.condition;
import com.google.common.base.Preconditions;
import com.thinkaurelius.titan.core.*;
import com.thinkaurelius.titan.graphdb.internal.InternalElement;
import com.thinkaurelius.titan.graphdb.internal.InternalRelationType;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import com.thinkaurelius.titan.graphdb.util.ElementHelper;
import com.tinkerpop.blueprints.Direction;
import org.apache.commons.lang.builder.HashCodeBuilder;
import java.util.Iterator;
/**
 * Query condition that evaluates a {@link TitanPredicate} against the value(s)
 * an element holds for a given key (a property-key/relation-type name or a
 * resolved {@link RelationType}).
 *
 * @author Matthias Broecheler (me@matthiasb.com)
 */
public class PredicateCondition<K, E extends TitanElement> extends Literal<E> {

    private final K key;                    // a String name or a resolved RelationType
    private final TitanPredicate predicate;
    private final Object value;             // reference value; may be null (isNull/notNull)

    /**
     * @param key       property/relation key; must be a String or a RelationType
     * @param predicate predicate applied as {@code predicate.evaluate(elementValue, value)}
     * @param value     reference value to compare against; may be null
     */
    public PredicateCondition(K key, TitanPredicate predicate, Object value) {
        Preconditions.checkNotNull(key);
        Preconditions.checkArgument(key instanceof String || key instanceof RelationType);
        Preconditions.checkNotNull(predicate);
        this.key = key;
        this.predicate = predicate;
        this.value = value;
    }

    // Applies the predicate to a single candidate value from the element.
    private boolean satisfiesCondition(Object value) {
        return predicate.evaluate(value, this.value);
    }

    @Override
    public boolean evaluate(E element) {
        RelationType type;
        if (key instanceof String) {
            // Resolve the key name in the element's transaction; an unknown key
            // is treated as the element having no value for it (null).
            type = ((InternalElement) element).tx().getRelationType((String) key);
            if (type == null)
                return satisfiesCondition(null);
        } else {
            type = (RelationType) key;
        }

        Preconditions.checkNotNull(type);

        if (type.isPropertyKey()) {
            // A property key may be multi-valued: the condition holds if any value matches.
            Iterator<Object> iter = ElementHelper.getValues(element,(PropertyKey)type).iterator();
            if (iter.hasNext()) {
                while (iter.hasNext()) {
                    if (satisfiesCondition(iter.next()))
                        return true;
                }
                return false;
            }
            // No values present: evaluate against null (supports isNull/notNull predicates).
            return satisfiesCondition(null);
        } else {
            // Edge-label case: the assertion restricts this to OUT-unique labels.
            assert ((InternalRelationType)type).getMultiplicity().isUnique(Direction.OUT);
            return satisfiesCondition(((TitanRelation) element).getProperty((EdgeLabel) type));
        }
    }

    public K getKey() {
        return key;
    }

    public TitanPredicate getPredicate() {
        return predicate;
    }

    public Object getValue() {
        return value;
    }

    @Override
    public int hashCode() {
        return new HashCodeBuilder().append(getType()).append(key).append(predicate).append(value).toHashCode();
    }

    @Override
    public boolean equals(Object other) {
        if (this == other)
            return true;

        if (other == null || !getClass().isInstance(other))
            return false;

        PredicateCondition oth = (PredicateCondition) other;
        return key.equals(oth.key) && predicate.equals(oth.predicate) && compareValue(value, oth.value);
    }

    // ATLAS-2214: There's an issue when working with isNull, notNull operators.
    // When string/boolean attributes use the following sequence of filtering on AtlasVertex attributes then the code
    // runs into NPE
    // 1. boolean attr "x" != false/true | boolean attr "x" == false/true
    // 2. boolean attr notNull 'query.has("x") | boolean attr isNull 'query.hasNot("x")'
    // whereas if the sequence is reversed then the NPE is not encountered.
    // Similar behavior is exhibited for the string attributes.
    // Workaround is to allow null == null value comparison.
    private boolean compareValue(final Object left, final Object right) {
        return left == null ? right == null : left.equals(right);
    }

    @Override
    public String toString() {
        return key + " " + predicate + " " + String.valueOf(value);
    }

    // Static factory mirroring the constructor, preserving the generic types.
    public static <K, E extends TitanElement> PredicateCondition<K, E> of(K key, TitanPredicate titanPredicate, Object condition) {
        return new PredicateCondition<K, E>(key, titanPredicate, condition);
    }
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import com.thinkaurelius.titan.core.EdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.titan0.query.Titan0GraphQuery;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
/**
* Factory that serves up instances of graph database abstraction layer classes
* that correspond to Titan/Tinkerpop classes.
*/
public final class GraphDbObjectFactory {

    private GraphDbObjectFactory() {
        // Utility class; never instantiated.
    }

    /**
     * Creates a Titan0Edge that corresponds to the given Gremlin Edge.
     *
     * @param graph  the graph the edge should be created in
     * @param source the gremlin edge
     * @return the wrapped edge, or null if source is null
     */
    public static Titan0Edge createEdge(Titan0Graph graph, Edge source) {
        return source == null ? null : new Titan0Edge(graph, source);
    }

    /**
     * Creates a Titan0GraphQuery that corresponds to the given GraphQuery.
     *
     * @param graph the graph that is being queried
     */
    public static Titan0GraphQuery createQuery(Titan0Graph graph) {
        return new Titan0GraphQuery(graph);
    }

    /**
     * Creates a Titan0Vertex that corresponds to the given Gremlin Vertex.
     *
     * @param graph  the graph that contains the vertex
     * @param source the Gremlin vertex
     * @return the wrapped vertex, or null if source is null
     */
    public static Titan0Vertex createVertex(Titan0Graph graph, Vertex source) {
        return source == null ? null : new Titan0Vertex(graph, source);
    }

    /**
     * Wraps the given Titan property key, or returns null when given null.
     *
     * @param propertyKey the Gremlin propertyKey
     */
    public static Titan0PropertyKey createPropertyKey(PropertyKey propertyKey) {
        return propertyKey == null ? null : new Titan0PropertyKey(propertyKey);
    }

    /**
     * Wraps the given Titan edge label, or returns null when given null.
     *
     * @param label the label
     */
    public static Titan0EdgeLabel createEdgeLabel(EdgeLabel label) {
        return label == null ? null : new Titan0EdgeLabel(label);
    }

    /**
     * Wraps the given Titan graph index, or returns null when given null.
     *
     * @param index the gremlin index
     * @return the wrapped index, or null
     */
    public static AtlasGraphIndex createGraphIndex(TitanGraphIndex index) {
        return index == null ? null : new Titan0GraphIndex(index);
    }

    /**
     * Converts a Titan Cardinality to an AtlasCardinality.
     * SINGLE and LIST map directly; anything else maps to SET.
     *
     * @param cardinality the Titan cardinality
     * @return the corresponding AtlasCardinality
     */
    public static AtlasCardinality createCardinality(Cardinality cardinality) {
        if (cardinality == Cardinality.SINGLE) {
            return AtlasCardinality.SINGLE;
        }
        return cardinality == Cardinality.LIST ? AtlasCardinality.LIST : AtlasCardinality.SET;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
/**
* Titan 0.5.4 implementation of AtlasEdge.
*/
public class Titan0Edge extends Titan0Element<Edge> implements AtlasEdge<Titan0Vertex, Titan0Edge> {

    public Titan0Edge(Titan0Graph graph, Edge edge) {
        super(graph, edge);
    }

    /** Label of the wrapped Gremlin edge. */
    @Override
    public String getLabel() {
        return wrappedElement.getLabel();
    }

    @Override
    public Titan0Edge getE() {
        return this;
    }

    /** IN-direction vertex of this edge, wrapped for the Atlas graph API. */
    @Override
    public AtlasVertex<Titan0Vertex, Titan0Edge> getInVertex() {
        return GraphDbObjectFactory.createVertex(graph, wrappedElement.getVertex(Direction.IN));
    }

    /** OUT-direction vertex of this edge, wrapped for the Atlas graph API. */
    @Override
    public AtlasVertex<Titan0Vertex, Titan0Edge> getOutVertex() {
        return GraphDbObjectFactory.createVertex(graph, wrappedElement.getVertex(Direction.OUT));
    }

    @Override
    public String toString() {
        return "Titan0Edge [id=" + getId() + "]";
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import com.thinkaurelius.titan.core.EdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
/**
 * Titan 0.5.4 implementation of AtlasEdgeLabel.
 *
 * <p>Wraps a Titan {@link EdgeLabel}; equality and hashing delegate to the
 * wrapped label so two wrappers around the same label compare equal.
 */
public class Titan0EdgeLabel implements AtlasEdgeLabel {
    private final EdgeLabel wrappedEdgeLabel;

    public Titan0EdgeLabel(EdgeLabel toWrap) {
        this.wrappedEdgeLabel = toWrap;
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasEdgeLabel#getName() */
    @Override
    public String getName() {
        return wrappedEdgeLabel.getName();
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Titan0EdgeLabel) {
            return wrappedEdgeLabel.equals(((Titan0EdgeLabel) other).wrappedEdgeLabel);
        }
        return false;
    }

    @Override
    public int hashCode() {
        // same value as the original 17/37 accumulation over a single term
        return 37 * 17 + wrappedEdgeLabel.hashCode();
    }

    @Override
    public String toString() {
        return wrappedEdgeLabel.getName();
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import java.lang.Override;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.thinkaurelius.titan.core.SchemaViolationException;
import com.thinkaurelius.titan.core.TitanElement;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
/**
* Titan 0.5.4 implementation of AtlasElement.
*/
public class Titan0Element<T extends Element> implements AtlasElement {
protected Titan0Graph graph;
protected T wrappedElement;
public Titan0Element(Titan0Graph graph, T element) {
wrappedElement = element;
this.graph = graph;
}
@Override
public Object getId() {
return wrappedElement.getId();
}
@Override
public Set<String> getPropertyKeys() {
return wrappedElement.getPropertyKeys();
}
@Override
public <U> void setProperty(String propertyName, U value) {
try {
wrappedElement.setProperty(propertyName, value);
} catch (SchemaViolationException e) {
throw new AtlasSchemaViolationException(e);
}
}
@Override
public <U> U getProperty(String propertyName, Class<U> clazz) {
Object rawValue = wrappedElement.getProperty(propertyName);
if (rawValue == null) {
return null;
}
if (AtlasEdge.class == clazz) {
return (U)graph.getEdge(rawValue.toString());
}
if (AtlasVertex.class == clazz) {
return (U)graph.getVertex(rawValue.toString());
}
return (U)rawValue;
}
/**
* Gets all of the values of the given property.
* @param propertyName
* @return
*/
@Override
public <T> Collection<T> getPropertyValues(String propertyName, Class<T> type) {
return Collections.singleton(getProperty(propertyName, type));
}
@Override
public void removeProperty(String propertyName) {
wrappedElement.removeProperty(propertyName);
}
@Override
public JSONObject toJson(Set<String> propertyKeys) throws JSONException {
return GraphSONUtility.jsonFromElement(wrappedElement, propertyKeys, GraphSONMode.NORMAL);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.atlas.repository.graphdb.AtlasElement#getListProperty(java.
* lang.String)
*/
@Override
public List<String> getListProperty(String propertyName) {
return getProperty(propertyName, List.class);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.atlas.repository.graphdb.AtlasElement#setListProperty(java.
* lang.String, java.util.List)
*/
@Override
public void setListProperty(String propertyName, List<String> values) {
setProperty(propertyName, values);
}
@Override
public T getWrappedElement() {
return wrappedElement;
}
@Override
public int hashCode() {
int result = 37;
result = 17 * result + getClass().hashCode();
result = 17 * result + getWrappedElement().hashCode();
return result;
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass() != getClass()) {
return false;
}
Titan0Element otherElement = (Titan0Element) other;
return getWrappedElement().equals(otherElement.getWrappedElement());
}
/*
* (non-Javadoc)
*
* @see org.apache.atlas.repository.graphdb.AtlasElement#exists()
*/
@Override
public boolean exists() {
try {
return !((TitanElement)wrappedElement).isRemoved();
} catch(IllegalStateException e) {
return false;
}
}
/*
* (non-Javadoc)
*
* @see
* org.apache.atlas.repository.graphdb.AtlasElement#setJsonProperty(java.
* lang.String, java.lang.Object)
*/
@Override
public <T> void setJsonProperty(String propertyName, T value) {
setProperty(propertyName, value);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.atlas.repository.graphdb.AtlasElement#getJsonProperty(java.
* lang.String)
*/
@Override
public <T> T getJsonProperty(String propertyName) {
return (T) getProperty(propertyName, String.class);
}
@Override
public String getIdForDisplay() {
return getId().toString();
}
@Override
public <V> List<V> getListProperty(String propertyName, Class<V> elementType) {
List<String> value = getListProperty(propertyName);
if (value == null) {
return null;
}
if (AtlasEdge.class == elementType) {
return (List<V>)Lists.transform(value, new Function<String, AtlasEdge>(){
@Override
public AtlasEdge apply(String input) {
return graph.getEdge(input);
}
});
}
if (AtlasVertex.class == elementType) {
return (List<V>)Lists.transform(value, new Function<String, AtlasVertex>(){
@Override
public AtlasVertex apply(String input) {
return graph.getVertex(input);
}
});
}
return (List<V>)value;
}
@Override
public void setPropertyFromElementsIds(String propertyName, List<AtlasElement> values) {
List<String> propertyValue = new ArrayList<>(values.size());
for(AtlasElement element: values) {
propertyValue.add(element.getId().toString());
}
setProperty(propertyName, propertyValue);
}
@Override
public void setPropertyFromElementId(String propertyName, AtlasElement value) {
setProperty(propertyName, value.getId().toString());
}
@Override
public boolean isIdAssigned() {
return true;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.thinkaurelius.titan.core.util.TitanCleanup;
import com.thinkaurelius.titan.diskstorage.StandardIndexProvider;
import com.thinkaurelius.titan.diskstorage.solr.Solr5Index;
/**
 * Titan 0.5.4 implementation of GraphDatabase.
 *
 * <p>Owns the process-wide TitanGraph singleton and its Atlas-facing wrapper.
 */
public class Titan0GraphDatabase implements GraphDatabase<Titan0Vertex, Titan0Edge> {
    private static final Logger LOG = LoggerFactory.getLogger(Titan0GraphDatabase.class);

    /**
     * Constant for the configuration property that indicates the prefix.
     */
    public static final String GRAPH_PREFIX = "atlas.graph";
    public static final String INDEX_BACKEND_CONF = "index.search.backend";
    public static final String INDEX_BACKEND_LUCENE = "lucene";
    public static final String INDEX_BACKEND_ES = "elasticsearch";

    // Published together under the class lock; atlasGraphInstance is assigned
    // BEFORE graphInstance so that any reader who sees a non-null graphInstance
    // is guaranteed to also see a non-null atlasGraphInstance (see getGraph()).
    private static volatile Titan0Graph atlasGraphInstance = null;
    private static volatile TitanGraph graphInstance = null;

    static {
        addSolr5Index();
    }

    /** Returns the "atlas.graph" subset of the application configuration. */
    public static Configuration getConfiguration() throws AtlasException {
        Configuration configProperties = ApplicationProperties.get();
        return ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
    }

    /**
     * Titan loads index backend name to implementation using
     * StandardIndexProvider.ALL_MANAGER_CLASSES But
     * StandardIndexProvider.ALL_MANAGER_CLASSES is a private static final
     * ImmutableMap Only way to inject Solr5Index is to modify this field. So,
     * using hacky reflection to add Sol5Index
     */
    private static void addSolr5Index() {
        try {
            Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
            field.setAccessible(true);

            // strip the 'final' modifier so the map can be replaced
            Field modifiersField = Field.class.getDeclaredField("modifiers");
            modifiersField.setAccessible(true);
            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

            Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses());
            customMap.put("solr", Solr5Index.class.getName());  // for consistency with Titan 1.0.0
            customMap.put("solr5", Solr5Index.class.getName()); // for backward compatibility
            ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
            field.set(null, immap);

            LOG.debug("Injected solr5 index - {}", Solr5Index.class.getName());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lazily opens and caches the shared TitanGraph instance
     * (double-checked locking on the volatile graphInstance field).
     */
    public static TitanGraph getGraphInstance() {
        if (graphInstance == null) {
            synchronized (Titan0GraphDatabase.class) {
                if (graphInstance == null) {
                    Configuration config;
                    try {
                        config = getConfiguration();
                    } catch (AtlasException e) {
                        throw new RuntimeException(e);
                    }

                    TitanGraph graph = TitanFactory.open(config);

                    // Publish atlasGraphInstance first. The original code assigned
                    // graphInstance before atlasGraphInstance, so a racing caller of
                    // getGraph() could observe a non-null graphInstance but a null
                    // atlasGraphInstance and return null.
                    atlasGraphInstance = new Titan0Graph();
                    graphInstance = graph;

                    validateIndexBackend(config);
                }
            }
        }
        return graphInstance;
    }

    /** Commits outstanding work and shuts the shared graph down, if one is open. */
    public static void unload() {
        synchronized (Titan0GraphDatabase.class) {
            if (graphInstance == null) {
                return;
            }
            graphInstance.commit();
            //shutdown invalidates the graph instance
            graphInstance.shutdown();
            graphInstance = null;
        }
    }

    /**
     * Verifies that the configured index backend matches the backend already
     * recorded in the graph; a mismatch would corrupt the index, so abort.
     */
    static void validateIndexBackend(Configuration config) {
        String configuredIndexBackend = config.getString(INDEX_BACKEND_CONF);

        TitanManagement managementSystem = null;
        try {
            managementSystem = getGraphInstance().getManagementSystem();
            String currentIndexBackend = managementSystem.get(INDEX_BACKEND_CONF);

            if (!equals(configuredIndexBackend, currentIndexBackend)) {
                throw new RuntimeException("Configured Index Backend " + configuredIndexBackend
                        + " differs from earlier configured Index Backend " + currentIndexBackend + ". Aborting!");
            }
        } finally {
            if (managementSystem != null) {
                managementSystem.commit();
            }
        }
    }

    /** Null-safe equality check (pre-dates availability of java.util.Objects here). */
    private static boolean equals(Object o1, Object o2) {
        if (o1 == null) {
            return o2 == null;
        }
        return o1.equals(o2);
    }

    @Override
    public AtlasGraph<Titan0Vertex, Titan0Edge> getGraph() {
        // force graph loading up front to avoid bootstrapping issues
        getGraphInstance();
        return atlasGraphInstance;
    }

    @Override
    public boolean isGraphLoaded() {
        return graphInstance != null;
    }

    @Override
    public void initializeTestGraph() {
        //nothing to do
    }

    /** Best-effort shutdown and wipe of the test graph; failures are logged, not rethrown. */
    @Override
    public void cleanup() {
        try {
            getGraphInstance().shutdown();
        } catch (Throwable t) {
            // printStackTrace() removed — the logger already records the throwable
            LOG.warn("Could not shutdown test TitanGraph", t);
        }

        try {
            TitanCleanup.clear(getGraphInstance());
        } catch (Throwable t) {
            LOG.warn("Could not clear test TitanGraph", t);
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import java.util.HashSet;
import java.util.Set;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
/**
 * Titan 0.5.4 implementation of AtlasGraphIndex.
 *
 * <p>Read-only view over a {@link TitanGraphIndex}.
 */
public class Titan0GraphIndex implements AtlasGraphIndex {

    private final TitanGraphIndex wrappedIndex;

    public Titan0GraphIndex(TitanGraphIndex toWrap) {
        this.wrappedIndex = toWrap;
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasGraphIndex#isMixedIndex() */
    @Override
    public boolean isMixedIndex() {
        return wrappedIndex.isMixedIndex();
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasGraphIndex#isEdgeIndex() */
    @Override
    public boolean isEdgeIndex() {
        return Edge.class.isAssignableFrom(wrappedIndex.getIndexedElement());
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasGraphIndex#isVertexIndex() */
    @Override
    public boolean isVertexIndex() {
        return Vertex.class.isAssignableFrom(wrappedIndex.getIndexedElement());
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasGraphIndex#isCompositeIndex() */
    @Override
    public boolean isCompositeIndex() {
        return wrappedIndex.isCompositeIndex();
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasGraphIndex#isUnique() */
    @Override
    public boolean isUnique() {
        return wrappedIndex.isUnique();
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasGraphIndex#getFieldKeys() */
    @Override
    public Set<AtlasPropertyKey> getFieldKeys() {
        Set<AtlasPropertyKey> result = new HashSet<>();

        for (PropertyKey key : wrappedIndex.getFieldKeys()) {
            result.add(GraphDbObjectFactory.createPropertyKey(key));
        }
        return result;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.EdgeLabel;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.schema.PropertyKeyMaker;
import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Titan 0.5.4 implementation of AtlasGraphManagement.
 *
 * <p>Wraps a {@link TitanManagement} schema-management transaction; changes
 * take effect when {@link #commit()} is called.
 */
public class Titan0GraphManagement implements AtlasGraphManagement {

    private static final Logger LOG = LoggerFactory.getLogger(Titan0GraphManagement.class);

    private final Titan0Graph graph;
    private final TitanManagement management;

    // names of multiplicity-many property keys created in this transaction;
    // registered with the graph when this transaction commits
    private final Set<String> newMultProperties = new HashSet<>();

    public Titan0GraphManagement(Titan0Graph graph, TitanManagement managementSystem) {
        this.graph = graph;
        this.management = managementSystem;
    }

    /** No-op in the Titan 0.5.4 implementation. */
    @Override
    public void createEdgeMixedIndex(String index, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
    }

    /**
     * Creates a vertex-centric (relation) index on the given edge label,
     * creating the label itself first if it does not exist yet.
     */
    @Override
    public void createEdgeIndex(String label, String indexName, AtlasEdgeDirection edgeDirection, List<AtlasPropertyKey> propertyKeys) {
        EdgeLabel edgeLabel = management.getEdgeLabel(label);

        if (edgeLabel == null) {
            edgeLabel = management.makeEdgeLabel(label).make();
        }

        Direction direction = TitanObjectFactory.createDirection(edgeDirection);
        PropertyKey[] keys = TitanObjectFactory.createPropertyKeys(propertyKeys);

        management.buildEdgeIndex(edgeLabel, indexName, direction, keys);
    }

    /** No-op in the Titan 0.5.4 implementation. */
    @Override
    public void createFullTextMixedIndex(String index, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
    }

    // NOTE(review): currently unused; kept for parity with other graphdb implementations.
    private void buildMixedIndex(String index, Class<? extends Element> titanClass, String backingIndex) {
        management.buildIndex(index, titanClass).buildMixedIndex(backingIndex);
    }

    @Override
    public boolean containsPropertyKey(String propertyKey) {
        return management.containsPropertyKey(propertyKey);
    }

    @Override
    public void rollback() {
        management.rollback();
    }

    /** Commits the schema transaction and registers any new multi-properties with the graph. */
    @Override
    public void commit() {
        graph.addMultiProperties(newMultProperties);
        newMultProperties.clear();
        management.commit();
    }

    /**
     * Creates a new property key with the given data type and cardinality.
     *
     * <p>Fix: the original called {@code cardinality.isMany()} before its own
     * {@code cardinality != null} check, so a null cardinality would NPE before
     * ever reaching the guard. The isMany() check now sits inside the guard.
     */
    @Override
    public AtlasPropertyKey makePropertyKey(String propertyName, Class propertyClass, AtlasCardinality cardinality) {
        PropertyKeyMaker propertyKeyBuilder = management.makePropertyKey(propertyName).dataType(propertyClass);

        if (cardinality != null) {
            if (cardinality.isMany()) {
                newMultProperties.add(propertyName);
            }
            Cardinality titanCardinality = TitanObjectFactory.createCardinality(cardinality);
            propertyKeyBuilder.cardinality(titanCardinality);
        }
        PropertyKey propertyKey = propertyKeyBuilder.make();
        return GraphDbObjectFactory.createPropertyKey(propertyKey);
    }

    @Override
    public AtlasEdgeLabel makeEdgeLabel(String label) {
        EdgeLabel edgeLabel = management.makeEdgeLabel(label).make();

        return GraphDbObjectFactory.createEdgeLabel(edgeLabel);
    }

    /**
     * "Deletes" a property key by renaming it to a unique
     * {@code <key>_deleted_<i>} name (Titan does not support true key removal).
     */
    @Override
    public void deletePropertyKey(String propertyKey) {
        PropertyKey titanPropertyKey = management.getPropertyKey(propertyKey);

        if (null == titanPropertyKey) {
            return;
        }

        for (int i = 0;; i++) {
            // NOTE(review): relies on PropertyKey.toString() producing the key
            // name; presumably equivalent to using the propertyKey argument — verify.
            String deletedKeyName = titanPropertyKey + "_deleted_" + i;

            if (null == management.getPropertyKey(deletedKeyName)) {
                management.changeName(titanPropertyKey, deletedKeyName);
                break;
            }
        }
    }

    @Override
    public AtlasPropertyKey getPropertyKey(String propertyName) {
        return GraphDbObjectFactory.createPropertyKey(management.getPropertyKey(propertyName));
    }

    @Override
    public AtlasEdgeLabel getEdgeLabel(String label) {
        return GraphDbObjectFactory.createEdgeLabel(management.getEdgeLabel(label));
    }

    /** Builds a composite vertex index over the given keys, optionally unique. */
    @Override
    public void createVertexCompositeIndex(String propertyName, boolean enforceUniqueness,
                                           List<AtlasPropertyKey> propertyKeys) {
        TitanManagement.IndexBuilder indexBuilder = management.buildIndex(propertyName, Vertex.class);

        for (AtlasPropertyKey key : propertyKeys) {
            PropertyKey titanKey = TitanObjectFactory.createPropertyKey(key);
            indexBuilder.addKey(titanKey);
        }

        if (enforceUniqueness) {
            indexBuilder.unique();
        }

        indexBuilder.buildCompositeIndex();
    }

    /** Builds a composite edge index over the given keys, optionally unique. */
    @Override
    public void createEdgeCompositeIndex(String propertyName, boolean isUnique, List<AtlasPropertyKey> propertyKeys) {
        TitanManagement.IndexBuilder indexBuilder = management.buildIndex(propertyName, Edge.class);

        for (AtlasPropertyKey key : propertyKeys) {
            PropertyKey titanKey = TitanObjectFactory.createPropertyKey(key);
            indexBuilder.addKey(titanKey);
        }

        if (isUnique) {
            indexBuilder.unique();
        }

        indexBuilder.buildCompositeIndex();
    }

    /** Builds a mixed vertex index on the given backing (search) index. */
    @Override
    public void createVertexMixedIndex(String propertyName, String backingIndex, List<AtlasPropertyKey> propertyKeys) {
        TitanManagement.IndexBuilder indexBuilder = management.buildIndex(propertyName, Vertex.class);

        for (AtlasPropertyKey key : propertyKeys) {
            PropertyKey titanKey = TitanObjectFactory.createPropertyKey(key);
            indexBuilder.addKey(titanKey);
        }

        indexBuilder.buildMixedIndex(backingIndex);
    }

    /** Adds the given property key to an existing mixed index. */
    @Override
    public void addMixedIndex(String indexName, AtlasPropertyKey propertyKey) {
        PropertyKey titanKey = TitanObjectFactory.createPropertyKey(propertyKey);
        TitanGraphIndex vertexIndex = management.getGraphIndex(indexName);

        management.addIndexKey(vertexIndex, titanKey);
    }

    /*
     * (non-Javadoc)
     *
     * @see org.apache.atlas.repository.graphdb.AtlasGraphManagement#getGraphIndex(java.lang.String)
     */
    @Override
    public AtlasGraphIndex getGraphIndex(String indexName) {
        TitanGraphIndex index = management.getGraphIndex(indexName);

        return GraphDbObjectFactory.createGraphIndex(index);
    }

    /** Returns true if a relation index with the given name exists on the given edge label. */
    @Override
    public boolean edgeIndexExist(String label, String indexName) {
        EdgeLabel edgeLabel = management.getEdgeLabel(label);

        return edgeLabel != null && management.getRelationIndex(edgeLabel, indexName) != null;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import java.util.Iterator;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.thinkaurelius.titan.core.TitanIndexQuery;
import com.tinkerpop.blueprints.Vertex;
/**
* Titan 0.5.4 implementation of AtlasIndexQuery.
*/
public class Titan0IndexQuery implements AtlasIndexQuery<Titan0Vertex, Titan0Edge> {
private Titan0Graph graph;
private TitanIndexQuery wrappedIndexQuery;
public Titan0IndexQuery(Titan0Graph graph, TitanIndexQuery query) {
wrappedIndexQuery = query;
this.graph = graph;
}
@Override
public Iterator<AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>> vertices() {
Iterator<TitanIndexQuery.Result<Vertex>> results = wrappedIndexQuery.vertices().iterator();
Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>> function =
new Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>>() {
@Override
public AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> apply(TitanIndexQuery.Result<Vertex> source) {
return new ResultImpl(source);
}
};
return Iterators.transform(results, function);
}
@Override
public Iterator<Result<Titan0Vertex, Titan0Edge>> vertices(int offset, int limit) {
Preconditions.checkArgument(offset >=0, "Index offset should be greater than or equals to 0");
Preconditions.checkArgument(limit >=0, "Index limit should be greater than or equals to 0");
Iterator<TitanIndexQuery.Result<Vertex>> results = wrappedIndexQuery
.offset(offset)
.limit(limit)
.vertices().iterator();
Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>> function =
new Function<TitanIndexQuery.Result<Vertex>, AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge>>() {
@Override
public AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> apply(TitanIndexQuery.Result<Vertex> source) {
return new ResultImpl(source);
}
};
return Iterators.transform(results, function);
}
private final class ResultImpl implements AtlasIndexQuery.Result<Titan0Vertex, Titan0Edge> {
private TitanIndexQuery.Result<Vertex> wrappedResult;
ResultImpl(TitanIndexQuery.Result<Vertex> source) {
wrappedResult = source;
}
@Override
public AtlasVertex<Titan0Vertex, Titan0Edge> getVertex() {
return GraphDbObjectFactory.createVertex(graph, wrappedResult.getElement());
}
@Override
public double getScore() {
return wrappedResult.getScore();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import com.thinkaurelius.titan.core.PropertyKey;
/**
 * Titan 0.5.4 implementation of AtlasPropertyKey.
 *
 * <p>Wraps a Titan {@link PropertyKey}; equality and hashing delegate to the
 * wrapped key so two wrappers around the same key compare equal.
 */
public class Titan0PropertyKey implements AtlasPropertyKey {

    private final PropertyKey wrappedPropertyKey;

    public Titan0PropertyKey(PropertyKey toWrap) {
        this.wrappedPropertyKey = toWrap;
    }

    /** @see org.apache.atlas.repository.graphdb.AtlasPropertyKey#getName() */
    @Override
    public String getName() {
        return wrappedPropertyKey.getName();
    }

    /** Returns the underlying Titan property key. */
    public PropertyKey getWrappedPropertyKey() {
        return wrappedPropertyKey;
    }

    @Override
    public AtlasCardinality getCardinality() {
        return GraphDbObjectFactory.createCardinality(wrappedPropertyKey.getCardinality());
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Titan0PropertyKey) {
            return wrappedPropertyKey.equals(((Titan0PropertyKey) other).wrappedPropertyKey);
        }
        return false;
    }

    @Override
    public int hashCode() {
        // same value as the original 17/37 accumulation over a single term
        return 37 * 17 + wrappedPropertyKey.hashCode();
    }

    @Override
    public String toString() {
        return wrappedPropertyKey.getName();
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
import com.thinkaurelius.titan.core.SchemaViolationException;
import com.thinkaurelius.titan.core.TitanProperty;
import com.thinkaurelius.titan.core.TitanVertex;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
/**
 * Titan 0.5.4 implementation of AtlasVertex.
 *
 * <p>Extends the generic element wrapper with vertex-specific behavior,
 * including emulation of Titan 1.0.0 multi-property semantics.
 */
public class Titan0Vertex extends Titan0Element<Vertex> implements AtlasVertex<Titan0Vertex, Titan0Edge> {

    public Titan0Vertex(Titan0Graph graph, Vertex source) {
        super(graph, source);
    }

    /** Returns the edges with the given label incident to this vertex in the given direction. */
    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> getEdges(AtlasEdgeDirection dir, String edgeLabel) {
        Iterable<Edge> titanEdges = wrappedElement.getEdges(TitanObjectFactory.createDirection(dir), edgeLabel);

        return graph.wrapEdges(titanEdges);
    }

    private TitanVertex getAsTitanVertex() {
        return (TitanVertex) wrappedElement;
    }

    /** Returns all edges incident to this vertex in the given direction. */
    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> getEdges(AtlasEdgeDirection in) {
        Iterable<Edge> titanResult = wrappedElement.getEdges(TitanObjectFactory.createDirection(in));

        return graph.wrapEdges(titanResult);
    }

    @Override
    public <T> T getProperty(String propertyName, Class<T> clazz) {
        if (graph.isMultiProperty(propertyName)) {
            // throw exception in this case to be consistent with Titan 1.0.0 behavior
            throw new IllegalStateException("Cannot read multi-valued property '" + propertyName
                    + "' as a single value; use getPropertyValues()");
        }

        return super.getProperty(propertyName, clazz);
    }

    // @Override was missing in the original — this overrides Titan0Element.setProperty
    @Override
    public <T> void setProperty(String propertyName, T value) {
        try {
            super.setProperty(propertyName, value);
        } catch (UnsupportedOperationException e) {
            // For consistency with Titan 1.0.0, treat sets of multiplicity-many
            // properties as adds. Handle this here since this is an uncommon occurrence.
            if (graph.isMultiProperty(propertyName)) {
                addProperty(propertyName, value);
            } else {
                throw e;
            }
        }
    }

    /**
     * Adds a value to a (possibly multi-valued) property, following Java set
     * semantics: re-adding an existing value is a silent no-op rather than a
     * schema violation.
     */
    @Override
    public <T> void addProperty(String propertyName, T value) {
        try {
            getAsTitanVertex().addProperty(propertyName, value);
        } catch (SchemaViolationException e) {
            if (getPropertyValues(propertyName, value.getClass()).contains(value)) {
                // value already present — don't throw, per set semantics
                return;
            }
            throw new AtlasSchemaViolationException(e);
        }
    }

    /** Same behavior as {@link #addProperty}; the original duplicated its body verbatim. */
    @Override
    public <T> void addListProperty(String propertyName, T value) {
        addProperty(propertyName, value);
    }

    /** Returns every stored value of the given property on this vertex. */
    @SuppressWarnings("unchecked")
    @Override
    public <T> Collection<T> getPropertyValues(String key, Class<T> clazz) {
        TitanVertex tv = getAsTitanVertex();
        Collection<T> result = new ArrayList<>();

        for (TitanProperty property : tv.getProperties(key)) {
            result.add((T) property.getValue());
        }
        return result;
    }

    @Override
    public AtlasVertexQuery<Titan0Vertex, Titan0Edge> query() {
        return new Titan0VertexQuery(graph, wrappedElement.query());
    }

    @Override
    public Titan0Vertex getV() {
        return this;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "Titan0Vertex [id=" + getId() + "]";
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import com.google.common.base.Preconditions;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.AtlasVertexQuery;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.VertexQuery;
/**
 * Titan 0.5.4 implementation of AtlasVertexQuery.  Thin adapter around a
 * Blueprints {@link VertexQuery}; results are wrapped back into the Atlas
 * graph abstraction via the owning {@link Titan0Graph}.
 */
public class Titan0VertexQuery implements AtlasVertexQuery<Titan0Vertex, Titan0Edge> {

    private final Titan0Graph graph;
    private final VertexQuery query;

    public Titan0VertexQuery(Titan0Graph graph, VertexQuery vertexQuery) {
        this.graph = graph;
        this.query = vertexQuery;
    }

    @Override
    public AtlasVertexQuery<Titan0Vertex, Titan0Edge> direction(AtlasEdgeDirection queryDirection) {
        query.direction(TitanObjectFactory.createDirection(queryDirection));
        return this;
    }

    @Override
    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices() {
        return graph.wrapVertices(query.vertices());
    }

    @Override
    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
        Preconditions.checkArgument(limit >=0, "Limit should be greater than or equals to 0");
        return graph.wrapVertices(query.limit(limit).vertices());
    }

    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges() {
        return graph.wrapEdges(query.edges());
    }

    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int limit) {
        Preconditions.checkArgument(limit >=0, "Limit should be greater than or equals to 0");
        return graph.wrapEdges(query.limit(limit).edges());
    }

    @Override
    public long count() {
        return query.count();
    }

    @Override
    public AtlasVertexQuery<Titan0Vertex, Titan0Edge> label(String label) {
        query.labels(label);
        return this;
    }

    @Override
    public AtlasVertexQuery<Titan0Vertex, Titan0Edge> has(String key, Object value) {
        query.has(key, value);
        return this;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import com.thinkaurelius.titan.core.Cardinality;
import com.thinkaurelius.titan.core.PropertyKey;
import com.tinkerpop.blueprints.Direction;
import java.util.ArrayList;
import java.util.List;
/**
 * Factory that serves up instances of Titan/Tinkerpop classes that correspond to
 * graph database abstraction layer/Atlas classes.
 */
public final class TitanObjectFactory {

    private TitanObjectFactory() {
        // utility class: not instantiable
    }

    /**
     * Retrieves the Titan direction corresponding to the given
     * AtlasEdgeDirection.
     *
     * @param dir the abstraction-layer edge direction
     * @return the equivalent Blueprints {@link Direction}
     */
    public static Direction createDirection(AtlasEdgeDirection dir) {
        switch(dir) {
        case IN:
            return Direction.IN;
        case OUT:
            return Direction.OUT;
        case BOTH:
            return Direction.BOTH;
        default:
            // Unreachable for the current enum values; use IllegalStateException for
            // consistency with createCardinality (was a bare RuntimeException).
            throw new IllegalStateException("Unrecognized direction: " + dir);
        }
    }

    /**
     * Converts an AtlasCardinality to the equivalent Titan Cardinality.
     *
     * @param cardinality the abstraction-layer cardinality
     * @return the equivalent Titan {@link Cardinality}
     */
    public static Cardinality createCardinality(AtlasCardinality cardinality) {
        switch(cardinality) {
        case SINGLE:
            return Cardinality.SINGLE;
        case LIST:
            return Cardinality.LIST;
        case SET:
            return Cardinality.SET;
        default:
            throw new IllegalStateException("Unrecognized cardinality: " + cardinality);
        }
    }

    /** Unwraps the Titan property key backing the given abstraction-layer key. */
    public static PropertyKey createPropertyKey(AtlasPropertyKey key) {
        return ((Titan0PropertyKey)key).getWrappedPropertyKey();
    }

    /**
     * Unwraps the Titan property keys backing the given abstraction-layer keys,
     * preserving their order.
     */
    public static PropertyKey[] createPropertyKeys(List<AtlasPropertyKey> keys) {
        PropertyKey[] ret = new PropertyKey[keys.size()];
        int i = 0;
        for (AtlasPropertyKey key : keys) {
            ret[i] = createPropertyKey(key);
            i++;
        }
        return ret;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0.query;
import com.thinkaurelius.titan.core.Order;
import com.thinkaurelius.titan.core.TitanGraphQuery;
import com.thinkaurelius.titan.core.attribute.Contain;
import com.thinkaurelius.titan.core.attribute.Text;
import com.thinkaurelius.titan.graphdb.query.TitanPredicate;
import com.tinkerpop.blueprints.Compare;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery.QueryOperator;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.tinkerpop.query.NativeTinkerpopGraphQuery;
import org.apache.atlas.repository.graphdb.titan0.Titan0Edge;
import org.apache.atlas.repository.graphdb.titan0.Titan0Graph;
import org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase;
import org.apache.atlas.repository.graphdb.titan0.Titan0Vertex;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
/**
 * Titan 0.5.4 implementation of NativeTitanGraphQuery.
 *
 * @author jeff
 *
 */
public class NativeTitan0GraphQuery implements NativeTinkerpopGraphQuery<Titan0Vertex, Titan0Edge> {

    private Titan0Graph graph;
    private TitanGraphQuery<?> query;

    public NativeTitan0GraphQuery(Titan0Graph graph) {
        query = Titan0GraphDatabase.getGraphInstance().query();
        this.graph = graph;
    }

    @Override
    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices() {
        Iterable it = query.vertices();
        return graph.wrapVertices(it);
    }

    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges() {
        Iterable it = query.edges();
        return graph.wrapEdges(it);
    }

    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int limit) {
        Iterable it = query.limit(limit).edges();
        return graph.wrapEdges(it);
    }

    /**
     * Returns the page of edges [offset, offset + limit) from the query result.
     * Titan 0.5.4 has no native offset support, so the first {@code offset}
     * results are fetched and discarded client-side.
     */
    @Override
    public Iterable<AtlasEdge<Titan0Vertex, Titan0Edge>> edges(int offset, int limit) {
        List<Edge> result = new ArrayList<>(limit);
        Iterator<Edge> iter = query.limit(offset + limit).edges().iterator();
        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
            if (resultIdx < offset) {
                // BUG FIX: the skipped element must still be consumed; previously the
                // iterator was never advanced here, so the returned page was always
                // elements [0, limit) regardless of the requested offset.
                iter.next();
                continue;
            }
            result.add(iter.next());
        }
        return graph.wrapEdges(result);
    }

    @Override
    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int limit) {
        Iterable it = query.limit(limit).vertices();
        return graph.wrapVertices(it);
    }

    /**
     * Returns the page of vertices [offset, offset + limit) from the query result;
     * see {@link #edges(int, int)} for the client-side offset handling.
     */
    @Override
    public Iterable<AtlasVertex<Titan0Vertex, Titan0Edge>> vertices(int offset, int limit) {
        List<Vertex> result = new ArrayList<>(limit);
        Iterator<Vertex> iter = query.limit(offset + limit).vertices().iterator();
        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
            if (resultIdx < offset) {
                // BUG FIX: consume the skipped element (see edges(int, int)).
                iter.next();
                continue;
            }
            result.add(iter.next());
        }
        return graph.wrapVertices(result);
    }

    @Override
    public void in(String propertyName, Collection<?> values) {
        query.has(propertyName, Contain.IN, values);
    }

    /**
     * Adds a predicate constraint, translating the Atlas operator into the
     * corresponding Titan predicate.
     */
    @Override
    public void has(String propertyName, QueryOperator op, Object value) {
        TitanPredicate pred;
        if (op instanceof ComparisionOperator) {
            Compare c = getGremlinPredicate((ComparisionOperator) op);
            pred = TitanPredicate.Converter.convert(c);
        } else {
            pred = getGremlinPredicate((MatchingOperator) op);
        }
        query.has(propertyName, pred, value);
    }

    @Override
    public void orderBy(final String propertyName, final AtlasGraphQuery.SortOrder sortOrder) {
        query.orderBy(propertyName, sortOrder == AtlasGraphQuery.SortOrder.ASC ? Order.ASC : Order.DESC);
    }

    // Maps Atlas text-matching operators to Titan Text predicates.  Titan 0.5.4 has
    // no dedicated suffix predicate, so SUFFIX falls back to CONTAINS_REGEX.
    private Text getGremlinPredicate(MatchingOperator op) {
        switch (op) {
            case CONTAINS:
                return Text.CONTAINS;
            case PREFIX:
                return Text.PREFIX;
            case SUFFIX:
                return Text.CONTAINS_REGEX;
            case REGEX:
                return Text.REGEX;
            default:
                throw new RuntimeException("Unsupported matching operator:" + op);
        }
    }

    // Maps Atlas comparison operators to Blueprints Compare predicates.
    private Compare getGremlinPredicate(ComparisionOperator op) {
        switch (op) {
            case EQUAL:
                return Compare.EQUAL;
            case GREATER_THAN:
                return Compare.GREATER_THAN;
            case GREATER_THAN_EQUAL:
                return Compare.GREATER_THAN_EQUAL;
            case LESS_THAN:
                return Compare.LESS_THAN;
            case LESS_THAN_EQUAL:
                return Compare.LESS_THAN_EQUAL;
            case NOT_EQUAL:
                return Compare.NOT_EQUAL;
            default:
                throw new RuntimeException("Unsupported comparison operator:" + op);
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0.query;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.tinkerpop.query.TinkerpopGraphQuery;
import org.apache.atlas.repository.graphdb.tinkerpop.query.NativeTinkerpopGraphQuery;
import org.apache.atlas.repository.graphdb.tinkerpop.query.NativeTinkerpopQueryFactory;
import org.apache.atlas.repository.graphdb.titan0.Titan0Edge;
import org.apache.atlas.repository.graphdb.titan0.Titan0Graph;
import org.apache.atlas.repository.graphdb.titan0.Titan0Vertex;
/**
 * Titan 0.5.4 implementation of AtlasGraphQuery.  Inherits the generic query
 * logic from TinkerpopGraphQuery and also serves as its own factory for the
 * native (Titan-backed) query objects.
 */
public class Titan0GraphQuery extends TinkerpopGraphQuery<Titan0Vertex, Titan0Edge>
        implements NativeTinkerpopQueryFactory<Titan0Vertex, Titan0Edge> {

    public Titan0GraphQuery(Titan0Graph graph) {
        super(graph);
    }

    public Titan0GraphQuery(Titan0Graph graph, boolean isChildQuery) {
        super(graph, isChildQuery);
    }

    /** Child queries run against the same graph and are flagged as children. */
    @Override
    public AtlasGraphQuery<Titan0Vertex, Titan0Edge> createChildQuery() {
        return new Titan0GraphQuery((Titan0Graph) graph, true);
    }

    /** This class is its own factory for native queries. */
    @Override
    protected NativeTinkerpopQueryFactory<Titan0Vertex, Titan0Edge> getQueryFactory() {
        return this;
    }

    @Override
    public NativeTinkerpopGraphQuery<Titan0Vertex, Titan0Edge> createNativeTinkerpopQuery() {
        return new NativeTitan0GraphQuery((Titan0Graph) graph);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.hbase;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.fail;
import java.util.concurrent.TimeUnit;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.EntryMetaData;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator;
import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import com.thinkaurelius.titan.diskstorage.util.time.StandardDuration;
import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
/**
 * Tests for lock acquisition in HBaseKeyColumnValueStore: verifies that
 * acquireLock() delegates to the LocalLockMediator and retries up to the
 * configured LOCK_RETRY count before giving up.
 */
public class HBaseKeyColumnValueStoreTest {
    @Mock
    HBaseStoreManager storeManager;
    @Mock
    ConnectionMask connectionMask;
    @Mock
    LocalLockMediator localLockMediator;
    @Mock
    StaticBuffer key;
    @Mock
    StaticBuffer column;
    @Mock
    StaticBuffer expectedValue;
    @Mock
    HBaseTransaction transaction;
    @Mock
    Configuration storageConfig;

    @BeforeMethod
    public void setup() {
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Stubs the store manager and storage configuration the way every lock test
     * needs it: timestamp metadata, 300ms lock expiry, 10ms lock wait, 3 retries.
     * (Previously this block was duplicated verbatim in all three tests.)
     */
    private void stubLockConfiguration() {
        when(storeManager.getMetaDataSchema("hbase")).thenReturn(new EntryMetaData[] {EntryMetaData.TIMESTAMP});
        when(storeManager.getStorageConfig()).thenReturn(storageConfig);
        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE)).thenReturn(
                new StandardDuration(300L, TimeUnit.MILLISECONDS));
        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT)).thenReturn(
                new StandardDuration(10L, TimeUnit.MILLISECONDS));
        when(storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY)).thenReturn(3);
    }

    /** Creates the store under test wired to the mocked collaborators. */
    private HBaseKeyColumnValueStore newStore() {
        return new HBaseKeyColumnValueStore(storeManager, connectionMask, "titan", "e", "hbase", localLockMediator);
    }

    @Test
    public void shouldSucceedInLockingIfLockMediatorSucceeds() throws BackendException {
        stubLockConfiguration();
        KeyColumn lockID = new KeyColumn(key, column);
        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
                thenReturn(true);

        newStore().acquireLock(key, column, expectedValue, transaction);

        verify(transaction).updateLocks(lockID, expectedValue);
        verify(localLockMediator, times(1)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
    }

    @Test
    public void shouldRetryRightNumberOfTimesIfLockMediationFails() throws BackendException {
        stubLockConfiguration();
        KeyColumn lockID = new KeyColumn(key, column);
        // Fail twice, succeed on the third attempt: still within LOCK_RETRY = 3.
        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
                thenReturn(false).thenReturn(false).thenReturn(true);

        newStore().acquireLock(key, column, expectedValue, transaction);

        verify(transaction).updateLocks(lockID, expectedValue);
        verify(localLockMediator, times(3)).lock(eq(lockID), eq(transaction), any(Timepoint.class));
    }

    @Test(expectedExceptions = PermanentLockingException.class)
    public void shouldThrowExceptionAfterConfiguredRetriesIfLockMediationFails() throws BackendException {
        stubLockConfiguration();
        KeyColumn lockID = new KeyColumn(key, column);
        // All three attempts fail, so acquireLock must throw.
        when(localLockMediator.lock(eq(lockID), eq(transaction), any(Timepoint.class))).
                thenReturn(false).thenReturn(false).thenReturn(false);

        newStore().acquireLock(key, column, expectedValue, transaction);

        fail("Should fail as lock could not be acquired after 3 retries.");
    }
}
/*
* Copyright 2012-2013 Aurelius LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.thinkaurelius.titan.diskstorage.locking;
import com.thinkaurelius.titan.diskstorage.hbase.HBaseTransaction;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.diskstorage.util.time.Timestamps;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
/**
 * Tests LocalLockMediator's acquire/expire/release semantics for a single
 * key-column lock shared between two transactions.
 */
public class LocalLockMediatorTest {

    private static final String LOCK_NAMESPACE = "test";
    private static final StaticBuffer LOCK_ROW = StaticArrayBuffer.of(new byte[]{1});
    private static final StaticBuffer LOCK_COL = StaticArrayBuffer.of(new byte[]{1});
    private static final KeyColumn kc = new KeyColumn(LOCK_ROW, LOCK_COL);
    private static final HBaseTransaction mockTx1 = Mockito.mock(HBaseTransaction.class);
    private static final HBaseTransaction mockTx2 = Mockito.mock(HBaseTransaction.class);

    @Test
    public void testLock() throws InterruptedException {
        TimestampProvider clock = Timestamps.MICRO;

        // A lock that expires immediately does not block a second transaction.
        LocalLockMediator<HBaseTransaction> mediator = new LocalLockMediator<>(LOCK_NAMESPACE, clock);
        Assert.assertTrue(mediator.lock(kc, mockTx1, clock.getTime(0, TimeUnit.NANOSECONDS)));
        Assert.assertTrue(mediator.lock(kc, mockTx2, clock.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));

        // A lock expiring far in the future blocks the second transaction on the
        // same key-column...
        mediator = new LocalLockMediator<>(LOCK_NAMESPACE, clock);
        Assert.assertTrue(mediator.lock(kc, mockTx1, clock.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
        Assert.assertFalse(mediator.lock(kc, mockTx2, clock.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));

        // ...until the holder releases it, after which locking succeeds again.
        Assert.assertTrue(mediator.unlock(kc, mockTx1));
        Assert.assertTrue(mediator.lock(kc, mockTx2, clock.getTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS)));
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import org.apache.atlas.graph.GraphSandboxUtil;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * Base class for Titan 0.5.4 graph database tests.  Creates the property keys and
 * indices the tests rely on once per class, hands out a lazily created graph
 * instance, and removes vertices created via createVertex() after each test method.
 */
public abstract class AbstractGraphDatabaseTest {

    protected static final String WEIGHT_PROPERTY = "weight";
    protected static final String TRAIT_NAMES = Constants.TRAIT_NAMES_PROPERTY_KEY;
    protected static final String TYPE_PROPERTY_NAME = "__type";
    protected static final String TYPESYSTEM = "TYPESYSTEM";

    // Name of the mixed (search-backend) index that property keys are added to.
    private static final String BACKING_INDEX_NAME = "backing";

    // Lazily created by getGraph(); shared by all test methods of one instance.
    private AtlasGraph<?, ?> graph = null;

    /**
     * One-time schema setup: creates the backing mixed index (if absent) and the
     * property keys / composite indices the tests use, then commits the
     * management changes.
     */
    @BeforeClass
    public static void createIndices() {
        GraphSandboxUtil.create();
        Titan0GraphDatabase db = new Titan0GraphDatabase();
        AtlasGraphManagement mgmt = db.getGraph().getManagementSystem();
        if (mgmt.getGraphIndex(BACKING_INDEX_NAME) == null) {
            mgmt.createVertexMixedIndex(BACKING_INDEX_NAME, Constants.BACKING_INDEX, Collections.emptyList());
        }
        // Plain property key with no index — presumably exercised by a subclass test; TODO confirm.
        mgmt.makePropertyKey("age13", Integer.class, AtlasCardinality.SINGLE);
        createIndices(mgmt, "name", String.class, false, AtlasCardinality.SINGLE);
        createIndices(mgmt, WEIGHT_PROPERTY, Integer.class, false, AtlasCardinality.SINGLE);
        createIndices(mgmt, "size15", String.class, false, AtlasCardinality.SINGLE);
        createIndices(mgmt, "typeName", String.class, false, AtlasCardinality.SINGLE);
        createIndices(mgmt, "__type", String.class, false, AtlasCardinality.SINGLE);
        // The GUID key is the only unique index created here.
        createIndices(mgmt, Constants.GUID_PROPERTY_KEY, String.class, true, AtlasCardinality.SINGLE);
        createIndices(mgmt, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, AtlasCardinality.SET);
        createIndices(mgmt, Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, AtlasCardinality.SET);
        mgmt.commit();
    }

    // NOTE(review): TestNG defines no ordering between this and removeVertices()
    // (both are @AfterMethod); both end in commit(), so either order appears safe.
    @AfterMethod
    public void commitGraph() {
        //force any pending actions to be committed so we can be sure they don't cause errors.
        pushChangesAndFlushCache();
        getGraph().commit();
    }

    /** Wipes all graph data once the whole test class has finished. */
    @AfterClass
    public static void cleanUp() {
        Titan0Graph graph = new Titan0Graph();
        graph.clear();
    }

    // Commits pending changes.  The type parameters are unused; kept for
    // signature compatibility with subclasses.
    protected <V, E> void pushChangesAndFlushCache() {
        getGraph().commit();
    }

    /**
     * Makes a property key (unless it already exists), adds it to the backing
     * mixed index (non-Integer types only), and creates a composite index for it.
     * Index-creation failures are printed and swallowed so that the remaining
     * keys are still processed.
     */
    private static void createIndices(AtlasGraphManagement management, String propertyName, Class propertyClass,
            boolean isUnique, AtlasCardinality cardinality) {
        if (management.containsPropertyKey(propertyName)) {
            //index was already created
            return;
        }
        AtlasPropertyKey key = management.makePropertyKey(propertyName, propertyClass, cardinality);
        try {
            if (propertyClass != Integer.class) {
                management.addMixedIndex(BACKING_INDEX_NAME, key);
            }
        } catch(Throwable t) {
            //ok
            t.printStackTrace();
        }
        try {
            management.createVertexCompositeIndex(propertyName, isUnique, Collections.singletonList(key));
        } catch(Throwable t) {
            //ok
            t.printStackTrace();
        }
    }

    /** Returns the shared graph, creating it on first use. */
    protected final <V, E> AtlasGraph<V, E> getGraph() {
        if (graph == null) {
            graph = new Titan0Graph();
        }
        return (AtlasGraph<V, E>)graph;
    }

    /** Narrows the shared graph to its concrete Titan 0.5.4 type. */
    protected Titan0Graph getTitan0Graph() {
        AtlasGraph g = getGraph();
        return (Titan0Graph)g;
    }

    // Vertices created through createVertex(); cleaned up in removeVertices().
    protected List<AtlasVertex> newVertices = new ArrayList<>();

    /** Adds a vertex to the given graph and records it for post-test cleanup. */
    protected final <V, E> AtlasVertex<V, E> createVertex(AtlasGraph<V, E> theGraph) {
        AtlasVertex<V, E> vertex = theGraph.addVertex();
        newVertices.add(vertex);
        return vertex;
    }

    /** Removes every recorded vertex that still exists, then commits. */
    @AfterMethod
    public void removeVertices() {
        for(AtlasVertex vertex : newVertices) {
            if (vertex.exists()) {
                getGraph().removeVertex(vertex);
            }
        }
        getGraph().commit();
        newVertices.clear();
    }

    /**
     * Runs r on a fresh thread, waits for it to finish, and rethrows anything
     * the thread threw so the calling test fails with the original error.
     */
    protected void runSynchronouslyInNewThread(final Runnable r) throws Throwable {
        RunnableWrapper wrapper = new RunnableWrapper(r);
        Thread th = new Thread(wrapper);
        th.start();
        th.join();
        Throwable ex = wrapper.getExceptionThrown();
        if (ex != null) {
            throw ex;
        }
    }

    /** Runnable decorator that captures anything thrown by the wrapped Runnable. */
    private static final class RunnableWrapper implements Runnable {

        private final Runnable r;
        private Throwable exceptionThrown = null;

        private RunnableWrapper(Runnable r) {
            this.r = r;
        }

        @Override
        public void run() {
            try {
                r.run();
            } catch(Throwable e) {
                exceptionThrown = e;
            }
        }

        public Throwable getExceptionThrown() {
            return exceptionThrown;
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.titan0;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.graph.GraphSandboxUtil;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.commons.configuration.Configuration;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@Test
public class Titan0DatabaseValidationTest {

    private Configuration config;
    private AtlasGraph<?, ?> atlasGraph;

    @BeforeTest
    public void setUp() throws AtlasException {
        GraphSandboxUtil.create();
        // First get Instance
        atlasGraph = new Titan0Graph();
        config = ApplicationProperties.getSubsetConfiguration(ApplicationProperties.get(),
                Titan0GraphDatabase.GRAPH_PREFIX);
    }

    @AfterClass
    public void tearDown() throws Exception {
        // Best-effort cleanup: shut the graph down, then clear its data;
        // neither failure should mask the other.
        try {
            atlasGraph.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            atlasGraph.clear();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * The currently configured index backend must validate cleanly; switching the
     * configuration to a different backend must be rejected with a descriptive error.
     */
    @Test
    public void testValidate() throws AtlasException {
        try {
            Titan0GraphDatabase.validateIndexBackend(config);
        } catch (Exception e) {
            Assert.fail("Unexpected exception ", e);
        }

        // Change backend
        config.setProperty(Titan0GraphDatabase.INDEX_BACKEND_CONF, Titan0GraphDatabase.INDEX_BACKEND_LUCENE);
        try {
            Titan0GraphDatabase.validateIndexBackend(config);
            Assert.fail("Expected exception");
        } catch (Exception e) {
            Assert.assertEquals(e.getMessage(),
                    "Configured Index Backend lucene differs from earlier configured "
                    + "Index Backend elasticsearch. Aborting!");
        }
    }
}
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######### Graph Database to Use #########
atlas.graphdb.backend=org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase
######### Atlas Server Configs #########
atlas.rest.address=http://localhost:31000
######### Graph Database Configs #########
# Graph Storage
atlas.graph.storage.backend=${graph.storage.backend}
# Graph Search Index Backend
atlas.graph.index.search.backend=${graph.index.backend}
#Berkeley storage directory
atlas.graph.storage.directory=${sys:atlas.data}/berkeley
#hbase
#For standalone mode, specify localhost
#for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1
atlas.graph.storage.lock.wait-time=10000
#ElasticSearch
atlas.graph.index.search.directory=${sys:atlas.data}/es
atlas.graph.index.search.elasticsearch.client-only=false
atlas.graph.index.search.elasticsearch.local-mode=true
atlas.graph.index.search.elasticsearch.create.sleep=2000
# Solr cloud mode properties
atlas.graph.index.search.solr.mode=cloud
atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
# Solr-specific configuration property
atlas.graph.index.search.max-result-set-size=150
######### Hive Lineage Configs #########
# This models reflects the base super types for Data and Process
#atlas.lineage.hive.table.type.name=DataSet
#atlas.lineage.hive.process.type.name=Process
#atlas.lineage.hive.process.inputs.name=inputs
#atlas.lineage.hive.process.outputs.name=outputs
## Schema
atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
######### Notification Configs #########
atlas.notification.embedded=true
atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
atlas.kafka.hook.group.id=atlas
atlas.kafka.entities.group.id=atlas_entities
atlas.kafka.offsets.topic.replication.factor=1
######### Entity Audit Configs #########
atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
atlas.audit.zookeeper.session.timeout.ms=1000
atlas.audit.hbase.zookeeper.quorum=localhost
atlas.audit.hbase.zookeeper.property.clientPort=19026
######### Security Properties #########
# SSL config
atlas.enableTLS=false
atlas.server.https.port=31443
######### Security Properties #########
hbase.security.authentication=simple
atlas.hook.falcon.synchronous=true
######### High Availability Configuration ########
atlas.server.ha.enabled=false
#atlas.server.ids=id1
#atlas.server.address.id1=localhost:21000
...@@ -72,7 +72,7 @@ ...@@ -72,7 +72,7 @@
<appender-ref ref="AUDIT"/> <appender-ref ref="AUDIT"/>
</logger> </logger>
<logger name="com.thinkaurelius.titan" additivity="false"> <logger name="org.janusgraph" additivity="false">
<level value="warn"/> <level value="warn"/>
<appender-ref ref="console"/> <appender-ref ref="console"/>
</logger> </logger>
......
...@@ -57,7 +57,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -57,7 +57,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
...@@ -650,10 +650,9 @@ ...@@ -650,10 +650,9 @@
These profiles are mutually exclusive and should be activated by setting the system These profiles are mutually exclusive and should be activated by setting the system
property GRAPH-PROVIDER. property GRAPH-PROVIDER.
This can be optionally specified when invoking mvn: This can be optionally specified when invoking mvn:
e.g. mvn clean install -DGRAPH-PROVIDER=titan0 e.g. mvn clean install -DGRAPH-PROVIDER=janus
The settings for GRAPH-PROVIDER have the following effects: The settings for GRAPH-PROVIDER have the following effects:
* If GRAPH-PROVIDER is not specified, the graph-provider-default profile is activated. * If GRAPH-PROVIDER is not specified, the graph-provider-default profile (janus) is activated.
* If GRAPH-PROVIDER is set to titan0, the graph-provider-titan0 profile is activated.
* If GRAPH-PROVIDER is set to anything else, the build will fail. * If GRAPH-PROVIDER is set to anything else, the build will fail.
Do not activate the graph-provider selection profiles using -P. Do not activate the graph-provider selection profiles using -P.
--> -->
...@@ -667,7 +666,6 @@ ...@@ -667,7 +666,6 @@
</property> </property>
</activation> </activation>
<properties> <properties>
<!-- Define graph dependency type/version -->
<graphGroup>org.apache.atlas</graphGroup> <graphGroup>org.apache.atlas</graphGroup>
<graphArtifact>atlas-graphdb-janus</graphArtifact> <graphArtifact>atlas-graphdb-janus</graphArtifact>
<skipDocs>false</skipDocs> <skipDocs>false</skipDocs>
...@@ -687,7 +685,6 @@ ...@@ -687,7 +685,6 @@
</property> </property>
</activation> </activation>
<properties> <properties>
<!-- Define graph dependency type/version -->
<graphGroup>org.apache.atlas</graphGroup> <graphGroup>org.apache.atlas</graphGroup>
<graphArtifact>atlas-graphdb-janus</graphArtifact> <graphArtifact>atlas-graphdb-janus</graphArtifact>
<skipDocs>false</skipDocs> <skipDocs>false</skipDocs>
...@@ -699,26 +696,6 @@ ...@@ -699,26 +696,6 @@
</profile> </profile>
<profile> <profile>
<id>graph-provider-titan0</id>
<activation>
<property>
<name>GRAPH-PROVIDER</name>
<value>titan0</value>
</property>
</activation>
<properties>
<!-- Define graph dependency type/version -->
<graphGroup>org.apache.atlas</graphGroup>
<graphArtifact>atlas-graphdb-titan0</graphArtifact>
<skipDocs>false</skipDocs>
<graphdb.backend.impl>org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase</graphdb.backend.impl>
<graph.index.backend>elasticsearch</graph.index.backend>
<tests.solr.embedded>false</tests.solr.embedded>
<distro.exclude.packages>WEB-INF/lib/titan-*.jar,WEB-INF/lib/je-*.jar,WEB-INF/lib/elasticsearch-*.jar,WEB-INF/lib/lucene-*.jar</distro.exclude.packages>
</properties>
</profile>
<profile>
<id>skipMinify</id> <id>skipMinify</id>
<properties> <properties>
<project.build.dashboardv2.gruntBuild>build</project.build.dashboardv2.gruntBuild> <project.build.dashboardv2.gruntBuild>build</project.build.dashboardv2.gruntBuild>
......
...@@ -251,30 +251,6 @@ ...@@ -251,30 +251,6 @@
</dependency> </dependency>
</dependencies> </dependencies>
</profile> </profile>
<profile>
<id>graph-provider-titan0</id>
<activation>
<property>
<name>GRAPH-PROVIDER</name>
<value>titan0</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-testtools</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles> </profiles>
<build> <build>
......
...@@ -116,7 +116,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -116,7 +116,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
} }
/** /**
* Initialize global indices for Titan graph on server activation. * Initialize global indices for JanusGraph on server activation.
* *
* Since the indices are shared state, we need to do this only from an active instance. * Since the indices are shared state, we need to do this only from an active instance.
*/ */
......
...@@ -761,7 +761,7 @@ public abstract class DeleteHandlerV1 { ...@@ -761,7 +761,7 @@ public abstract class DeleteHandlerV1 {
List<String> elements = GraphHelper.getListProperty(outVertex, propertyName); List<String> elements = GraphHelper.getListProperty(outVertex, propertyName);
if (elements != null) { if (elements != null) {
elements = new ArrayList<>(elements); //Make a copy, else list.remove reflects on titan.getProperty() elements = new ArrayList<>(elements);
for (String elementEdgeId : elements) { for (String elementEdgeId : elements) {
AtlasEdge elementEdge = graphHelper.getEdgeByEdgeId(outVertex, edgeLabel, elementEdgeId); AtlasEdge elementEdge = graphHelper.getEdgeByEdgeId(outVertex, edgeLabel, elementEdgeId);
...@@ -807,7 +807,7 @@ public abstract class DeleteHandlerV1 { ...@@ -807,7 +807,7 @@ public abstract class DeleteHandlerV1 {
List<String> keys = GraphHelper.getListProperty(outVertex, propertyName); List<String> keys = GraphHelper.getListProperty(outVertex, propertyName);
if (keys != null) { if (keys != null) {
keys = new ArrayList<>(keys); //Make a copy, else list.remove reflects on titan.getProperty() keys = new ArrayList<>(keys);
for (String key : keys) { for (String key : keys) {
String keyPropertyName = GraphHelper.getQualifiedNameForMapKey(propertyName, key); String keyPropertyName = GraphHelper.getQualifiedNameForMapKey(propertyName, key);
......
...@@ -61,15 +61,5 @@ ...@@ -61,15 +61,5 @@
<artifactId>blueprints-core</artifactId> <artifactId>blueprints-core</artifactId>
<version>${tinkerpop.version}</version> <version>${tinkerpop.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-core</artifactId>
<version>${titan.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-graphdb-titan0</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -18,12 +18,7 @@ ...@@ -18,12 +18,7 @@
package org.apache.atlas.migration; package org.apache.atlas.migration;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.titan0.Titan0GraphDatabase;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.BasicParser;
...@@ -143,10 +138,6 @@ public class Exporter { ...@@ -143,10 +138,6 @@ public class Exporter {
try { try {
os = new FileOutputStream(dataFileName); os = new FileOutputStream(dataFileName);
Graph graph = getTitan0GraphDatabase();
GraphSONWriter.outputGraph(graph, os, GraphSONMode.EXTENDED);
} finally { } finally {
if (os != null) { if (os != null) {
try { try {
...@@ -167,10 +158,6 @@ public class Exporter { ...@@ -167,10 +158,6 @@ public class Exporter {
new ArrayList<>(registry.getAllEntityDefs())); new ArrayList<>(registry.getAllEntityDefs()));
} }
private TitanGraph getTitan0GraphDatabase() {
return Titan0GraphDatabase.getGraphInstance();
}
private static void displayMessage(String msg) { private static void displayMessage(String msg) {
LOG.info(LOG_MSG_PREFIX + msg); LOG.info(LOG_MSG_PREFIX + msg);
......
...@@ -460,9 +460,9 @@ ...@@ -460,9 +460,9 @@
</manifest> </manifest>
</archive> </archive>
<packagingExcludes> <packagingExcludes>
<!-- Titan and hbase jars should be excluded because an uber jar with shaded dependencies is created. <!-- HBase jars should be excluded because an uber jar with shaded dependencies is created.
But mvn 3.3.x includes them for some reason. So, excluding them explicitly here --> But mvn 3.3.x includes them for some reason. So, excluding them explicitly here -->
WEB-INF/lib/titan*.jar,WEB-INF/lib/hbase*.jar,WEB-INF/lib/junit*.jar,${packages.to.exclude} WEB-INF/lib/hbase*.jar,WEB-INF/lib/junit*.jar,${packages.to.exclude}
</packagingExcludes> </packagingExcludes>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley ...@@ -39,7 +39,7 @@ atlas.graph.storage.directory=${sys:atlas.data}/berkley
#hbase #hbase
#For standalone mode , specify localhost #For standalone mode , specify localhost
#for distributed mode, specify zookeeper quorum here - For more information refer http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 #for distributed mode, specify zookeeper quorum here
atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hostname=${graph.storage.hostname}
atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.hbase.regions-per-server=1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment