Feeds:
Posts
Comments

Archive for the ‘Projects’ Category

Cassandra pagination has been the subject of several blogs elsewhere as well. In particular this excellent blog by Michael Kopp details how this can be generally handled using Cassandra API. We also had a our own use case where we needed paginated access. However for our use case scheme outlined by Micheal presents several shortcomings.

1. What if we want to fetch rows batch wise instead of columns?

2. If there are updates during the paged retrieval there is a chance that some items will be missed out. For example let’s say the last access is at column with column key with “Florence”. So the next retrieval would fetch a batch starting from “Florence” on wards. What if a column with key “Cologne” has been newly added? It would not get included in any of the future retrieval.

3. Also there may be a use case where it is required paginate the results obtained by filtering with a range query rather than fetching all the rows page wise in the column family. (Actually this was our use case)

 

So let’s have a look at how we took a stab at the beast, Cassandra pagination. Before that let me explain our use case fully so that it’s easier to grasp what we did and why we did it. Our use main case was to paginate the access to results returned from a range query which can cleanly expressed in SQL lingo as follows.

 


SELECT * FROM <column_family> WHERE <column_name_1> BETWEEN [from_1] AND [to_1] AND <column_name_2> BETWEEN [from_2] AND [to_2] .... AND <column_name_n> BETWEEN <from_n> AND <to_n>

 

Here each column_name is an index. (Actually a sub index of a composite index. For a description on our indexing scheme refer to my earlier blog Cassandra: Lessons Learnt) . So our use case is bit complicated in that it’s required to paginate the access of the result set obtained from a range query. Also our requirement was to fetch all the rows satisfying this criteria without missing any row provided that there would be new additions while we are retrieving rows in batches. In fact there may be a considerable time-lapse between two fetches since the retrieved data are processed using a scheduled task with configurable interval in our use case.  Additionally we had to leave the room for non batched access of the range query result as well. And of course we were not using the OrderedPartitioner. (Evils of OrderedPartitioner is well documented elsewhere. Sub optimal loadbalancing, creating hot spots etc.. ). Had we used OrderedPartitioner our life would have been bit easier since we would have been able to do a range query on the rows. But since we were using RandomPartitioner no ordering of rows using row keys can be assumed as well.

 

Ok that’s enough for the predicament that we were in couple of months back while faced with the task of ‘Cassandrafication’ our data layer. Hope you got the idea.. Now let’s see what we did to improve the situation.

 

First we had to deal with our inability to do range query on rows. Cassandra has this nice caveat, that columns of a particular row is always sorted using the column keys. So we utilized this nicety to impose an ordering on rows. We always maintain a meta row in which all the row keys are stored as columns. (Actually a row key is a column key and column value is empty). Let’s say this row is ‘RowIndex’. (See figure 1). Now when doing a query on column family we first query this row using a range query and get the rows matching the criteria and then do the real row fetching one by one using the row keys fetched. You might be wondering how the range query is constructed to match the where clauses in the given SQL above. In our scheme the row key is constituted from concatenating the value for each index. (Index is in fact a column in a particular row and we use the column value as the index value. This will become clearer by having a look at the first step of illustration given in figure 2). So this is the scheme we used for non batched retrieval of rows satisfying a particular query.

 

Figure 1 : Column family with meta row ‘RowIndex’

 

But for paginated use case this proved to be insufficient due to the second shortcoming outlined earlier. We realized that there needs to be an ordering from the timestamp to catch a newly added row even if its row key put it in a location in sorted order which is before the last accessed row key. So we introduced another meta row storing the timestamp of insertion of each row. Let’s say this row key of this meta row is ‘TimeStampIndex’. Each column of this row will hold the insertion timestamp as the column key and the corresponding row key of the row inserted at that particular timestamp as the column value. So now we need to do four things we add a row to the column family.

 

Figure 2 : Row insertion algorithm

1. Create the row key using the defined indexes. Here we use ‘server’ and ‘time’ as the indexes.

2. Insert row key in to the ‘RowIndex’ as a column.

3. Insert the row insertion timestamp along with row key as a column to the ‘TimeStampIndex’

4. Add the row itself to the column family.

 

‘RowIndex’ is to be used for non batched access of the range query result while ‘TimeStampIndex’ is to be used for batched access of the range query result.

 

