
I have been reading about the Join implementations available for Hadoop for the past few days. In this post I recap some techniques I learnt during the process. Joins can be done at either the Map side or the Reduce side according to the nature of the data sets to be joined.

Reduce Side Join

Let’s take the following tables containing employee and department data.

Let’s see how the join query below can be achieved using a reduce side join.


SELECT Employees.Name, Employees.Age, Department.Name  FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id

The map side is responsible for emitting the join predicate value (here the department id) along with the corresponding record from each table, so that records having the same department id in both tables end up at the same reducer, which then joins the records having that department id. However it is also required to tag each record to indicate which table the record originated from, so that the joining happens between records of the two different tables. The following diagram illustrates the reduce side join process.

Here is the pseudo code for map function for this scenario.


map (K table, V rec) {
   dept_id = rec.Dept_Id
   tagged_rec.tag = table
   tagged_rec.rec = rec
   emit(dept_id, tagged_rec)
}

At the reduce side the join happens between records having different tags.


reduce (K dept_id, list<tagged_rec> tagged_recs) {
   for (tagged_rec : tagged_recs) {
      for (tagged_rec1 : tagged_recs) {
         if (tagged_rec.tag != tagged_rec1.tag) {
            joined_rec = join(tagged_rec, tagged_rec1)
            emit (tagged_rec.rec.Dept_Id, joined_rec)
         }
      }
   }
}
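To make the idea concrete, below is a rough Java sketch of the same flow using the Hadoop MapReduce API. The CSV field layouts (Dept_Id as the third field of Employees and the first field of Department), the “E|”/“D|” tags and the class names are my own illustrative assumptions; the two mappers would typically be wired to their respective input paths using MultipleInputs.

// Hypothetical reduce side join; field positions and tags are assumptions.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideJoin {

    // Tags each employee record with "E" and emits it keyed by Dept_Id.
    public static class EmployeeMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");   // assumed layout: Name,Age,Dept_Id
            context.write(new Text(fields[2]), new Text("E|" + value));
        }
    }

    // Tags each department record with "D" and emits it keyed by Dept_Id.
    public static class DepartmentMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");   // assumed layout: Dept_Id,Name
            context.write(new Text(fields[0]), new Text("D|" + value));
        }
    }

    // Joins records carrying different tags that share the same Dept_Id.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text deptId, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> employees = new ArrayList<>();
            List<String> departments = new ArrayList<>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("E|")) {
                    employees.add(s.substring(2));
                } else {
                    departments.add(s.substring(2));
                }
            }
            for (String e : employees) {
                for (String d : departments) {
                    context.write(deptId, new Text(e + "," + d));
                }
            }
        }
    }
}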

Map Side Join (Replicated Join)

Using Distributed Cache on Smaller Table

For this implementation to work one relation has to fit into memory. The smaller table is replicated to each node and loaded into memory. The join happens at the map side without reducer involvement, which significantly speeds up the process since it avoids shuffling all the data across the network, even though most of the non-matching records would have been dropped later anyway. The smaller table can be loaded into a hash table so that a look-up by Dept_Id can be done. The pseudo code is outlined below.


map (K table, V rec) {
   list recs = lookup(rec.Dept_Id) // Get smaller table records having this Dept_Id
   for (small_table_rec : recs) {
      joined_rec = join (small_table_rec, rec)
      emit (rec.Dept_Id, joined_rec)
   }
}
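Here is a rough Java sketch of this replicated join, again with assumed CSV layouts; the cached file name (department.csv) is a placeholder for whatever file is shipped to the nodes via the distributed cache (e.g. job.addCacheFile(...)).

// Hypothetical replicated (map side) join; file name and layouts are assumptions.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReplicatedJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    // In-memory copy of the smaller (Department) table keyed by Dept_Id.
    private final Map<String, String> departments = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // The file has been replicated to every node via the distributed cache
        // and is readable from the task's working directory.
        try (BufferedReader reader = new BufferedReader(new FileReader("department.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");          // assumed layout: Dept_Id,Name
                departments.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");      // assumed layout: Name,Age,Dept_Id
        String deptName = departments.get(fields[2]);        // look-up by Dept_Id
        if (deptName != null) {                               // inner join: drop non-matching records
            context.write(new Text(fields[2]),
                    new Text(fields[0] + "," + fields[1] + "," + deptName));
        }
    }
}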

Using Distributed Cache on Filtered Table

If the smaller table doesn’t fit in memory it may be possible to prune its contents if a filtering expression has been specified in the query. Consider the following query.


SELECT Employees.Name, Employees.Age, Department.Name  FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id WHERE Department.Name="Eng"

Here a smaller data set can be derived from the Department table by filtering out the records having department names other than “Eng”. Now it may be possible to do a replicated map side join with this smaller data set.

Replicated Semi-Join

Reduce Side Join with Map Side Filtering

Even if the filtered data of the small table doesn’t fit into memory it may be possible to include just the Dept_Ids of the filtered records in the replicated data set. Then at the map side this cache can be used to filter out the records which would otherwise be sent over to the reduce side, thus reducing the amount of data moved between the mappers and reducers.

The map side logic would look as follows.


map (K table, V rec) {

   // Check if this record needs to be sent to reducer
   boolean sendToReducer = check_cache(rec.Dept_Id)
   if (sendToReducer) {
      dept_id = rec.Dept_Id

      tagged_rec.tag = table

      tagged_rec.rec = rec

      emit(dept_id, tagged_rec)
   }
}

The reducer side logic would be the same as in the Reduce Side Join case.

Using a Bloom Filter

A bloom filter is a construct which can be used to test whether a given element is contained in a set. A smaller representation of the filtered Dept_Ids can be derived by adding the Dept_Id values to a bloom filter. This bloom filter can then be replicated to each node. At the map side the bloom filter can be used to check whether the Dept_Id of each record is present in the filter, and only if so emit that particular record to the reduce side. Since a bloom filter is guaranteed not to produce false negatives no matching record is missed; false positives only cause a few extra records to be shuffled, which the reduce side join drops anyway, so the result is still accurate.
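A rough sketch of the map side using Hadoop’s built-in bloom filter classes is given below. The filter file name and the CSV layout are assumptions; the filter itself would be built beforehand from the filtered Dept_Ids and shipped to the nodes, e.g. via the distributed cache.

// Hypothetical map side filtering with a bloom filter; names are placeholders.
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class BloomFilteringMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final BloomFilter deptFilter = new BloomFilter();

    @Override
    protected void setup(Context context) throws IOException {
        // The filter was trained on the filtered Dept_Ids, serialized (BloomFilter is
        // a Writable) and replicated to every node.
        try (DataInputStream in = new DataInputStream(new FileInputStream("dept_filter.bloom"))) {
            deptFilter.readFields(in);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");   // assumed layout: Name,Age,Dept_Id
        // Emit only records whose Dept_Id might be in the filtered set. False positives
        // are possible but harmless: the reduce side join will drop them anyway.
        if (deptFilter.membershipTest(new Key(fields[2].getBytes()))) {
            context.write(new Text(fields[2]), new Text("E|" + value));
        }
    }
}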



Been there. Done that. And suffered for that…

Programming is fun. But there is some other associated stuff we programmers blissfully skip or procrastinate on because it is not so cool.

End result?…

Somebody is going to get hurt at the end of the day and that somebody may very well be ourselves. So here are some things I have experienced, some of which I myself have been guilty of doing, and the insights I have gotten from them.

Good ol’ docs

It’s a well documented fact that documentation is.. hmm well.. let me think.. good to have. Or is it important? Yep, I know the feeling :). But as things turn out, it is something that needs to be done at the end of the day. Who said we programmers do not have to toil for our food 🙂 right? From a user’s perspective a feature without proper documentation is close to a feature which is not there at all. Say you developed a really neat feature and obviously you want people to try it out, right? But what if they are not able to wrap their heads around how to use it, or they have to play a guessing game trying to get it to work and fail miserably in the process? Now not only have you wasted their time but you have also earned some bad karma. And yes, an intuitive user interface can go a long way to ease users’ pain, but good, to-the-point documentation sprinkled on top makes up a recipe that users can’t get enough of.

The Extra Mile

Say you developed this new cool feature. But in the hurry of pushing it out you cut some corners and left a manual step in the usage flow which would better have been done behind the curtains, unbeknownst to the user. Now the user has to do this manual step every time he uses your feature, which quickly becomes a pain, especially if it turns out to be a heavily used feature. Optimize the UX. Cut unnecessary stuff from the user flow. Go that extra mile and users will thank you for it.

Mind your cycle

Go easy on yourself. Make your development cycle quicker. Say you have some repetitive process to go through in order to make the code you wrote run in the test environment so you can check whether your feature/fix is working correctly. Invest some time in automating this process, maybe by writing a handy script, and it will help you to finish your work early and then go play :).

Let’s configure it

What if the user wants to fine tune the size of the foo queue holding tasks for the bar thread pool of your program? Oh ok, let’s make it configurable via the UI then, right? Or should we?? Too much configurability thrown in the user’s face kills the user experience. Do not force your users to fill in stuff which is better left with some sensible defaults every time they use your software. It may be that there is no need to configure every nook and corner of your program to make it work the way you want. Decide what should be in and what should be out. Better yet, a middle ground would be to provide those configurations in an optional advanced configuration section with sensible defaults, which the user can go and change if they see fit. And also remember to document them clearly so that the user knows better when configuring them.

Nasty API docs

Wrong API docs are worse than having no API docs at all. It really happened to me once with a JMS API not working as published in its API docs. And I thought my thread programming was causing it. It took a considerable amount of hair pulling to figure out that the fault was with the API. Since my assumptions about the API, derived from the docs, were wrong, so was my program. Be especially mindful when you are changing an existing API implementation whether the assumptions and the results returned under the conditions specified in the API docs still hold. If not, change the docs accordingly.

Carpenters wanted..

Manage your broken windows. You may have to cut some corners and pull out some hacks due to time or release pressures. It’s OK as long as you know what your broken windows are and you intend to repair them the first chance you get. Leave some reminders and attend to them when you get the chance.

Love thy code.

Show that you care so that others will care. If you maintain your code in good condition, the other people taking over or contributing to your code will tend to care about maintaining it the same way. This is especially important in open source settings where you will not be the only one mending a piece of code written by you at the end of the day.

So there goes my list of tidbits on programming for the better. Pretty much regulation and common sense stuff which does not warrant a mention, you might say. But as mentioned in the beginning, I have been there. Done that. And have paid for that :). And we keep doing it as well. So I hope this post will serve as a reminder, for me at least, when I am on the verge of doing some nasty thing next time around :). Anyway this is just my 2 cents. Please holler if you beg to differ.

Cassandra pagination has been the subject of several blogs elsewhere as well. In particular this excellent blog by Michael Kopp details how this can generally be handled using the Cassandra API. We also had our own use case where we needed paginated access. However for our use case the scheme outlined by Michael presents several shortcomings.

1. What if we want to fetch rows batch wise instead of columns?

2. If there are updates during the paged retrieval there is a chance that some items will be missed out. For example let’s say the last access was at the column with column key “Florence”. So the next retrieval would fetch a batch starting from “Florence” onwards. What if a column with key “Cologne” has been newly added? It would not get included in any of the future retrievals.

3. Also there may be a use case where it is required to paginate the results obtained by filtering with a range query rather than fetching all the rows page wise in the column family. (Actually this was our use case.)

 

So let’s have a look at how we took a stab at the beast, Cassandra pagination. Before that let me explain our use case fully so that it’s easier to grasp what we did and why we did it. Our main use case was to paginate the access to results returned from a range query, which can be cleanly expressed in SQL lingo as follows.

 


SELECT * FROM <column_family> WHERE <column_name_1> BETWEEN <from_1> AND <to_1> AND <column_name_2> BETWEEN <from_2> AND <to_2> .... AND <column_name_n> BETWEEN <from_n> AND <to_n>

 

Here each column_name is an index. (Actually a sub index of a composite index. For a description of our indexing scheme refer to my earlier blog, Cassandra: Lessons Learnt.) So our use case is a bit complicated in that it’s required to paginate the access of the result set obtained from a range query. Also our requirement was to fetch all the rows satisfying this criteria without missing any row, even when there are new additions while we are retrieving rows in batches. In fact there may be a considerable time lapse between two fetches, since in our use case the retrieved data are processed by a scheduled task with a configurable interval. Additionally we had to leave room for non batched access of the range query result as well. And of course we were not using the OrderedPartitioner. (The evils of OrderedPartitioner are well documented elsewhere: sub optimal load balancing, creating hot spots etc.) Had we used OrderedPartitioner our life would have been a bit easier since we would have been able to do a range query on the rows. But since we were using RandomPartitioner no ordering of rows by row keys can be assumed.

 

Ok, that’s enough about the predicament we were in a couple of months back when faced with the task of the ‘Cassandrafication’ of our data layer. Hope you got the idea.. Now let’s see what we did to improve the situation.

 

First we had to deal with our inability to do range queries on rows. Cassandra has this nice property that the columns of a particular row are always sorted by their column keys. So we utilized this nicety to impose an ordering on rows. We always maintain a meta row in which all the row keys are stored as columns. (Actually each row key is a column key and the column value is empty.) Let’s say this row is ‘RowIndex’. (See figure 1.) Now when doing a query on the column family we first query this row using a range query to get the rows matching the criteria and then do the real row fetching one by one using the row keys fetched. You might be wondering how the range query is constructed to match the where clauses in the SQL given above. In our scheme the row key is constituted by concatenating the value of each index. (An index is in fact a column in a particular row and we use the column value as the index value. This will become clearer by having a look at the first step of the illustration given in figure 2.) So this is the scheme we used for non batched retrieval of rows satisfying a particular query (a rough code sketch follows figure 1).

 

Figure 1 : Column family with meta row ‘RowIndex’
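As a rough illustration of the non batched path, assuming Hector as the client library (the column family name and serializers are placeholders), the range query on the ‘RowIndex’ meta row could look something like this:

// Hypothetical Hector sketch: fetch matching row keys from the 'RowIndex' meta row.
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

public class RowIndexQuery {

    private static final StringSerializer SS = StringSerializer.get();

    // Returns the columns of the 'RowIndex' meta row (i.e. the row keys) falling
    // between rangeFirst and rangeLast.
    public static ColumnSlice<String, String> matchingRowKeys(Keyspace keyspace,
            String columnFamily, String rangeFirst, String rangeLast) {
        SliceQuery<String, String, String> query =
                HFactory.createSliceQuery(keyspace, SS, SS, SS);
        query.setColumnFamily(columnFamily);
        query.setKey("RowIndex");
        query.setRange(rangeFirst, rangeLast, false, Integer.MAX_VALUE);
        QueryResult<ColumnSlice<String, String>> result = query.execute();
        return result.get();   // each column name here is a matching row key
    }
}

The real rows are then fetched one by one (or via a multiget) using the returned row keys.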

 

But for the paginated use case this proved to be insufficient due to the second shortcoming outlined earlier. We realized that there needs to be an ordering by timestamp to catch a newly added row even if its row key puts it, in sorted order, at a location before the last accessed row key. So we introduced another meta row storing the timestamp of insertion of each row. Let’s say the row key of this meta row is ‘TimeStampIndex’. Each column of this row holds the insertion timestamp as the column key and the corresponding row key of the row inserted at that particular timestamp as the column value. So now we need to do four things when we add a row to the column family (a rough sketch of these steps follows the list).

 

Figure 2 : Row insertion algorithm

1. Create the row key using the defined indexes. Here we use ‘server’ and ‘time’ as the indexes.

2. Insert row key in to the ‘RowIndex’ as a column.

3. Insert the row insertion timestamp along with the row key as a column into the ‘TimeStampIndex’.

4. Add the row itself to the column family.
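A rough Hector-based sketch of these four steps is given below; the column family name, the ‘---’ delimiter constant and the use of the current time in milliseconds as the timestamp column key are all illustrative assumptions, not the exact code we used.

// Hypothetical sketch of the four insertion steps using a Hector Mutator.
import java.util.Map;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class RowInserter {

    private static final StringSerializer SS = StringSerializer.get();
    private static final String CF = "Events";          // placeholder column family name

    public static void insertRow(Keyspace keyspace, String server, String time,
                                 Map<String, String> rowData) {
        // 1. Create the row key from the index parts ('server' and 'time').
        String rowKey = server + "---" + time;

        Mutator<String> mutator = HFactory.createMutator(keyspace, SS);

        // 2. Insert the row key into the 'RowIndex' meta row as a column (empty value).
        mutator.addInsertion("RowIndex", CF, HFactory.createStringColumn(rowKey, ""));

        // 3. Insert <insertion timestamp, row key> into the 'TimeStampIndex' meta row.
        String timestamp = String.valueOf(System.currentTimeMillis());
        mutator.addInsertion("TimeStampIndex", CF, HFactory.createStringColumn(timestamp, rowKey));

        // 4. Add the row itself to the column family.
        for (Map.Entry<String, String> entry : rowData.entrySet()) {
            mutator.addInsertion(rowKey, CF,
                    HFactory.createStringColumn(entry.getKey(), entry.getValue()));
        }

        mutator.execute();
    }
}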

 

‘RowIndex’ is to be used for non batched access of the range query result while ‘TimeStampIndex’ is to be used for batched access of the range query result.

 

Now when we want to fetch rows in batches satisfying the range query criteria, we first get a batch size chunk of timestamps from ‘TimeStampIndex’. Then for each row associated with those timestamps we check whether the row matches the filter criteria. This is a simple string comparison to check whether the row key falls between the range first and range last values.

 

Say for example the filter criteria for the above illustration is the following where clause.


WHERE 'server' BETWEEN 'esb' and 'esb' and 'hour' BETWEEN '08:00' and '09:00'

Now the range first value of the query would be ‘esb—08:00’ and the range last value would be ‘esb—09:00’. This will select events for server ‘esb’ during the hours from ’08:00′ to ’09:00′.  So if the row key is ‘esb—08:23’ it will get picked and if it is ‘esb—09:23’ it won’t.
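The filter check itself boils down to a lexicographic comparison of the row key against the range first and range last values, along these lines (a simple sketch):

// Sketch of the in-memory filter check applied to each row key fetched via 'TimeStampIndex'.
public class RangeCheck {
    public static boolean matchesRange(String rowKey, String rangeFirst, String rangeLast) {
        return rowKey.compareTo(rangeFirst) >= 0 && rowKey.compareTo(rangeLast) <= 0;
    }
}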

 

So as can be seen, for this scenario we didn’t use the ‘RowIndex’ meta row; it’s for non batched use only. And in this way, using ‘TimeStampIndex’, we can catch newly added rows without missing out on any row.

 

However it’s not without its own drawbacks.

1. The batch size is not consistent. Even though a batch size chunk of timestamps is picked by the query, some of these rows will be discarded since they do not match the filtering criteria. A solution would be to get multiple batches until the batch size number of rows fulfilling the filter criteria is found. But for now we are OK with inconsistent batch sizes.

2. What if an existing row is updated? It will get fetched a second time since the algorithm will not miss any newly added or updated row. This may or may not be desirable according to the use case. For us this is in fact the needed behavior since we need any new updates to an already fetched row. So we are ok with that too.

 

So that concludes our escapade with Cassandra pagination. The (love) story continues.. (Hope you saw the sarcasm sign unlike Sheldon.. :))

After working with Cassandra for a couple of months I thought of jotting down my Cassandra experience, including what we did, what I liked and what I didn’t like so much. But I need to put a little disclaimer up front. We are currently using Cassandra 0.7 (“What !#$*?? Have you been living under a rock? Get yourself a life and upgrade your darn installation” you would say. I know. I know.. :). We are working on it.). So some of the stuff mentioned here may not relate to later versions, although the concepts we used are still mostly applicable. (I think kudos should go to the Cassandra people for churning out several quick releases with some cool new features.)

That being said, let me go through what we have been doing with Cassandra. Our use case is roughly as follows. We are using Cassandra as the persistence layer of our Business Activity Monitoring solution at WSO2, which is being rewritten to scale better (about which I am hoping to blog, somewhere along the lines of “A scalable big data ready BAM solution – WSO2 BAM”, some time soon). The data are pumped from the servers being monitored as events consisting of key value pairs and get written to the Cassandra store after some minimal required processing at the receiver.

Figure 1 : Event format

These data are then indexed to facilitate data analysis. Here our use case differs from normal Cassandra use cases, where the queries are known in advance so that you start from the query and build your Cassandra data model. Since what we are building is a generic data analytics framework, we don’t know what queries users will want to perform on their data stored in Cassandra. But then we thought that if we could build a set of indexes up front, according to the queries to be done, it would solve our dilemma. So we decided to give users the ability to define what indexes are to be built using an XML configuration which is roughly as follows.

<index name="serverByTime">
   <part name="server"/>
   <part name="time"/>
</index>

Here we are building a composite index which can serve queries such as “Give me the events that came from server ‘esb12’ during the time period ‘08:00’ to ‘09:00’”. The indexing happens as described below (a small sketch of steps 2 to 4 follows the list).

Figure 2 : Indexing Model

1. Read an event from the primary column family. (Basically a row of the primary column family.)

2. If the event contains the server and time keys, get the corresponding values of these keys and concatenate them to create the row key of the index column family (in this case serverByTime).

3. Create the index column family if not present. Add a new row having the created row key if one does not already exist in the index column family and add a column containing the event id to it.

4. Add the newly created row key as a column in the index row of the index column family.
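A simplified sketch of steps 2 to 4 is given below; the delimiter constant, the ‘indexRow’ key and the addColumn placeholder stand in for the actual client calls and are assumptions of mine.

// Hypothetical sketch of building the composite row key and updating the index.
import java.util.Map;

public class IndexWriter {

    private static final String DELIMITER = "---";   // placeholder for the actual delimiter

    // event: the key/value pairs of one event row read from the primary column family
    public void index(String eventId, Map<String, String> event) {
        if (event.containsKey("server") && event.containsKey("time")) {
            // Step 2: concatenate the sub index values to form the index row key.
            String indexRowKey = event.get("server") + DELIMITER + event.get("time");

            // Step 3: add a column holding the event id to that row of 'serverByTime'.
            addColumn("serverByTime", indexRowKey, eventId, "");

            // Step 4: record the new row key as a column in the index row.
            addColumn("serverByTime", "indexRow", indexRowKey, "");
        }
    }

    // Placeholder for the actual Cassandra client insert (Hector/Thrift).
    private void addColumn(String columnFamily, String rowKey, String name, String value) {
        // client specific insert goes here
    }
}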

So as can be seen we are using a column family per index. Cassandra doesn’t sort rows on row keys if you are using RandomPartitioner, which is the recommended partitioner for better performance. But columns within a row are always sorted by column key. We use this property to store the index so that it can be queried using range queries. Each index column family has a special row called the “index row” which stores all the row keys of the column family. When it is required to query the index, a range query on this row is done first to fetch the required row keys. Let’s say that we wanted to do a query to fetch “all the events from servers esb12 to esb15 during the time period 08:00 to 09:00”. Let’s see how this is done using our indexing model.

1. Get the range values for required sub indexes. In this case “esb12” to “esb15” for server and “08:00” to “09:00” for time.

2. Create the range first and last values by concatenating values of sub indexes. The range first value would be “esb12—08:00” and the range last would be “esb15—09:00”. Here ‘—‘ is the constant delimiter we are using during the concatenation of row key parts.

3. Do the range query on index row and get the row keys in the range.

4. Fetch each row using its row key to get the result. Actually what we are getting here is a set of event ids. To get the actual event details another look-up in the primary column family using the event id is required.

Using this model we are able to do range queries on multiple indexes. One advantage of this over the native secondary index facility present from version 0.8 onward is that the first index query isn’t restricted to an equality operation (e.g. server equals ‘esb12’ etc.). But the order of the sub indexes making up the composite index (i.e. the row key part order, in this case server—time) determines the types of queries which can be made. For instance we cannot query using time alone. For that we would need to create another composite index starting with time as the first sub index.

We also had the requirement of fetching rows in batches from column families, with each row being fetched exactly once for processing. We achieved this using another special row holding all the row insertion times, so that rows are ordered according to the time of storage, and then fetching rows in batches by doing range queries on this timestamp row with a batch size limit.

All in all it’s a rather involved scheme to get what we wanted out of Cassandra. Not a big selling point I would say. Also concatenating values to create row keys seemed a little hackish to me. And we had to come up with some tricks to make batching work with range queries. Maybe it’s not that strange that Cassandra is also fondly known as “Hackendra” among some of us (it has got a kind of nice ring to it, I feel :)), since sometimes it’s hard not to have that pesky feeling “Am I doing a hack here???” at the back of your mind when working with Cassandra. But by no means does this mean Cassandra is evil. We like Cassandra for its fast writes and scalable architecture. But in real life when it comes to relationships not everything is perfect. That doesn’t mean we don’t care, though. Our relationship with Cassandra is no different. :)

Let’s see how we can model some regularly used SQL queries using map reduce.

  • select … from … where …
Take the following example
select f1, f2 from relation where f1 > 500

For this example let’s assume a suitable InputFormat (in the case of Hadoop) does the reading from the database and emits key value pairs in the form (k, rec), where k is the primary key and rec is the entire record. Pseudo code using map reduce is given below.

map (k, rec)  {
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2>
      collect (k , rec1)
   }
}

As can be seen this is implemented using only a map function. Output is emitted only if the predicate is satisfied.
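As a rough Java equivalent (the CSV parsing, field positions and class name are my own assumptions), the selection and projection can be done in a map-only Hadoop job:

// Hypothetical mapper for: select f1, f2 from relation where f1 > 500
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SelectProjectMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // With plain text input 'key' is just the byte offset; a database-backed
        // InputFormat would supply the primary key here instead.
        String[] fields = value.toString().split(",");            // assumed layout: f1,f2,f3,...
        long f1 = Long.parseLong(fields[0]);
        if (f1 > 500) {                                            // where f1 > 500
            context.write(key, new Text(fields[0] + "," + fields[1]));  // project f1, f2
        }
    }
}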

  • select aggregate_func() from … where … groupby …
Let’s take the following example.
select f3, sum(f1), avg(f2) from relation where f1 > 500 groupby f3
The pseudo-code below describes how this is achieved using map reduce.
map (k, rec)  {
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2, rec.f3>
      collect (rec.f3 , rec1)
   }
}
reduce(v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <rec1.f3, sum, avg>
   store (v, rec2)
}

Here each v that reduce gets corresponds to a unique value of the rec1.f3 field. The group by is implicitly done by the shuffling phase between the map and reduce functions.

  • select aggregate_func() from … where … groupby … having …

Here an additional having clause is used to filter the grouped results. Let’s take an extended version of the earlier example.

select f3, sum(f1), avg(f2) from relation where f1 > 500 groupby f3 having avg(f2) > 50

No change is required in the map function. The modified reduce function is given below.

reduce(v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <rec1.f3, sum, avg>
   if (avg > 50) {
      store (v, rec2)
   }
}
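A rough Java version of this reducer (assuming the mapper emits “f1,f2” values keyed by f3 as Text, which is my own framing of the pseudo code) could look as follows:

// Hypothetical reducer for the group-by/having example.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GroupByHavingReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text f3, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        double avg = 0;
        long count = 0;
        for (Text v : values) {
            String[] fields = v.toString().split(",");   // assumed layout: f1,f2
            sum += Long.parseLong(fields[0]);             // sum(f1)
            avg += Double.parseDouble(fields[1]);         // accumulate f2 for avg(f2)
            count++;
        }
        avg = avg / count;
        if (avg > 50) {                                   // having avg(f2) > 50
            context.write(f3, new Text(sum + "," + avg));
        }
    }
}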

During the past several days I decided to have a look at how Hector, the Cassandra access library, works under the hood. This was partially motivated by some of the confusing notions I had about its API classes, especially Cluster. My initial impression was that the Cluster API class represents a Cassandra cluster, but looking at some Hector documentation I found out that it has nothing to do with the actual Cassandra cluster and is just a connection to Cassandra, and that the cluster name parameter passed in the API does not have anything to do with the actual cluster name. The name is only for Hector to identify the connection.

I took a few notes while reading through the Hector sources. So here goes..

  • Internally, HConnectionManager is used to hold a connection pool in the form of a map.
    1. Connection Pool = Map<CassandraHost, ConcurrentHClientPool>
    2. ConcurrentHClientPool contains a queue of HThriftClient instances, each a wrapper for Cassandra.Client, the Cassandra Thrift client.
  • A CassandraHost is created for each node in the cluster, and the host configuration is populated using the passed CassandraHostConfigurator, which contains a comma separated list of hosts in the Cassandra cluster. (The CassandraHostConfigurator needs to be passed at Cluster creation time.)
  • API operations are distributed among Cluster, Keyspace and a set of query related classes (MultigetSliceQuery, RangeSliceQuery etc.), each of which uses HConnectionManager for the actual execution of the operations.
  • All API operations are executed using HConnectionManager with a suitable load balancing policy which decides the distribution of requests among the hosts.
  • Cluster contains operations related to creating and updating keyspaces and column families.
  • Keyspace has operations related to adding data to and deleting data from column families. Mutator batches these requests and presents them to the Keyspace as a batch for execution.
  • Query classes have operations related to fetching data from Cassandra keyspaces.
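To put these classes in context, here is a minimal usage sketch; the connection name, hosts, keyspace and column family names are placeholders, and the keyspace and column family are assumed to already exist.

// Minimal Hector usage sketch tying the classes above together.
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class HectorExample {
    public static void main(String[] args) {
        // "MyConnection" only identifies the connection inside Hector; it need not
        // match the real cluster name.
        CassandraHostConfigurator hostConfig =
                new CassandraHostConfigurator("host1:9160,host2:9160");
        Cluster cluster = HFactory.getOrCreateCluster("MyConnection", hostConfig);

        // Keyspace is the entry point for reads and writes.
        Keyspace keyspace = HFactory.createKeyspace("MyKeyspace", cluster);

        // Mutator batches insertions and hands them to the keyspace for execution.
        Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
        mutator.addInsertion("row1", "MyColumnFamily", HFactory.createStringColumn("name", "value"));
        mutator.execute();
    }
}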

My previous couple of posts covered basic usage and security aspects of Apache Thrift. You can also use Thrift’s Servlet based transport to expose a Thrift service via a Servlet. This can be useful if you are in an OSGi environment, since you can expose it to the outside world using the OSGi HttpService. But unfortunately it’s not possible to use this Servlet implementation directly in a web application (which would have been very useful) for reasons I will describe in the latter part of this post. For this implementation it is required to extend ‘TServlet‘ alongside your service implementation. I will be using the same Thrift service (arithmetic.thrift) and the respective implementation and generated code from my earlier blog posts for this example as well.

Thrift Servlet

public class ArithmeticServiceServlet extends TServlet {

    public ArithmeticServiceServlet(TProcessor processor, TProtocolFactory inProtocolFactory,
                           TProtocolFactory outProtocolFactory) {
        super(processor, inProtocolFactory, outProtocolFactory);
    }

    public ArithmeticServiceServlet(TProcessor processor, TProtocolFactory protocolFactory) {
        super(processor, protocolFactory);
    }

}

No implementation of doGet or doPost is necessary by default, since the mapping of your service implementation class to the respective doGet and doPost methods is done inside TServlet.

Registering the Servlet

This entails getting the OSGi HttpService and registering the Servlet with it. This code snippet assumes you have already obtained an HttpService reference using a preferred method (e.g. using declarative services).

    public void register() throws Exception{
        ArithmeticService.Processor processor = new ArithmeticService.Processor(
                new ArithmeticServiceImpl());
        TBinaryProtocol.Factory inProtFactory = new TBinaryProtocol.Factory(true, true);
        TBinaryProtocol.Factory outProtFactory = new TBinaryProtocol.Factory(true, true);

        httpServiceInstance.registerServlet("/arithmeticService", new ArithmeticServiceServlet(
                processor, inProtFactory, outProtFactory), new Hashtable()
                , httpServiceInstance.createDefaultHttpContext());

    }

The Servlet is registered under the “/arithmeticService” context.

Consuming the Service

Now let’s write a client to consume the service. Here the THttpClient class from Thrift is used.

public class ServletClient {

    public void invoke() throws Exception {
        TTransport client = new THttpClient("http://localhost/arithmeticService");
        TProtocol protocol = new TBinaryProtocol(client);
        ArithmeticService.Client serviceClient = new ArithmeticService.Client(protocol);
        client.open();

        long addResult = serviceClient.add(100, 200);
        System.out.println("Add result: " + addResult);
        long multiplyResult = serviceClient.multiply(20, 40);
        System.out.println("Multiply result: " + multiplyResult);

        client.close();

    }

    public static void main(String[] args) throws Exception {
        ServletClient client = new ServletClient();
        client.invoke();
    }

}

Problem with Web Apps

Now it would have been great if we could use this Servlet in one of our web applications. But as you can see from our ‘ArithmeticServiceServlet’ implementation, it hasn’t got a default no-argument constructor, which is a deal breaker for using this Servlet in a web application. The web container needs a no-argument constructor in order to initialize the Servlet. So for now it’s a no for web apps. :(