Archive for November, 2011

After working with Cassandra for a couple of months I thought of jotting down my Cassandra experience, including what we did, what I liked and what I liked less. But I need to put a little disclaimer up front. We are currently using Cassandra 0.7 (“What !#$*?? Have you been living under a rock? Get yourself a life and upgrade your darn installation,” you would say. I know, I know.. :). We are working on it.). So some of the things mentioned here may not apply to later versions, although the concepts we used are still mostly applicable. (I think kudos should go to the Cassandra folks for churning out several quick releases with some cool new features.)

That being said, let me go through what we have been doing with Cassandra. Our use case is roughly as follows. We are using Cassandra as the persistence layer of our Business Activity Monitoring solution at WSO2, which has been rewritten to scale better (about which I am hoping to blog some time soon, somewhere along the lines of “A scalable big data ready BAM solution – WSO2 BAM”). The data is pumped from the servers being monitored as events consisting of key-value pairs, and gets written to the Cassandra store after some minimal required processing at the receiver.

Figure 1: Event format

This data is then indexed to facilitate data analysis. Here our use case differs from typical Cassandra use cases, where the queries are known in advance and the Cassandra data model is designed starting from those queries. Since what we are building is a generic data analytics framework, we don’t know in advance what queries users will want to perform on the data they store in Cassandra. But then we thought that if we could build a set of indexes up front, according to the queries to be done, it would solve our dilemma. So we decided to give users the ability to define which indexes to build using an XML configuration, which looks roughly as follows.

<index name="serverByTime">
   <part name="server"/>
   <part name="time"/>
</index>

Here we are building a composite index which can serve queries such as “Give me the events that came from server ‘esb12’ during the time period ‘08:00’ to ‘09:00’”. The indexing happens as described below.

Figure 2: Indexing Model

1. Read an event from the primary column family (basically a row of the primary column family).

2. If the event contains the server and time keys, get the corresponding values of these keys and concatenate them to create the row key of the index column family (in this case serverByTime); a sketch of this key composition follows the steps.

3. Create the index column family if it is not present. Add a new row with the created row key to the index column family if one does not already exist, and add a column containing the event id to it.

4. Add the newly created row key as a column in the index row of the index column family.
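
To make the key composition concrete, here is a minimal sketch of how a composite index row key could be assembled from an event. This is an illustration rather than our actual BAM code: the event is assumed to be a simple Map of key-value pairs, and the delimiter is assumed to be the string “---” (the constant delimiter mentioned further below).

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IndexKeyBuilder {

    // Assumed delimiter used between row key parts.
    private static final String DELIMITER = "---";

    // Builds the composite row key for an index whose parts are given in
    // configuration order (e.g. ["server", "time"] for serverByTime).
    // Returns null if the event is missing one of the index parts (step 2).
    public static String buildRowKey(Map<String, String> event, List<String> parts) {
        StringBuilder key = new StringBuilder();
        for (String part : parts) {
            String value = event.get(part);
            if (value == null) {
                return null; // event cannot be indexed under this index
            }
            if (key.length() > 0) {
                key.append(DELIMITER);
            }
            key.append(value);
        }
        return key.toString();
    }

    public static void main(String[] args) {
        Map<String, String> event = new HashMap<String, String>();
        event.put("server", "esb12");
        event.put("time", "08:15");
        // Prints "esb12---08:15", the row key for the serverByTime index.
        System.out.println(buildRowKey(event, Arrays.asList("server", "time")));
    }
}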

So as can be seen, we are using a column family per index. Cassandra doesn’t sort rows on row keys if you are using RandomPartitioner, which is the recommended partitioner for better performance. But columns within a row are always sorted by column key. We use this property to store the index so that it can be queried using range queries. Each index column family has a special row called the “index row” which stores all the row keys of the column family. When the index needs to be queried, a range query on this row is done first to fetch the required row keys. Let’s say we wanted to fetch “all the events from servers esb12 to esb15 during the time period 08:00 to 09:00”. Let’s see how this is done using our indexing model.

1. Get the range values for the required sub-indexes. In this case “esb12” to “esb15” for server and “08:00” to “09:00” for time.

2. Create the range’s first and last values by concatenating the values of the sub-indexes. The range’s first value would be “esb12—08:00” and the last would be “esb15—09:00”. Here ‘—’ is the constant delimiter we use when concatenating the row key parts.

3. Do a range query on the index row and get the row keys in the range.

4. Fetch each row using its row key to get the results. What we actually get here is a set of event ids; to get the actual event details, another lookup in the primary column family using the event id is required. (A Hector-based sketch of these query steps follows.)
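
As an illustration, here is a hedged sketch of steps 2 and 3 using the Hector client library. The keyspace, column family and host details, the name of the special index row, and the “---” delimiter are all assumptions made for the example, not our actual code.

import java.util.List;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;

public class IndexRangeQuery {

    private static final String INDEX_ROW = "index_row"; // assumed key of the special row

    public static void main(String[] args) {
        Cluster cluster = HFactory.getOrCreateCluster("bam-conn", "localhost:9160");
        Keyspace keyspace = HFactory.createKeyspace("BAM", cluster);
        StringSerializer s = StringSerializer.get();

        // Step 3: a range query on the index row of the serverByTime column
        // family, between the concatenated first and last values from step 2.
        SliceQuery<String, String, String> query = HFactory.createSliceQuery(keyspace, s, s, s);
        query.setColumnFamily("serverByTime");
        query.setKey(INDEX_ROW);
        query.setRange("esb12---08:00", "esb15---09:00", false, 1000);

        List<HColumn<String, String>> columns = query.execute().get().getColumns();
        for (HColumn<String, String> column : columns) {
            // Each column name is a row key of the index column family; step 4
            // fetches that row to obtain the event ids, then looks each event
            // up in the primary column family.
            System.out.println("matching index row key: " + column.getName());
        }
    }
}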

Using this model we are able to do range queries on multiple indexes. One advantage of this over the native secondary index facility available from version 0.8 onward is that the first index query isn’t restricted to an equality operation (e.g. server equals ‘esb12’). But the order of the sub-indexes making up the composite index (i.e. the row key part order, in this case server—time) determines the types of queries that can be made. For instance, we cannot query using time alone. For that we would need to create another composite index with time as the first sub-index.

We also had the requirement of fetching rows from column families in batches, with each row fetched exactly once for processing. We achieved this using another special row holding all the row insertion times, so that rows can be ordered according to their time of storage, and then fetching rows in batches by doing range queries on this timestamp row with a batch size limit.
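
A hedged sketch of how such batched fetching could be paged with Hector is given below. The timestamp row name, the batch size and the assumption that each column in that row maps an insertion time to a row key are mine, for illustration only.

import java.util.List;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;

public class BatchFetcher {

    private static final String TIMESTAMP_ROW = "timestamp_row"; // assumed name
    private static final int BATCH_SIZE = 100;

    // Pages through the special timestamp row in insertion-time order; each
    // column is assumed to map an insertion time (name) to a row key (value).
    public static void fetchAll(Keyspace keyspace, String columnFamily) {
        StringSerializer s = StringSerializer.get();
        String start = "";
        boolean firstPage = true;
        while (true) {
            SliceQuery<String, String, String> query = HFactory.createSliceQuery(keyspace, s, s, s);
            query.setColumnFamily(columnFamily);
            query.setKey(TIMESTAMP_ROW);
            // The range start is inclusive, so after the first page the last
            // column already seen comes back again and is skipped below.
            query.setRange(start, "", false, BATCH_SIZE);
            List<HColumn<String, String>> cols = query.execute().get().getColumns();
            int from = firstPage ? 0 : 1;
            if (cols.size() <= from) {
                break; // no columns beyond the overlap: all rows fetched
            }
            for (HColumn<String, String> col : cols.subList(from, cols.size())) {
                System.out.println("process stored row: " + col.getValue());
                start = col.getName();
            }
            firstPage = false;
        }
    }
}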

All in all, it’s a rather involved scheme to get what we wanted out of Cassandra. Not a big selling point, I would say. Concatenating values to create row keys also seemed a little hackish to me, and we had to come up with some tricks to make batching work with range queries. Maybe it’s not that strange that Cassandra is also fondly known as “Hackendra” among some of us (it has kind of a nice ring to it, I feel :)), since it’s sometimes hard not to have that pesky feeling of “Am I doing a hack here???” at the back of your mind when working with Cassandra. But by no means is Cassandra evil. We like Cassandra for its fast writes and scalable architecture. In real life, when it comes to relationships, not everything is perfect, but that doesn’t mean we don’t care. Our relationship with Cassandra is no different. :)


Let’s see how we can model some regularly used SQL queries using map reduce.

  • select … from … where …
Take the following example:
select f1, f2 from relation where f1 > 500

For this example, let’s assume a suitable InputFormat (in the case of Hadoop) does the reading from the database and emits key-value pairs in the form (k, rec), where k is the primary key and rec is the entire record. Pseudo-code using map reduce is given below.

map (k, rec)  {
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2>
      collect (k, rec1)
   }
}

As can be seen, this is implemented using just a map function. Output is emitted only if the predicate is satisfied.
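
For comparison, a runnable Hadoop version of this map function could look like the following sketch. It assumes plain comma-separated text input (f1,f2,… per line) read with the standard TextInputFormat, so the byte offset stands in for the primary key k; a real job would use a database-backed InputFormat as described above.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical sketch: the "where" filter as the mapper of a map-only job.
public class SelectWhereMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // rec arrives as a CSV line "f1,f2,f3,..."; split into fields.
        String[] fields = value.toString().split(",");
        long f1 = Long.parseLong(fields[0].trim());
        // Emit <f1, f2> keyed by k only when the predicate f1 > 500 holds.
        if (f1 > 500) {
            context.write(key, new Text(fields[0].trim() + "," + fields[1].trim()));
        }
    }
}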

  • select aggregate_func() from … where … groupby …
Let’s take the following example.
select f3, sum(f1), avg(f2) from relation where f1 > 500 groupby f3
The pseudo-code below describes how this is achieved using map reduce.
map (k, rec)  {
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2, rec.f3>
      collect (rec.f3 , rec1)
   }
}
reduce(v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <v, sum, avg>
   store (v, rec2)
}

Here, each v that reduce gets corresponds to a unique value of the rec1.f3 field. The group by is done implicitly during the shuffle phase between the map and reduce functions.
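
A Hadoop version of this reduce function could look like the sketch below. It assumes the values are comma-separated “f1,f2” strings emitted by a mapper keyed on f3, in the spirit of the CSV-based example earlier.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical sketch: sum(f1) and avg(f2) per unique f3 (the reduce key).
public class GroupByReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text f3, Iterable<Text> records, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        double total = 0;
        int count = 0;
        for (Text rec : records) {
            // Each value is "f1,f2" for one record in this f3 group.
            String[] fields = rec.toString().split(",");
            sum += Long.parseLong(fields[0].trim());
            total += Double.parseDouble(fields[1].trim());
            count++;
        }
        double avg = total / count;
        // Equivalent of store(v, rec2): emit <f3, sum, avg>.
        context.write(f3, new Text(sum + "," + avg));
    }
}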

  • select aggregate_func() from … where … groupby … having …

Here an additional having clause is used to filter the grouped results. Let’s take an extended version of the earlier example.

select f3, sum(f1), avg(f2) from relation where f1 > 500 groupby f3 having avg(f2) > 50

No change is required in the map function. The modified reduce function is given below.

reduce(v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <v, sum, avg>
   if (avg > 50) {
      store (v, rec2)
   }
}


One of these past several days I decided to have a look at how Hector, the Cassandra access library, works under the hood. This was partially motivated by the confused notions I had about some of its API classes, especially Cluster. My initial impression was that the Cluster API class represents a Cassandra cluster, but looking at some Hector documentation I found out that it has nothing to do with the actual Cassandra cluster. It is just a connection to Cassandra, and the cluster name parameter passed in the API does not have to match the actual cluster name; the name is only for Hector to identify the connection.

I took a few notes while reading through the Hector sources, so here goes (a short usage sketch follows the notes)..

  • Internally, HConnectionManager is used to hold a connection pool in the form of a map.
    1. Connection Pool = Map<CassandraHost, ConcurrentHClientPool>
    2. ConcurrentHClientPool contains a queue of HThriftClient instances, each of which is a wrapper for Cassandra.Client, the Cassandra Thrift client.
  • A CassandraHost is created for each node in the cluster, and the host configuration is populated using the passed CassandraHostConfigurator, which contains a comma-separated list of hosts in the Cassandra cluster. (A CassandraHostConfigurator needs to be passed at Cluster creation time.)
  • API operations are distributed among Cluster, Keyspace and a set of query-related classes (MultigetSliceQuery, RangeSliceQuery etc.), each of which uses HConnectionManager for the actual execution of the operations.
  • All API operations are executed using HConnectionManager, with a load balancing policy which decides the distribution of requests among the hosts.
  • Cluster contains operations related to creating and updating keyspaces and column families.
  • Keyspace has operations related to adding data to and deleting data from column families. Mutator batches these requests and presents them to the Keyspace as a single batch for execution.
  • The query classes have operations related to fetching data from Cassandra keyspaces.
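
To make these notes concrete, here is a minimal usage sketch. The connection name, hosts and keyspace/column family names are made up for illustration.

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class HectorUsage {

    public static void main(String[] args) {
        // The Cluster here is just Hector's named connection; "my-conn" does
        // not have to match the real Cassandra cluster name.
        CassandraHostConfigurator hosts = new CassandraHostConfigurator("node1:9160,node2:9160");
        Cluster cluster = HFactory.getOrCreateCluster("my-conn", hosts);

        // Keyspace-level operations (reads/writes) go through HConnectionManager underneath.
        Keyspace keyspace = HFactory.createKeyspace("MyKeyspace", cluster);

        // Mutator batches insertions and hands them to the Keyspace as one execution.
        Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
        mutator.addInsertion("row1", "MyColumnFamily",
                HFactory.createStringColumn("name", "value"));
        mutator.execute();
    }
}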