Now when we want to fetch the rows in batches satisfying the range query criteria, first we get a batch size chunk of timestamps from ‘TimeStampIndex’. Then for each and every row associated with the timestamp we check whether if the row matches the filter criteria. This is a simple string comparison to check whether the row key falls between the range first and range last values.

 

Say for example the filter criteria for above illustration is following where clause.


WHERE 'server' BETWEEN 'esb' and 'esb' and 'hour' BETWEEN '08:00' and '09:00'

Now the range first value of the query would be ‘esb—08:00’ and the range last value would be ‘esb—09:00’. This will select events for server ‘esb’ during the hours from ’08:00′ to ’09:00′.  So if the row key is ‘esb—08:23’ it will get picked and if it is ‘esb—09:23’ it won’t.

 

So as can be seen for this scenario we didn’t use ‘RowIndex’ meta row. It’s for non batched use only. And in this way using ‘TimeStampIndex’ we can catch newly added rows without missing out on any row.

 

However it’s not without its own drawbacks.

1. The batch size is not consistent. Even though the batch size chunk is picked from the query some of these rows will be discarded since they do not match the filtering criteria. Solution would be to get multiple batches until the batch size number of rows fulfilling the filter criteria is found. But for now we are ok with inconsistent batch sizes.

2. What if an existing row is updated? It will get fetched a second time since the algorithm will not miss any newly added or updated row. This may or may not be desirable according to the use case. For us this is in fact the needed behavior since we need any new updates to an already fetched row. So we are ok with that too.

 

So that concludes our escapade with Cassandra pagination. The (love) story continues.. (Hope you saw the sarcasm sign unlike Sheldon.. :))

Advertisements

Read Full Post »

After working with Cassandra for couple of months I thought of jotting down my Cassandra experience including what we did, what I liked and not so liked. But I need to put a little disclaimer up front. We are currently using Cassandra 0.7  (“What !#$*?? Have you been living under a rock? Get your self a life and upgrade your darn installation” you would say. I know. I know.. :). We are working on it.). So some of the stuff mentioned here would not relate to later versions although the concepts we used are still mostly applicable. (I think kudos should go to Cassandra people for churning out several quick releases with some cool new features.)

That being said let me go through what we have been doing with Cassandra. Our use case is roughly as follows. We are using Cassandra as the persistence layer of our Business Activity Monitoring solution at WSO2 which is rewritten to scale better (About which I am hoping to blog some where in the line of “A scalable big data ready BAM solution – WSO2 BAM” some time soon.). The data are pumped from servers being monitored as events consisting of key value pairs and get written to Cassandra store after some minimal required processing at the receiver.

Figure 1 : Event format

These data are then indexed to facilitate the data analysis. Here our use case differs from normal Cassandra use cases where the queries are known in advance and we start from query and build our Cassandra data model. Since what we are building is a generic data analytic framework we don’t know what queries users want to perform on their stored data in Cassandra. But then we thought if we can build set of indexes up front according the queries to be done it would solve our dilemma. So we decided to give users the ability to define what indexes to be built using an XML configuration which is roughly as follows.

<index name="serverByTime">
   <part name="server"/>
   <part name="time"/>
</index>

Here we are building a composite index which can serve queries such as “Give me the events that came from server ‘esb12′ during the time period ’08:00’ to ’09:00′”. The indexing happens as described below.

Figure 2 : Indexing Model

1. Read an event from primary column family. (Basically a row from primary column family)

2. If event contain server and time keys get the corresponding values of these keys and concatenate them to create row key of index column family (In this case serverByTime).

3. Create index column family if not present. Add a new row having created row key if not already existing in the index column family and add column containing event id to it.

4. Add newly created row key as a column in the index row of index column family.

So as can be seen we are using a column family per index. Cassandra doesn’t sort rows on row keys if you are using RandomPartitioner which is the recommended partitioner for better performance. But columns within rows are always sorted using column key. We use this property to store index so that it can be queried using range queries. Each column family has a special row called “index row” which stores all the row keys of the column family.  When it is required to query the index, a range query on this row is done first to fetch the required row keys. Let’s say that we wanted to do a query to fetch “all the events from servers esb12 to esb15 during the time periods “08:00 to 09:00”. Let’s see how this is done using our indexing model.

1. Get the range values for required sub indexes. In this case “esb12” to “esb15” for server and “08:00” to “09:00” for time.

2. Create the range first and last values by concatenating values of sub indexes. The range first value would be “esb12—08:00” and the range last would be “esb15—09:00”. Here ‘—‘ is the constant delimiter we are using during the concatenation of row key parts.

3. Do the range query on index row and get the row keys in the range.

4. Fetch each row to using row key to get the result. Actually what we are getting here is set of event ids. To get actual event details another look up in primary column family using the event id is required.

Here using this model we are able to do range queries on multiple indexes using this method. One advantage of this over native secondary index facility present from version 0.8 onward is that the first index query isn’t restricted to an equality operation (e.g. server equals ‘esb12’ etc.). But the order of sub indexes making up of composite index (e.g: row key part order. In this case server—time) determines the types of queries which can be made. For instance we cannot query just using time. For that we would need to create another composite index starting with time as the first sub index.

We also had the requirement of fetching rows in batches from column families with each row being fetched exactly once to be processed. We achieved this using another special row having all the row insertion times to order rows according to the time of storage and then fetching rows in batches by doing range queries on this time stamp row with batch size limit.

All in all it’s rather involved scheme to get what we wanted out of Cassandra. Not a big selling point I would say. Also concatenating and creating row keys seemed little hackish to me. Also we had to come up with some tricks to make batching work with range queries. May be it’s not that strange Cassandra is also fondly known as “Hackendra” among some of us (has got kind of nice ring to it I feel :)) since sometimes it’s hard not to have that pesky feeling “Am I doing a hack here???” at the back of your mind when working with Cassandra. But by no means it means Cassandra is evil. We like Cassandra for its fast writes and scalable architecture. But in real life when it comes to relationships not every thing is perfect. But it doesn’t mean we don’t care. Our relationship with Cassandra is no different. :).

