Posts Tagged ‘Apache Cassandra’

Cassandra pagination has been the subject of several blog posts elsewhere as well. In particular, this excellent blog by Michael Kopp details how it can generally be handled using the Cassandra API. We also had our own use case where we needed paginated access. However, for our use case the scheme outlined by Michael has several shortcomings.

1. What if we want to fetch rows batch wise instead of columns?

2. If there are updates during the paged retrieval, there is a chance that some items will be missed. For example, let’s say the last access was at the column with key “Florence”. The next retrieval would then fetch a batch starting from “Florence” onwards. What if a column with key “Cologne” has been newly added? It would not be included in any future retrieval.

3. There may also be a use case where it is required to paginate the results obtained by filtering with a range query, rather than fetching all the rows in the column family page-wise. (Actually this was our use case.)


So let’s have a look at how we took a stab at the beast, Cassandra pagination. Before that, let me explain our use case fully so that it’s easier to grasp what we did and why we did it. Our main use case was to paginate access to the results returned from a range query, which can be cleanly expressed in SQL lingo as follows.


SELECT * FROM <column_family> WHERE <column_name_1> BETWEEN <from_1> AND <to_1> AND <column_name_2> BETWEEN <from_2> AND <to_2> .... AND <column_name_n> BETWEEN <from_n> AND <to_n>


Here each column_name is an index. (Actually a sub index of a composite index. For a description of our indexing scheme refer to my earlier blog Cassandra: Lessons Learnt.) So our use case is a bit complicated in that we need to paginate access to the result set obtained from a range query. Our requirement was also to fetch all the rows satisfying the criteria without missing any row, even when new rows are added while we are retrieving rows in batches. In fact there may be a considerable time lapse between two fetches, since in our use case the retrieved data is processed by a scheduled task with a configurable interval. Additionally we had to leave room for non-batched access to the range query result as well. And of course we were not using the OrderedPartitioner. (The evils of OrderedPartitioner are well documented elsewhere: suboptimal load balancing, hot spots, etc.) Had we used OrderedPartitioner our life would have been a bit easier, since we would have been able to do a range query on the rows. But since we were using RandomPartitioner, no ordering of rows by row key can be assumed.


OK, that’s enough about the predicament we were in a couple of months back when faced with the task of ‘Cassandrafication’ of our data layer. Hope you got the idea. Now let’s see what we did to improve the situation.


First we had to deal with our inability to do range queries on rows. Cassandra has this nice property that the columns of a particular row are always sorted by column key. So we utilized this nicety to impose an ordering on rows. We always maintain a meta row in which all the row keys are stored as columns. (Actually each row key is a column key and the column value is empty.) Let’s say this row is ‘RowIndex’. (See figure 1.) Now when querying the column family we first do a range query on this row to get the row keys matching the criteria, and then do the real row fetching one by one using the row keys fetched. You might be wondering how the range query is constructed to match the where clauses in the SQL given above. In our scheme the row key is built by concatenating the value for each index. (An index is in fact a column in a particular row, and we use the column value as the index value. This will become clearer from the first step of the illustration given in figure 2.) So this is the scheme we used for non-batched retrieval of rows satisfying a particular query.


Figure 1 : Column family with meta row ‘RowIndex’
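To make the ‘RowIndex’ idea concrete, here is a minimal in-memory sketch. It is not Cassandra client code: a Python dict stands in for the column family and a sorted list stands in for the sorted columns of the meta row. The names make_key and range_query and the sample rows are made up for illustration.

```python
from bisect import bisect_left, bisect_right

DELIMITER = "\u2014"  # the delimiter the post shows between row key parts


def make_key(*parts):
    """Hypothetical helper: build a row key by concatenating index values."""
    return DELIMITER.join(parts)


# Toy column family: row key -> columns. Sample data is invented.
column_family = {
    make_key("esb", "08:23"): {"event": "a"},
    make_key("esb", "09:23"): {"event": "b"},
    make_key("dss", "08:10"): {"event": "c"},
}

# Column keys of the 'RowIndex' meta row. Cassandra keeps columns sorted;
# a sorted list imitates that here.
row_index = sorted(column_family)


def range_query(first, last):
    """Slice the meta row for keys in [first, last], then fetch each row."""
    lo = bisect_left(row_index, first)
    hi = bisect_right(row_index, last)
    return [(key, column_family[key]) for key in row_index[lo:hi]]


rows = range_query(make_key("esb", "08:00"), make_key("esb", "09:00"))
print([key for key, _ in rows])
```

With a real client the slice would be a column range read on the ‘RowIndex’ row, followed by one read per returned row key.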


But for the paginated use case this proved to be insufficient due to the second shortcoming outlined earlier. We realized that there needs to be an ordering by timestamp to catch a newly added row even if its row key puts it at a position in the sorted order before the last accessed row key. So we introduced another meta row storing the insertion timestamp of each row. Let’s say the row key of this meta row is ‘TimeStampIndex’. Each column of this row holds the insertion timestamp as the column key and the row key of the row inserted at that timestamp as the column value. So now we need to do four things when we add a row to the column family.


Figure 2 : Row insertion algorithm

1. Create the row key using the defined indexes. Here we use ‘server’ and ‘time’ as the indexes.

2. Insert the row key into ‘RowIndex’ as a column.

3. Insert the row insertion timestamp along with the row key as a column into ‘TimeStampIndex’.

4. Add the row itself to the column family.


‘RowIndex’ is to be used for non batched access of the range query result while ‘TimeStampIndex’ is to be used for batched access of the range query result.
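The four insertion steps can be sketched roughly as follows, again with nested dicts standing in for the column family. The function name insert_row, the now parameter and the sample events are assumptions made for this example, and the sketch assumes no two rows share an insertion timestamp.

```python
import time

DELIMITER = "\u2014"  # the delimiter the post shows between row key parts
ROW_INDEX = "RowIndex"
TIMESTAMP_INDEX = "TimeStampIndex"


def insert_row(column_family, event, index_parts=("server", "time"), now=None):
    """Apply the four insertion steps to a dict-based toy column family."""
    # 1. Build the row key from the values of the defined indexes.
    row_key = DELIMITER.join(event[part] for part in index_parts)
    # 2. Record the row key as a column (empty value) in 'RowIndex'.
    column_family.setdefault(ROW_INDEX, {})[row_key] = ""
    # 3. Record insertion timestamp -> row key in 'TimeStampIndex'.
    #    Assumption: timestamps are unique; a collision would overwrite.
    timestamp = time.time_ns() if now is None else now
    column_family.setdefault(TIMESTAMP_INDEX, {})[timestamp] = row_key
    # 4. Store the row itself.
    column_family[row_key] = dict(event)
    return row_key


cf = {}
insert_row(cf, {"server": "esb", "time": "08:23", "payload": "x"}, now=1)
insert_row(cf, {"server": "esb", "time": "09:23", "payload": "y"}, now=2)
print(cf[TIMESTAMP_INDEX])
```

The now parameter only exists to make the example repeatable; a real writer would use the current time.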


Now when we want to fetch the rows satisfying the range query criteria in batches, first we get a batch-size chunk of timestamps from ‘TimeStampIndex’. Then for each row associated with a timestamp we check whether the row matches the filter criteria. This is a simple string comparison to check whether the row key falls between the range first and range last values.


Say for example the filter criteria for the above illustration is the following where clause.

WHERE 'server' BETWEEN 'esb' and 'esb' and 'hour' BETWEEN '08:00' and '09:00'

Now the range first value of the query would be ‘esb—08:00’ and the range last value would be ‘esb—09:00’. This will select events for server ‘esb’ during the hours from ‘08:00’ to ‘09:00’. So if the row key is ‘esb—08:23’ it will get picked, and if it is ‘esb—09:23’ it won’t.


So as can be seen, for this scenario we didn’t use the ‘RowIndex’ meta row. It’s for non-batched use only. And in this way, using ‘TimeStampIndex’, we can catch newly added rows without missing any row.
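Here is a small sketch of that batched retrieval, assuming ‘TimeStampIndex’ can be read as a list of (timestamp, row key) pairs sorted by timestamp. The function fetch_batch, its cursor return value and the sample data are illustrative assumptions, not the actual implementation.

```python
DELIMITER = "\u2014"  # the delimiter the post shows between row key parts

# 'TimeStampIndex' as (insertion timestamp, row key) pairs sorted by timestamp.
timestamp_index = [
    (1, "esb" + DELIMITER + "08:23"),
    (2, "esb" + DELIMITER + "09:23"),
    (3, "dss" + DELIMITER + "08:30"),
    (4, "esb" + DELIMITER + "08:05"),  # added late, sorts before the 08:23 key
]


def fetch_batch(after_ts, batch_size, range_first, range_last):
    """Read one chunk of timestamps and keep the row keys inside the range.

    Returns (matching row keys, last timestamp read) so the caller can resume.
    """
    chunk = [(ts, key) for ts, key in timestamp_index if ts > after_ts][:batch_size]
    # Plain string comparison against the range first and range last values.
    matches = [key for _, key in chunk if range_first <= key <= range_last]
    last_ts = chunk[-1][0] if chunk else after_ts
    return matches, last_ts


first = "esb" + DELIMITER + "08:00"
last = "esb" + DELIMITER + "09:00"
batch1, cursor = fetch_batch(0, 2, first, last)
batch2, cursor = fetch_batch(cursor, 2, first, last)
print(batch1, batch2)
```

The row added at timestamp 4 still shows up in the second batch even though its key sorts before a key that was already fetched, which is the point of ordering by insertion time.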


However it’s not without its own drawbacks.

1. The batch size is not consistent. Even though a batch-size chunk is picked by the query, some of these rows will be discarded since they do not match the filter criteria. A solution would be to fetch multiple chunks until a batch-size number of rows fulfilling the filter criteria is found. But for now we are OK with inconsistent batch sizes.

2. What if an existing row is updated? It will get fetched a second time since the algorithm will not miss any newly added or updated row. This may or may not be desirable according to the use case. For us this is in fact the needed behavior since we need any new updates to an already fetched row. So we are ok with that too.
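The workaround mentioned for the first drawback (keep reading chunks until enough matching rows are found) could look something like the sketch below. This is not something we implemented; fetch_full_batch, the in_range predicate and the sample index are hypothetical.

```python
def fetch_full_batch(timestamp_index, after_ts, batch_size, in_range):
    """Read chunks until batch_size matching keys are found or the index ends.

    timestamp_index: list of (timestamp, row key) sorted by timestamp.
    in_range: predicate deciding whether a row key matches the filter.
    Returns (matching row keys, timestamp of the last entry consumed).
    """
    matches = []
    cursor = after_ts
    while len(matches) < batch_size:
        chunk = [(ts, key) for ts, key in timestamp_index if ts > cursor][:batch_size]
        if not chunk:
            break  # nothing left to read
        for ts, key in chunk:
            cursor = ts  # advance only past entries actually consumed
            if in_range(key):
                matches.append(key)
                if len(matches) == batch_size:
                    break
    return matches, cursor


index = [(1, "a1"), (2, "b1"), (3, "a2"), (4, "b2"), (5, "a3")]
batch, cursor = fetch_full_batch(index, 0, 2, lambda key: key.startswith("a"))
print(batch, cursor)
```

The cursor stops at the last entry actually examined, so the next call resumes without skipping or repeating entries.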


So that concludes our escapade with Cassandra pagination. The (love) story continues.. (Hope you saw the sarcasm sign unlike Sheldon.. :))


After working with Cassandra for a couple of months I thought of jotting down my Cassandra experience, including what we did, what I liked and what I didn’t like so much. But I need to put a little disclaimer up front. We are currently using Cassandra 0.7 (“What !#$*?? Have you been living under a rock? Get yourself a life and upgrade your darn installation” you would say. I know, I know.. :). We are working on it.). So some of the stuff mentioned here may not apply to later versions, although the concepts we used are still mostly applicable. (I think kudos should go to the Cassandra people for churning out several quick releases with some cool new features.)

That being said, let me go through what we have been doing with Cassandra. Our use case is roughly as follows. We are using Cassandra as the persistence layer of our Business Activity Monitoring solution at WSO2, which is rewritten to scale better (about which I am hoping to blog somewhere along the lines of “A scalable big data ready BAM solution – WSO2 BAM” some time soon). The data is pumped from the servers being monitored as events consisting of key-value pairs, and is written to the Cassandra store after some minimal required processing at the receiver.

Figure 1 : Event format

This data is then indexed to facilitate data analysis. Here our use case differs from normal Cassandra use cases, where the queries are known in advance and you start from the queries and build the Cassandra data model. Since what we are building is a generic data analytics framework, we don’t know what queries users will want to perform on their data stored in Cassandra. But then we thought that if we could build a set of indexes up front according to the queries to be done, it would solve our dilemma. So we decided to give users the ability to define which indexes are to be built using an XML configuration, which is roughly as follows.

<index name="serverByTime">
   <part name="server"/>
   <part name="time"/>
</index>

Here we are building a composite index which can serve queries such as “Give me the events that came from server ‘esb12’ during the time period ‘08:00’ to ‘09:00’”. The indexing happens as described below.

Figure 2 : Indexing Model

1. Read an event from the primary column family. (Basically a row from the primary column family.)

2. If the event contains the server and time keys, get the corresponding values of these keys and concatenate them to create the row key for the index column family (in this case serverByTime).

3. Create the index column family if not present. Add a new row with the created row key if it does not already exist in the index column family, and add a column containing the event id to it.

4. Add the newly created row key as a column in the index row of the index column family.
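The indexing steps above can be sketched with nested dicts standing in for the keyspace and column families. The function index_event, the "indexRow" key and the sample events are names invented for this illustration; the real row key of the index row is not given here.

```python
DELIMITER = "\u2014"  # the delimiter used between row key parts
INDEX_ROW = "indexRow"  # hypothetical key for the special index row


def index_event(keyspace, index_name, parts, event_id, event):
    """Index one event under a composite index; returns the row key or None."""
    # Step 2: only events carrying every part of the index are indexed.
    if not all(part in event for part in parts):
        return None
    row_key = DELIMITER.join(event[part] for part in parts)
    # Step 3: create the index column family and row on first use,
    # then add a column holding the event id.
    index_cf = keyspace.setdefault(index_name, {})
    index_cf.setdefault(row_key, {})[event_id] = ""
    # Step 4: register the row key as a column of the index row.
    index_cf.setdefault(INDEX_ROW, {})[row_key] = ""
    return row_key


keyspace = {}
events = {
    "e1": {"server": "esb12", "time": "08:15"},
    "e2": {"server": "esb12", "time": "08:15"},
    "e3": {"server": "esb15", "time": "10:00"},
    "e4": {"payload": "no index keys"},
}
for event_id, event in events.items():  # step 1: read events from the primary store
    index_event(keyspace, "serverByTime", ("server", "time"), event_id, event)
print(sorted(keyspace["serverByTime"][INDEX_ROW]))
```

Events e1 and e2 end up as two columns of the same index row, and e4 is skipped because it has neither key.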

So as can be seen, we are using a column family per index. Cassandra doesn’t sort rows on row keys if you are using RandomPartitioner, which is the recommended partitioner for better performance. But columns within rows are always sorted by column key. We use this property to store the index so that it can be queried using range queries. Each column family has a special row called the “index row” which stores all the row keys of the column family. When the index needs to be queried, a range query on this row is done first to fetch the required row keys. Let’s say that we want to fetch “all the events from servers esb12 to esb15 during the time period 08:00 to 09:00”. Let’s see how this is done using our indexing model.

1. Get the range values for required sub indexes. In this case “esb12” to “esb15” for server and “08:00” to “09:00” for time.

2. Create the range first and last values by concatenating the values of the sub indexes. The range first value would be “esb12—08:00” and the range last would be “esb15—09:00”. Here ‘—’ is the constant delimiter we use when concatenating row key parts.

3. Do the range query on index row and get the row keys in the range.

4. Fetch each row using its row key to get the result. Actually what we get here is a set of event ids. To get the actual event details another lookup in the primary column family using the event id is required.
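The query steps above can be sketched as follows. A dict of row key to event ids stands in for the index column family, and a sorted key list imitates the column slice on the index row; query_index and the sample data are illustrative assumptions.

```python
from bisect import bisect_left, bisect_right

DELIMITER = "\u2014"  # the constant delimiter between row key parts


def query_index(index_rows, ranges):
    """index_rows: {row key: [event ids]}; ranges: one (first, last) per sub index."""
    # Steps 1-2: concatenate the lower and the upper bounds of the sub indexes.
    range_first = DELIMITER.join(first for first, _ in ranges)
    range_last = DELIMITER.join(last for _, last in ranges)
    # Step 3: slice the sorted row keys, as a range query on the index row would.
    keys = sorted(index_rows)
    selected = keys[bisect_left(keys, range_first):bisect_right(keys, range_last)]
    # Step 4: collect the event ids stored in each selected row.
    return [event_id for key in selected for event_id in index_rows[key]]


index_rows = {
    DELIMITER.join(("esb11", "08:30")): ["e5"],
    DELIMITER.join(("esb12", "08:15")): ["e1", "e2"],
    DELIMITER.join(("esb14", "08:45")): ["e3"],
    DELIMITER.join(("esb15", "09:30")): ["e4"],
}
event_ids = query_index(index_rows, [("esb12", "esb15"), ("08:00", "09:00")])
print(event_ids)
```

One thing to keep in mind with this sketch: the comparison is on the whole concatenated string, so the time bounds only take effect on the boundary servers. A key for esb13 at 10:00 would still fall inside this range, so an extra per-part check on the returned keys may be needed if that matters.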

Using this model we are able to do range queries on multiple indexes. One advantage of this over the native secondary index facility present from version 0.8 onward is that the first index query isn’t restricted to an equality operation (e.g. server equals ‘esb12’). But the order of the sub indexes making up the composite index (i.e. the row key part order, in this case server—time) determines the types of queries which can be made. For instance we cannot query using time alone. For that we would need to create another composite index with time as the first sub index.

We also had the requirement of fetching rows in batches from column families, with each row being fetched exactly once for processing. We achieved this using another special row holding all the row insertion times, which orders rows by time of storage, and then fetching rows in batches by doing range queries on this timestamp row with a batch size limit.

All in all it’s a rather involved scheme to get what we wanted out of Cassandra. Not a big selling point, I would say. Also, concatenating values to create row keys seemed a little hackish to me, and we had to come up with some tricks to make batching work with range queries. Maybe it’s not that strange that Cassandra is also fondly known as “Hackendra” among some of us (has got kind of a nice ring to it I feel :)), since sometimes it’s hard not to have that pesky feeling “Am I doing a hack here???” at the back of your mind when working with Cassandra. But by no means does that mean Cassandra is evil. We like Cassandra for its fast writes and scalable architecture. In real life, when it comes to relationships, not everything is perfect. But that doesn’t mean we don’t care. Our relationship with Cassandra is no different. :)


One of these past several days I decided to have a look at how Hector, the Cassandra access library, works under the hood. This was partially motivated by the confusing notions I had about some of the API classes, especially Cluster. My initial impression was that the Cluster API class represents a Cassandra cluster, but looking at some Hector documentation I found out that it has nothing to do with an actual Cassandra cluster; it’s just a connection to Cassandra, and the cluster name parameter passed in the API does not have anything to do with the actual cluster name. The name is only for Hector to identify the connection.

I took a few notes while reading through the Hector sources. So here goes..

  • Internally HConnectionManager is used to hold a connection pool in the form of a map.
    1. Connection Pool = Map<CassandraHost, ConcurrentHClientPool>
    2. ConcurrentHClientPool contains a Queue of HThriftClient, which is a wrapper for Cassandra.Client, the Cassandra Thrift client.
  • A CassandraHost is created for each node in the cluster, and the host configuration is populated from the passed CassandraHostConfigurator, which contains a comma-separated list of hosts in the Cassandra cluster. (CassandraHostConfigurator needs to be passed at Cluster creation time.)
  • API operations are distributed among Cluster, Keyspace and a set of query related classes (MultigetSliceQuery, RangeSliceQuery etc.), each of which uses HConnectionManager for the actual execution of the operations.
  • All API operations are executed using HConnectionManager with a suitable load balancing policy, which decides the distribution of requests among the hosts.
  • Cluster contains operations related to creating and updating keyspaces and column families.
  • Keyspace has operations related to adding and deleting data in column families. Mutator batches these requests and presents them to the Keyspace as a batch for execution.
  • Query classes have operations related to fetching data from Cassandra keyspaces.
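To picture the pooling structure from these notes, here is a toy model in Python. It is not Hector code and does not reproduce Hector's classes or policies: ToyConnectionManager, the round-robin host choice and the host addresses are all assumptions made for illustration. It only mirrors the shape described above, a map from host to a queue of clients with a policy choosing the host for each operation.

```python
from collections import deque
from itertools import cycle


class ToyConnectionManager:
    """Simplified model of a host -> client pool map with round-robin host choice."""

    def __init__(self, hosts, clients_per_host=2):
        # One pool (a queue of client handles) per host, mirroring
        # Map<CassandraHost, ConcurrentHClientPool> from the notes.
        self.pools = {
            host: deque(f"{host}#client{i}" for i in range(clients_per_host))
            for host in hosts
        }
        # Stand-in for a load balancing policy (round-robin is an assumption).
        self._next_host = cycle(hosts)

    def execute(self, operation):
        host = next(self._next_host)
        pool = self.pools[host]
        client = pool.popleft()  # borrow a client from the host's pool
        try:
            return operation(client)
        finally:
            pool.append(client)  # always hand the client back


# Hypothetical hosts given as a comma-separated list, as a host configurator would take.
manager = ToyConnectionManager("10.0.0.1:9160,10.0.0.2:9160".split(","))
used = [manager.execute(lambda client: client) for _ in range(3)]
print(used)
```

In this model Cluster, Keyspace and the query classes would all be thin callers of execute, each passing in the operation it wants to run against a borrowed client.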
