About the Cassandra CQL Resource Adaptor
The Cassandra CQL Resource Adaptor allows applications in the Rhino TAS to execute queries against a Cassandra database using the latest version of the Cassandra binary protocol. The resource adaptor is built on top of the DataStax Java driver for Cassandra.
For more information about the DataStax Java driver, see https://docs.datastax.com/en/developer/java-driver/3.4/
Topics
This document includes the following topics:
Topic | Explains… |
---|---|
The structure of the Cassandra CQL Resource Adaptor |
|
How to configure a Cassandra CQL Resource Adaptor entity |
|
How to monitor the behaviour of a Cassandra CQL Resource Adaptor entity |
|
How to use the Cassandra CQL resource adaptor API in an application |
Resource Adaptor Architecture
The following diagram presents a high level overview of the Cassandra CQL resource adaptor and highlights the most important components.
The most important components from a Rhino TAS application developer perspective are:
-
CassandraProvider: allows application developers to execute synchronous queries against the Cassandra database and create CassandraSession activities that support asynchronous interaction
-
CassandraSession: the activity type for the Cassandra CQL resource adaptor. Application developers use a CassandraSession to execute queries asynchronously against a Cassandra database and receive the results as events (in a future transaction)
-
QueryResultEvent and CassandraErrorEvent: the event types provided by the Cassandra CQL resource adaptor. These events represent the results of an asynchronous interaction with the Cassandra database and are delivered within the context of a CassandraSession
See Using the Cassandra CQL API for more information on these components, including examples of how to use the Cassandra CQL resource adaptor API in an application.
You monitor the health of a Cassandra CQL resource adaptor entity by inspecting the statistics the entity generates and responding to alarms it raises. See Monitoring the resource adaptor for more information on the statistics collected and the alarms raised.
You can define your own threshold-based alarms that are a function of the statistics the Cassandra CQL resource adaptor collects. See Threshold Alarms in the Rhino TAS Admin Guide for more information about defining threshold-based alarms.
The resource adaptor contains a cache of prepared statements. A prepared statement is a query that has been pre-parsed and validated by the database. The resource adaptor caches the result of prepare operations so the application developer does not need to implement their own caching behaviour. You can configure the cache size and how long cache entries are preserved, to suit your application needs.
The resource adaptor includes an executor component that is responsible for receiving asynchronous responses from the DataStax java driver, and injecting corresponding events into the Rhino TAS (that are then delivered to applications for processing). You can configure the threading and queuing behaviour of the executor to suit your application needs.
General properties of the resource adaptor (such as the hosts of the cassandra cluster, and the keyspace to use) can be configured as required to suit your application needs.
Configuring the resource adaptor
Cassandra CQL resource adaptor configuration
The Cassandra CQL resource adaptor has configuration properties related to:
-
The Cassandra database to connect to — contact points and keyspace
-
Configuration options of the Cassandra cluster — protocol version, port, query consistency level and so on
-
Connecting to the Cassandra database — policies for connecting and re-connecting to the database
-
Socket options — socket options passed to the DataStax driver
-
Load balancing options — a subset of the available load balancing options supported by the DataStax driver
-
Cache of prepared statements: cache size configuration (entry expiry configuration is now obsolete)
-
Processing asynchronous responses — threads used for processing asynchronous responses and submitting events to the SLEE
-
Optional switches related to debugging — options that may be useful for debugging
The Cassandra database to connect to
These configuration options define the initial set of hosts the resource adaptor entity should connect to and the keyspace it should use. There is one keyspace per resource adaptor entity. If your application requires several keyspaces, you must create one resource adaptor entity for each keyspace. These properties may be actively changed (i.e. while the RA entity is active).
Name | Type | Default | Description |
---|---|---|---|
|
|
|
Comma separated list of hostname/ip addresses of cassandra nodes used to discover the cluster topology. The default value is 'localhost' |
|
|
The keyspace this RA entity should 'connect' to. There is no default, so this property must be defined. |
Contact points are addresses of Cassandra nodes that the resource adaptor uses to discover the cluster topology. Only one contact point is required (the resource adaptor retrieves the addresses of the other nodes automatically). However, configuring more than one contact point is recommended: if only one is configured and it is unavailable, the resource adaptor cannot connect to the cluster.
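As an illustration of the comma-separated contact point format described above, the following minimal sketch splits such a list into individual hosts and flags the single-contact-point case. The class and method names are hypothetical, the host names are placeholders, and the trimming of whitespace and skipping of empty entries are assumptions rather than documented behaviour of the resource adaptor.

```java
import java.util.ArrayList;
import java.util.List;

final class ContactPointsSketch {
    /** Splits a comma-separated list, trimming whitespace and skipping empty entries (assumed behaviour). */
    static List<String> parse(String commaSeparated) {
        List<String> hosts = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String host = part.trim();
            if (!host.isEmpty()) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    /** True when there is no fallback if the only contact point is unreachable. */
    static boolean hasSinglePointOfFailure(List<String> hosts) {
        return hosts.size() < 2;
    }
}
```

A deployment check could call `hasSinglePointOfFailure` on the parsed list and log a warning when it returns true.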
Configuration options related to the Cassandra cluster
These configuration options define general properties of the connection made to the Cassandra cluster. They may be actively changed (i.e. while the RA entity is active) and take effect the next time the resource adaptor needs to re-connect to the Cassandra cluster.
Name | Type | Default | Description |
---|---|---|---|
|
|
|
The binary protocol version to use. Possible values are: |
|
|
|
The compression method to use with the binary protocol. Possible values are:
|
|
|
|
If SSL should be used (using jdk built-in engine). |
|
|
|
If SSL should be used (defer to the Netty layer). |
|
|
|
The port to connect to the cassandra database |
|
|
|
The default query consistency level. The default value is |
|
|
|
The default serial consistency level for conditional updates. The default value is |
|
|
|
The default fetch size (the number of rows) to use for SELECT queries. Allowable values are |
|
|
|
The default timeout (in milliseconds) for synchronous queries if none is supplied. Allowable values are |
|
|
|
The timeout (in milliseconds) for the preparation of queries. Allowable values are |
The configuration options |
Possible consistency levels (from Apache Cassandra™ 2.0 — Configuring data consistency) are:
For more details, refer to Cassandra documentation: |
Connecting to the Cassandra database
These properties dictate how the Cassandra CQL resource adaptor directs the DataStax driver to connect to the Cassandra cluster. These properties may be actively changed (i.e. while the RA entity is active) and take effect the next time the resource adaptor needs to re-connect to the Cassandra cluster.
Name | Type | Default | Description |
---|---|---|---|
|
|
|
The two policies available for connection/re-connection (
|
|
|
|
How long (s) between attempts to connect to the Cassandra DB (constant). Acceptable values are 5s to 60s. |
|
|
|
The first delay (s) before attempting to connect to the Cassandra DB (exponential). Acceptable values are 1s to 24s. |
|
|
|
Subsequent delay (s) = 2 x previous delay, capped by max delay (s) (exponential). Acceptable values are 16s to 128s. |
The |
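As a worked example of the two delay schemes in the table above, the following sketch computes the sequence of waits for a constant policy and for an exponential policy in which each delay is twice the previous one, capped at a maximum. The class and method names are hypothetical. The code only illustrates the arithmetic and is not the resource adaptor's or the driver's implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class ReconnectDelaySketch {
    /** Constant policy: every attempt waits the same number of seconds. */
    static List<Long> constantDelays(long delaySeconds, int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            delays.add(delaySeconds);
        }
        return delays;
    }

    /** Exponential policy: each delay doubles the previous one, capped at maxDelaySeconds. */
    static List<Long> exponentialDelays(long baseDelaySeconds, long maxDelaySeconds, int attempts) {
        List<Long> delays = new ArrayList<>();
        long delay = Math.min(baseDelaySeconds, maxDelaySeconds);
        for (int i = 0; i < attempts; i++) {
            delays.add(delay);
            delay = Math.min(delay * 2, maxDelaySeconds);
        }
        return delays;
    }
}
```

With a base delay of 1s and a maximum of 16s, six attempts wait 1, 2, 4, 8, 16 and 16 seconds.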
Socket options
These properties correspond to socket options of the DataStax driver.
Name | Type | Default | Description |
---|---|---|---|
|
|
|
How long (ms) to establish a new connection to a Cassandra node before giving up. |
|
|
|
How long (ms) for a given Cassandra node to answer a query. |
|
|
|
True/False/not-set. Whether to enable TCP keepalive. The default, (not-set), means the value used is the default from the underlying Netty transport. |
|
|
|
True/False/not-set. Whether reuse-address is enabled. The default, (not-set), means the value used is the default from the underlying Netty transport. |
|
|
|
The linger-on-close timeout. The default, (-1 == not set), means the value used is the default from the underlying Netty transport. |
|
|
|
Whether to disable Nagle’s algorithm. |
|
|
|
Sets a hint to the size of the underlying buffers for incoming network I/O. The default, (-1 == not set), means the value used is the default from the underlying Netty transport. |
|
|
|
Sets a hint to the size of the underlying buffers for outgoing network I/O. The default, (-1 == not set), means the value used is the default from the underlying Netty transport. |
Setting 'socket.solinger' to 0 disables SO_Linger and is not recommended.
Load balancing options
These properties correspond to a subset of all available load balancing options supported by the DataStax driver.
Name | Type | Default | Description |
---|---|---|---|
|
|
|
Select the load-balancing approach to use. Allowed values include: round-robin, dc-aware-round-robin, latency-aware-round-robin, latency-aware-dc-aware-round-robin. A value of use-default uses the driver default. |
|
|
|
How much worse the average latency of a node must be compared to the fastest performing node for it to be penalized. Must be greater than 1.0. A value of 0 means use the driver default. |
|
|
|
Dictates the weight of a latency measurement to previous measurements in average latency. The value must be greater than 0. A value of 0 means use the driver default. |
The following recipes are supported:
A value of 'use-default' means the DataStax driver default load balancing configuration will be used. The latency-aware load balancing policy collects the latencies of queries to each Cassandra node and maintains a per-node average latency score. Nodes that are slower than the best performing node by more than a configurable threshold are moved to the end of the query plan (that is, they are only tried if all other nodes fail). The latency-aware configuration properties include:
|
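The latency-aware behaviour described above can be pictured with a short sketch. It takes per-node average latencies as given and moves any node whose average exceeds the threshold multiple of the best average to the end of the query plan. The class, method and node names are hypothetical. The real driver maintains time-decayed averages internally, so this models only the ordering rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class LatencyAwareSketch {
    /**
     * Orders nodes so that any node whose average latency exceeds
     * exclusionThreshold x (best average latency) is tried last.
     * Nodes otherwise keep the iteration order of the supplied map.
     */
    static List<String> queryPlan(Map<String, Double> averageLatencyMs, double exclusionThreshold) {
        if (averageLatencyMs.isEmpty()) {
            return new ArrayList<>();
        }
        double best = Collections.min(averageLatencyMs.values());
        List<String> preferred = new ArrayList<>();
        List<String> penalized = new ArrayList<>();
        for (Map.Entry<String, Double> entry : averageLatencyMs.entrySet()) {
            if (entry.getValue() > best * exclusionThreshold) {
                penalized.add(entry.getKey());
            } else {
                preferred.add(entry.getKey());
            }
        }
        preferred.addAll(penalized);
        return preferred;
    }
}
```

With a threshold of 2.0 and averages of 10 ms, 12 ms and 30 ms, only the 30 ms node is penalized, because 30 is greater than 2.0 x 10.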
Cache of prepared statements
The resource adaptor manages a cache of prepared statements, implemented as a LoadingCache<String, PreparedCassandraStatement>
from the Guava library (guava-libraries — CachesExplained).
See section Preparing statements for more information on preparing statements using the Cassandra CQL resource adaptor API.
The cache is cleared when the resource adaptor entity is deactivated. The size of the cache is controlled by the statementCache.maxSize property. The cache entry expiry behaviour, formerly controlled by the statementCache.expireAfterAccessT property, is obsolete. These properties may be actively changed (i.e. while the RA entity is active). Entries in the cache are preserved when properties of the cache are changed.
The configuration option |
Name | Type | Default | Description |
---|---|---|---|
|
|
|
Maximum number of entries in the prepared statement cache |
|
|
|
Obsolete. Previously the time (s) after which a statement was removed from the cache, measured from when it was last used
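To make the size-bounded caching behaviour concrete, here is a minimal sketch of a cache keyed by query string that prepares a statement on first use and evicts the least recently used entry once a maximum size is exceeded. It uses a `LinkedHashMap` as a stand-in so that it is self-contained. The resource adaptor itself uses a Guava `LoadingCache`, whose size-based eviction is only approximately least-recently-used. The class name, the `preparer` function and the generic type `P` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class StatementCacheSketch<P> {
    private final Map<String, P> entries;
    private final Function<String, P> preparer;
    private int prepareCalls;

    StatementCacheSketch(final int maxSize, Function<String, P> preparer) {
        this.preparer = preparer;
        // accessOrder = true keeps the least recently used entry first
        this.entries = new LinkedHashMap<String, P>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, P> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Returns the cached statement, preparing (and caching) it on a miss. */
    synchronized P get(String query) {
        P prepared = entries.get(query);
        if (prepared == null) {
            prepared = preparer.apply(query);
            prepareCalls++;
            entries.put(query, prepared);
        }
        return prepared;
    }

    synchronized int prepareCalls() { return prepareCalls; }

    synchronized int size() { return entries.size(); }
}
```

The point of the sketch is that repeated calls with the same query string cost one prepare operation, and that a cache sized below the number of distinct queries in use causes repeated preparation.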
Processing asynchronous responses
The resource adaptor receives asynchronous responses from the DataStax driver and delivers them as events into the Rhino TAS. This function is implemented using a standard Java ThreadPoolExecutor, which encapsulates a thread pool and a task queue. The following properties define the configuration of this ThreadPoolExecutor and should be set to suit the expected rate of asynchronous responses.
Name | Type | Default | Description |
---|---|---|---|
|
|
|
Number of threads available at all times |
|
|
|
Maximum number of threads |
|
|
|
How long (s) to keep non-core threads if they are idle |
|
|
|
The maximum results to queue waiting for a result executor thread |
|
|
|
If the RA guard timer should be used for asynchronous queries |
|
|
|
Milliseconds between scheduler ticks |
|
|
|
Common timer interval in milliseconds |
|
|
|
Maximum timer interval in milliseconds |
These properties may be actively changed (i.e. while the RA entity is active).
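The first four properties in the table above map naturally onto the constructor arguments of `java.util.concurrent.ThreadPoolExecutor`. The following sketch shows that mapping. The class name, the parameter names and the choice of `ArrayBlockingQueue` as the bounded queue are assumptions made for illustration. How the resource adaptor actually builds its executor is not stated here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ResultExecutorSketch {
    /**
     * Builds an executor with a fixed number of core threads, an upper bound on
     * threads, an idle timeout for non-core threads, and a bounded task queue.
     */
    static ThreadPoolExecutor create(int coreThreads, int maxThreads,
                                     long keepAliveSeconds, int queueCapacity) {
        return new ThreadPoolExecutor(
                coreThreads,
                maxThreads,
                keepAliveSeconds, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
    }
}
```

One property of `ThreadPoolExecutor` is worth keeping in mind when tuning: with a bounded queue, threads beyond the core size are started only once the queue is full, and tasks are rejected when the queue is full and the maximum thread count has been reached.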
Optional switches for debugging
The resource adaptor will request additional notifications from the Rhino TAS when CassandraSession activities end if debug.activity.requestendedcallback is true.
The resource adaptor will set the enableTracing flag on all queries that take a Statement argument if debug.activity.tracingenabled is true.
This behaviour is not required for normal operation of the resource adaptor, but may be useful during application testing scenarios. These properties can be actively changed (i.e. while the RA entity is active).
Name | Type | Default | Description |
---|---|---|---|
|
|
|
Whether the RA should request activity-ended notifications from the SLEE |
|
|
|
Whether the RA should call enableTracing on all Cassandra statements. |
Monitoring the resource adaptor
Statistics
The Cassandra CQL resource adaptor updates the default parameter set. You can monitor the health of your Cassandra CQL resource adaptor entity using the Rhino statistics tools. For example, to monitor the statistics from the command line (where <entity-name> is the name of your resource adaptor entity):
rhino-stats -m SLEE-Usage.RAEntities.<entity-name>.(default)
The statistics collected by the Cassandra CQL resource adaptor are:
Statistic | Type | Description |
---|---|---|
ClusterConnectionDown |
Counter |
Connection to the cluster is down |
ClusterConnected |
Counter |
Connected to the cluster |
ConnectAttempts |
Counter |
An attempt to connect to the cassandra cluster |
FailedToStartActivity |
Counter |
Rhino TAS did not allow us to start an activity |
FailedToSubmitEvent |
Counter |
Rhino TAS did not allow us to submit an event for delivery |
SynchronousQueryAttempts |
Counter |
A synchronous query is made against cassandra |
SuccessfulSynchronousQuery |
Counter |
A synchronous query made against cassandra is successful |
FailedSynchronousQuery |
Counter |
A synchronous query made against cassandra failed |
TimedOutSynchronousQuery |
Counter |
A synchronous query made against cassandra timed out |
FailedToPrepareStatement |
Counter |
A synchronous attempt to prepare a query failed |
ASynchronousQueryAttempts |
Counter |
An asynchronous query is made against cassandra |
SuccessfulASynchronousQuery |
Counter |
An asynchronous query made against cassandra is successful |
FailedASynchronousQuery |
Counter |
An asynchronous query made against cassandra failed |
TimedOutASynchronousQuery |
Counter |
An asynchronous query made against cassandra timed out |
SynchronousQueryDuration |
Sample |
A sample statistic of the duration of a query (ns) against cassandra |
StatementPrepareDuration |
Sample |
A sample statistic of the time taken (ns) to prepare a statement against cassandra |
You can define your own threshold alarms that are a function of the statistics the Cassandra CQL resource adaptor collects. See Threshold Alarms in the Rhino TAS Admin Guide for more information about defining threshold-based alarms.
Alarms
The following alarms may be raised with a source of OpenCloud <entity-name> <version>
(for a resource adaptor entity <entity-name>
, of version <version>
).
Category | Level | Alarm Type | Message |
---|---|---|---|
CassandraCQLRA |
CRITICAL |
CassandraCQLRA.ConnectToCluster |
"Not connected to cassandra. Attempting to connect each %ds" |
RA Framework |
WARNING |
active-reconfiguration |
"Updates to %s "${instance}" will not take effect until the RA entity is restarted." |
RA Framework |
CRITICAL |
misconfiguration |
"RA configuration error, operational functionality disabled. Update the RA entity with a valid configuration to resolve." |
RA Framework |
MINOR |
misconfiguration |
"RA configuration update failed. Continuing operational functionality using the last valid configuration." |
RA Framework |
MINOR |
update-ignored |
"Update to %s "${instance}" failed, continuing with its last valid configuration" |
Using the Cassandra CQL API
Overview of the Cassandra CQL API
Class/Interface | Description |
---|---|
|
Resource adaptor provider interface used to:
|
|
SLEE activity type used to:
There are also convenience operations to:
|
|
Represents an executable cassandra query. This is the super-type for |
|
A statement that has been 'prepared' by the cassandra database. Such statements have been pre-parsed and validated by the database. A prepared statement can be executed once concrete values have been provided for the bind variables (to make a bound statement). A prepared statement also allows you to define default values for properties such as the Consistency level or tracing. Such default values are used in any bound statement created from the prepared statement. |
|
A prepared statement for which all bind variables have been bound to values. |
|
A group of statements that will be executed together as a 'batch' by the cassandra database. |
|
The result of a query against the cassandra database. Used to:
|
|
A row in a result set. Used to get values in the row by position, or name. |
Examples
Getting the Cassandra CQL provider
You get the Cassandra CQL provider object from JNDI
. For example, consider a service with the following @RATypeBinding
:
@RATypeBinding(
    raType = @ComponentId(name = "cassandra-cql-ratype",
                          vendor = "OpenCloud",
                          version = "1.0.0"),
    activityContextInterfaceFactoryName =
        "cassandra-cql-ra/activitycontextinterfacefactory",
    resourceAdaptorEntityLink = "cassandra-cql-ra",
    resourceAdaptorObjectName = "cassandra-cql-ra/provider"
)
Then an SBB
might include code such as the following in setSbbContext()
.
@Override
public void setSbbContext(SbbContext context) {
    try {
        final Context env = (Context) new InitialContext().lookup("java:comp/env");
        this.cassandraCQLProvider =
            (CassandraCQLProvider) env.lookup("cassandra-cql-ra/provider");
        this.cassandraCQLACIFactory =
            (CassandraCQLActivityContextInterfaceFactory)
                env.lookup("cassandra-cql-ra/activitycontextinterfacefactory");
    }
    catch (NamingException e) {
        throw new RuntimeException("Failed Cassandra provider + ACI factory lookup", e);
    }
}
Preparing statements
A prepared statement is a cassandra query that has been pre-parsed and validated by the cassandra database. Ideally you prepare a query once, and then use it many times by binding values to binding variables in the query. The cassandra CQL RA manages a cache of prepared statements so you do not need to implement any type of caching yourself. This saves compute resources on the database, avoids needless network interaction with the database and minimises the time the application will block and wait during prepare.
final String query = "SELECT * FROM users WHERE lastname='Page'";
final PreparedCassandraStatement preparedQuery = cassandraCQLProvider.prepare(query);
You may also prepare statements via the prepare operation of the |
Binding values to variables
A bound statement is a prepared statement with values bound to the bind variables. The values of a bound statement can be set by either index or name. If multiple bind variables have the same name, setting that name will set all the variables for that name.
All the variables of the statement must be bound before it can be executed. If you don’t explicitly set a value for a variable, a CassandraException
will be thrown when submitting the statement. If you want to set a variable to null
, use the setToNull
operation.
There are two options for creating a bound statement from a prepared statement:
-
Use
bind()
final BoundCassandraStatement boundStatement = preparedStatement.bind();
-
Use
bind(Object... values)
final BoundCassandraStatement boundStatement = preparedStatement.bind(firstValue, secondValue, thirdValue);
Use the set
operations of BoundCassandraStatement
to bind any remaining variables to values. There are two variants of each set
operation, one which takes an index, and one which takes a name.
boundStatement.setString("firstname", "David").setString("lastname", "Page");
Use the |
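The binding rules described in this section (binding by index or by name, one name filling every variable that shares it, and the requirement that all variables are bound before execution) can be modelled in a few lines. The following is a toy model for illustration only. The class `BindingModelSketch` and its methods are hypothetical and are not part of the resource adaptor API.

```java
import java.util.Arrays;
import java.util.List;

final class BindingModelSketch {
    private final List<String> names;   // bind variable names, in query order
    private final Object[] values;
    private final boolean[] bound;      // tracked separately so null can be a bound value

    BindingModelSketch(String... variableNames) {
        this.names = Arrays.asList(variableNames);
        this.values = new Object[variableNames.length];
        this.bound = new boolean[variableNames.length];
    }

    /** Binds a value by position. */
    BindingModelSketch set(int index, Object value) {
        values[index] = value;
        bound[index] = true;
        return this;
    }

    /** Binds a value by name; every variable sharing the name receives it. */
    BindingModelSketch set(String name, Object value) {
        boolean found = false;
        for (int i = 0; i < names.size(); i++) {
            if (names.get(i).equals(name)) {
                set(i, value);
                found = true;
            }
        }
        if (!found) {
            throw new IllegalArgumentException("no bind variable named " + name);
        }
        return this;
    }

    /** In this model a statement may only be executed once this returns true. */
    boolean isFullyBound() {
        for (boolean b : bound) {
            if (!b) {
                return false;
            }
        }
        return true;
    }

    Object valueAt(int index) { return values[index]; }
}
```

For a statement with variables `lastname`, `firstname`, `lastname`, a single `set("lastname", ...)` call fills positions 0 and 2, and the statement becomes fully bound once position 1 is also set.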
Using a batch statement
A batch statement is a group of queries that you wish to execute together in the cassandra database. Typically these queries are related, from a business logic perspective.
There are two steps to using a batch statement:
-
Create a batch statement with the
createBatchStatement
operation of the provider.
// create a new batch statement
final BatchCassandraStatement batchStatement = cassandraCQLProvider.createBatchStatement();
-
Add queries to the batch statement with the
add
and addAll
operations.
// add statements to the batch statement
batchStatement.add(firstBoundStatement);
batchStatement.addAll(boundStatementList);
Options of the added statements, such as consistency level, fetch size and tracing, are ignored when the batch is executed. Instead, the options defined on the batch statement are used.
Once the batch statement is built, you can then execute it synchronously or asynchronously.
Executing synchronous queries
Queries are executed synchronously by using operations on the provider. There are three variants:
-
Execute a query
CassandraResultSet execute(String query) throws CassandraException;
-
Execute a query, with arguments
CassandraResultSet execute(String query, Object... values) throws CassandraException;
-
Execute a statement
CassandraResultSet execute(CassandraStatement st) throws CassandraException;
There is an overloaded version of each operation that takes timeout arguments as well. The resource adaptor will throw a CassandraException
if execute blocks longer than the timeout. For example:
CassandraResultSet execute(long timeout, TimeUnit units, String query) throws CassandraException;
All of the non-timeout variants use the |
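The timeout semantics of a blocking call can be sketched with the standard `java.util.concurrent.Future` API: wait for a bounded time and convert a timeout into an exception. This is a general pattern offered for illustration, not the resource adaptor's implementation. `SyncTimeoutSketch` and `QueryException` are hypothetical names, with `QueryException` standing in for the `CassandraException` mentioned above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncTimeoutSketch {
    /** Hypothetical stand-in for the CassandraException named in the text. */
    static final class QueryException extends Exception {
        QueryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Blocks for at most the given timeout, converting every failure to QueryException. */
    static <T> T await(Future<T> pending, long timeout, TimeUnit units) throws QueryException {
        try {
            return pending.get(timeout, units);
        } catch (TimeoutException e) {
            pending.cancel(true);   // stop waiting for a result nobody will read
            throw new QueryException("query timed out after " + timeout + " " + units, e);
        } catch (ExecutionException e) {
            throw new QueryException("query failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            throw new QueryException("interrupted while waiting for query result", e);
        }
    }
}
```

Because a synchronous query blocks the calling thread for up to the timeout, a short timeout limits how long an event handler can be held up by a slow database.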
Creating a CassandraSession for asynchronous queries
The result of an asynchronous query, or of a request for additional results in a result set, is received as an event associated with an activity called the CassandraSession
. You use the Cassandra CQL provider to create a new session. You use an ActivityContextInterface
factory to get the ActivityContextInterface
associated with the activity so you can attach to it, and thereafter receive any events associated with the activity. See the following example.
final CassandraSession session = cassandraCQLProvider.newSession();
final ActivityContextInterface sessionACI =
cassandraCQLACIFactory.getActivityContextInterface(session);
sessionACI.attach(context.getSbbLocalObject());
The Cassandra CQL provider will always return a CassandraSession
object. In an overload or an error situation the CassandraSession
returned may not be valid and will fail any requests you make by throwing a CassandraException
. You can test if the CassandraSession
is valid with the isValid()
operation.
final CassandraSession session = cassandraCQLProvider.newSession();
if (session.isValid()) {
    // if this is a valid 'session' then attach to the ACI.
    ActivityContextInterface sessionACI =
        cassandraCQLACIFactory.getActivityContextInterface(session);
    sessionACI.attach(context.getSbbLocalObject());
}
Consider incrementing a statistic to record such an error. You could also define a threshold alarm that is a function of such a statistic.
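One way to picture the suggestion above is a small guard that checks session validity and counts failures. In this sketch the nested `Session` interface is a hypothetical stand-in that models only the `isValid()` operation, and the `AtomicLong` counter is a placeholder. A real service would more likely record the count through a SLEE usage parameter so that it is visible to the statistics tools.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SessionGuardSketch {
    /** Hypothetical stand-in exposing only the isValid() operation described above. */
    interface Session {
        boolean isValid();
    }

    private final AtomicLong invalidSessions = new AtomicLong();

    /** Returns true if the session may be used; otherwise records the failure. */
    boolean checkAndCount(Session session) {
        if (session.isValid()) {
            return true;
        }
        invalidSessions.incrementAndGet();
        return false;
    }

    /** Number of invalid sessions seen so far. */
    long invalidSessionCount() {
        return invalidSessions.get();
    }
}
```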
Executing asynchronous queries
Queries are executed asynchronously by using operations on CassandraSession
. There are three variants:
-
Execute a query
void execute(String query) throws CassandraException;
-
Execute a query, with arguments
void execute(String query, Object... values) throws CassandraException;
-
Execute a statement
void execute(CassandraStatement st) throws CassandraException;
The result (a CassandraResultSet
) will be received in a future SLEE transaction as a QueryResultEvent
.
There is an overloaded version of each operation that takes timeout arguments as well. The resource adaptor will throw a CassandraException
if the future does not return a result within the timeout. For example:
CassandraResultSet execute(long timeout, TimeUnit units, String query) throws CassandraException;
All of the non-timeout variants use the |
Getting all the results in a result set
If a query will result in a large number of rows in the result set, then the results will be received as a number of pages.
You can control the fetch size on a per-query basis by using the setFetchSize() operation on CassandraStatement.
You request the next page of results in a result set by using the fetchMoreResults()
operation of CassandraResultSet
. The next page of results will be received in a subsequent SLEE transaction as a QueryResultEvent
(even if the initial query was executed synchronously).
public interface CassandraResultSet extends Iterable<CassandraRow> {
    // ...
    /**
     * Fetch the next page of results in the result set.
     * @param session session upon which subsequent results will be delivered
     * @return true if more results will be fetched,
     *         false if all results have already been fetched
     * @throws CassandraException if there is a failure requesting more results
     */
    boolean fetchMoreResults(CassandraSession session) throws CassandraException;
    // ...
}
Use the isFullyFetched() operation of CassandraResultSet to test if all results in a result set have been fetched.
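The paging flow can be sketched as the body of an event handler that consumes the rows of the current page and then requests the next one. The nested types below are stand-ins declared only so that the sketch is self-contained. They model just the operations quoted in this section. `PagingSketch`, `onPage`, the `getString` accessor and the `lastname` column are hypothetical. The sketch also assumes that iterating a result set covers only the rows already fetched, which should be checked against the API documentation.

```java
import java.util.ArrayList;
import java.util.List;

final class PagingSketch {
    // Stand-in types: only the operations quoted in this document are modelled.
    interface CassandraSession { }

    interface CassandraRow {
        String getString(String name);   // hypothetical by-name accessor
    }

    static class CassandraException extends Exception {
        CassandraException(String message) {
            super(message);
        }
    }

    interface CassandraResultSet extends Iterable<CassandraRow> {
        boolean isFullyFetched();
        boolean fetchMoreResults(CassandraSession session) throws CassandraException;
    }

    private final List<String> lastNames = new ArrayList<>();

    /**
     * Consumes the rows in this page, then asks for the next page, which would
     * arrive as a later event on the same session.
     * Returns true when another page has been requested.
     */
    boolean onPage(CassandraResultSet page, CassandraSession session) throws CassandraException {
        for (CassandraRow row : page) {
            lastNames.add(row.getString("lastname"));
        }
        if (page.isFullyFetched()) {
            return false;   // nothing left to fetch
        }
        return page.fetchMoreResults(session);
    }

    List<String> collected() {
        return lastNames;
    }
}
```

Because each page arrives in its own transaction, any state accumulated across pages (such as the list above) would in a real SBB need to be kept somewhere that survives between events.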