Read Full Post »

For the past several weeks I have been writing a synchronization library in my free time. This can be used in a distributed manner, meaning several nodes in a cluster can use this library to synchronize between each other. This implementation is based on Apache Zookeeper. Though primarily Zookeeper based I have also left room for other implementations. All you have to do is to implement couple of interfaces. Let’s look at some features of the library followed by an example usage of the library.

  • The library API closely follows java.util.concurrent API so this would be a natural transition for developers familiar with concurrent package.
  • The synchronization granularity is thread level not node level. So this library can used for in VM synchronization between threads as well. But if you only require in VM synchronization ideally you are better off with the use of java.util.concurrent due to the performance factor. But you already knew that. :). But anyway this can be handy if you got several application threads contending for the same distributed and shared resource and you need to enforce mutual exclusion semantics per each user whether it be a thread or a node.
  • Re-entrancy is implemented at thread level in reentrant synchronization primitives. For example in ReentrantLock etc. Again this is in line with the semantics of java.util.concurrent.
  • There are also several places where semantics differ from java.util.concurrent package due to the distributed nature of the library. For example in CyclicBarrier the number of parties that should trigger barrier may be different of what you pass at the intilization of the CyclicBarrier instance. If the CyclicBarrier instance is pointing to a currently existing barrier Zookeeper node it will get the existing barrier’s number of parties required for triggering the barrier. So a getParties() call subsequent to the barrier initialization will reveal the true number of parties require to trigger the barrier. This is required since otherwise the barrier will not be in a consistent state if each joining party would specify different arguments for number of parties required to trigger the barrier.

Now let’s look at a sample usage of the library with the help of ReentrantLock.


// Zookeeper configuration to connect to the Zookeeper instance
ZKConfiguration config = new ZKConfiguration(&amp;quot;localhost:2181&amp;quot;, 1000000, null);

// Get Zookeeper specific Lock factory.
LockFactory fac = ZKFactory.getInstance(config);

// Get the reentrant lock from the factory on specified Zookeeper
// node. Others will also get the lock on this same node in order
// to synchronize with each other.
Lock lock = fac.getReentrantLock(&amp;quot;/test&amp;quot;);

// From here onwards it's pretty much java.util.Concurrent API.
try {
   lock.lock();
} catch (LockException e) {
   e.printStackTrace();
}

// Do mutual exclusive work. Write to db etc..

lock.unlock();

Usage of API can be found in sources of the unit tests for now until I get around to document them in the wiki. :). Currently following synchronization primitives are available.

  • ReentrantLock
  • ReadWriteLock
  • CyclicBarrier
  • DoubleBarrier

And I am hoping to add another couple of synchronization primitives as well. The project can be found at https://github.com/chamibuddhika/dsync. Any suggestions are welcome as always.

Read Full Post »

Introduction

Mooshabaya is an open source mashup authoring framework. It allows the mashups to be modeled as workflows and to be exported and deployed in mashup servers. This project will try to bridge the workflow domain and mashup domain using their commonalities. The project builds on the existing workflow modeling tool XBaya which currently supports exporting workflows as BPEL and Jython scripts.

Following features will be available in the initial release of Mooshabaya.

– Generating mashups from workflow models

– Service discovery from web registries

– Deployment of generated mashups to mashup servers

– Monitoring the mashup execution

Initially it will utilize WSO2 products for mashup deployment (WSO2 Mashup Server) and service discovery (WSO2 Governance Registry) requirements. In later releases it expected to extend its support for other products as well.

Mooshabaya Mashup Generator:Mooshabaya graphical workflow composer exports the composed workflows as mashups, by modifying XBaya Workflow Composer, which currently exports the composed workflows into BPEL and Jython.

Service Discovery: Mooshabaya basically will find service descriptions from WSO2 Governance Registry, which governs SOA deployment metadata. Mooshabaya will support different forms of user authentication with the registry as per the infrastructure level security requirements of the registry. For example it will support direct user authentication with the registry or brokered authentication via an Identity provider as required. In the latter case WSO2 Identity server will act as the authentication broker for authenticating the particular user via Mooshabaya.

Data aggregation:Mooshabaya will generate mashups which are does data aggregation from different input sources such as web feeds and data sources. Web feeds support would include RSS 1.0, RSS 2.0 and Atom. This would make mashups generated by Mooshabaya more versatile by enabling them to acquire and process data from different input sources at runtime.

Mashup Deployment:Mooshabaya will handle mashup deployment via a MTOM based service, which will upload the required mashup file, generated stubs and configuration files to a WSO2 mashup server where the mashup indented to be deployed.

Mashup Monitoring:Mooshabaya will enable the mashup developer to monitor the execution of the mashups after they have been deployed and run on the Mashup server. This will be carried out via an external or internal WS-Messenger instance to which Mooshabaya will subscribe prior to mashup monitoring session. These required configurations for subscribing to WS-Messenger instance should be provided by the user at the mashup modeling stages. Then the mashup generation components will use this information to integrate monitoring constructs in generated mashups. These constructs will cause events to be generated targeting the specified WS-Messenger instance at runtime.

Mashup Deployment:Mooshabaya will handle mashup deployment via a MTOM based service, which will upload the required mashup file, generated stubs and configuration files to a WSO2 mashup server where the mashup indented to be deployed.

Mashup Monitoring: Mooshabaya will enable the mashup developer to monitor the execution of the mashups after they have been deployed and run on the Mashup server. This will be carried out via an external or internal WS-Messenger instance to which Mooshabaya will subscribe prior to mashup monitoring session. These required configurations for subscribing to WS-Messenger instance should be provided by the user at the mashup modeling stages. Then the mashup generation components will use this information to integrate monitoring constructs in generated mashups. These constructs will cause events to be generated targeting the specified WS-Messenger instance at runtime.


Abstract System View

Abstract System View

High Level Design

High Level Design

This view of the system depicts the major components within Mooshabaya. Of these mashup generation component is vital. Mashup generation phase will consist of following key stages.

* Validation – validate the modeled workflow to check whether
* Core Mashup code generation – Implement the mashup with the service invocations based on the modeled workflow. This also integrates the other required stubs
and the configuration files to the mashup.
* Feed Integration – Inject the mashup code relevant to fetching feeds from different data sources such as web feeds at mashup runtime.
* Eventing Integration – Injecting eventing related mashup codes, which generate events during mashup execution.

Development

Currently this is being carried out as a Computer Science Engineering final year project. Project is currently in the design stage though some initial implementation work on mashup generation and service discovery has been done. Development works will be carried out using open source software development methodologies.

For more on information on the project visit the following.

Project Site: http://sourceforge.net/projects/mooshabaya/

Project Wiki:http://mooshabaya.wikidot.com/

Project SVN:https://mooshabaya.svn.sourceforge.net/svnroot/mooshabaya

Development

Currently this is being carried out as a BSc. Computer Science Engineering final year project. Project is currently in the design stage though some initial implementation work on mashup generation and service discovery has been done. Development works will be carried out using open source software development methodologies.

Read Full Post »