
I/O Demystified

With all the hype around highly scalable server design and the rage behind Node.js, I have been meaning to do some focused reading on I/O design patterns, but until now I couldn't find enough time to invest. Having finally done some research, I thought it best to jot down the stuff I came across as a future reference for me and for anyone who may come across this post. OK then.. Let's hop on the I/O bus and go for a ride.

Types of I/O

There are four different ways I/O can be done, according to the blocking or non-blocking nature of the operations and the synchronous or asynchronous nature of I/O readiness/completion event notifications.

Synchronous Blocking I/O

This is where the I/O operation blocks the application until its completion. It forms the basis of the typical thread-per-connection model found in most web servers.

When the blocking read() or write() is called there will be a context switch to the kernel, where the I/O operation happens and data is copied to a kernel buffer. Afterwards, the kernel buffer is transferred to the user space application level buffer and the application thread is marked as runnable, upon which the application unblocks and reads the data in the user space buffer.

The thread-per-connection model tries to limit the effect of this blocking by confining each connection to a thread, so that the handling of other concurrent connections will not be blocked by an I/O operation on one connection. This is fine as long as the connections are short lived and data link latencies are not that bad. However, in the case of long lived or high latency connections, the chances are that threads will be held up by these connections for a long time. If a fixed size thread pool is used, this causes starvation for new connections, since blocked threads cannot be reused to service new connections while in the blocked state. If instead each connection is serviced by a new thread, a large number of threads will be spawned within the system, which can become pretty resource intensive, with high context switching costs under a highly concurrent load.

  ServerSocket server = new ServerSocket(port);
  while (true) {
    Socket connection = server.accept();
    // Service the accepted connection in a dedicated thread
    // (ConnectionHandler is an application supplied Runnable)
    new Thread(new ConnectionHandler(connection)).start();
  }
 Simple thread per connection server

Synchronous Non Blocking I/O

In this mode the device or the connection is configured as non-blocking, so that read() and write() operations will not block. This usually means that if the operation cannot be immediately satisfied, it returns with an error code indicating that the operation would block (EWOULDBLOCK in POSIX) or that the device is temporarily unavailable (EAGAIN in POSIX). It is up to the application to poll until the device is ready and all the data are read. However this is not very efficient, since each of these calls causes a context switch to the kernel and back irrespective of whether any data was read or not.

Asynchronous Non Blocking I/O with Readiness Events

The problem with the earlier mode was that the application had to poll and busy wait to get the job done. Wouldn't it be better if the application were somehow notified when the device is ready to be read/written? That is exactly what this mode provides. Using a special system call (which varies according to the platform – select()/poll()/epoll() for Linux, kqueue() for BSD, /dev/poll for Solaris) the application registers interest in getting I/O readiness information for a certain I/O operation (read or write) from a certain device (a file descriptor in Linux parlance, since all sockets are abstracted using file descriptors). Afterwards this system call is invoked, which blocks until at least one of the registered file descriptors becomes ready. The file descriptors that are ready for I/O are then fetched as the return of the system call and can be serviced sequentially in a loop in the application thread.

The ready-connection processing logic is usually contained within a user provided event handler, which still has to issue non-blocking read()/write() calls to fetch data from the device to the kernel and ultimately to the user space buffer, incurring a context switch to the kernel. Moreover, there is usually no absolute guarantee that it will be possible to do the intended I/O with the device, since what the operating system provides is only an indication that the device might be ready for the I/O operation of interest; the non-blocking read() or write() can bail you out in such situations. However, this should be the exception rather than the norm.

So the overall idea is to get readiness events in an asynchronous fashion and register event handlers to be invoked once such event notifications are triggered. As you can see, all of this can be done in a single thread while multiplexing among different connections, primarily due to the nature of select() (here I chose a representative system call), which can return the readiness of multiple sockets at a time. This is part of the appeal of this mode of operation, where one thread can serve a large number of connections at a time. This mode is what is usually known as the "Non-Blocking I/O" model.

Java has abstracted out the differences between platform specific system call implementations with its NIO API. Socket/file descriptors are abstracted using Channels, and the Selector encapsulates the selection system call. Applications interested in getting readiness events register a Channel (usually a SocketChannel obtained by an accept() on a ServerSocketChannel) with the Selector and get a SelectionKey, which acts as a handle holding the Channel and registration information. Then the blocking select() call is made on the Selector, which returns a set of SelectionKeys which can then be processed one by one using the application specified event handlers.

Selector selector = Selector.open();

channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);

while (true) {

  int readyChannels = selector.select();

  if (readyChannels == 0) continue;

  Set<SelectionKey> selectedKeys = selector.selectedKeys();

  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

  while (keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if (key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel

    } else if (key.isConnectable()) {
        // a connection was established with a remote server

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove(); // remove the serviced key from the selected set
  }
}

Simple non blocking server

Asynchronous and Non Blocking I/O with Completion Events

Readiness events only go so far as to notify you that the device/socket is ready to do something. The application still has to do the dirty work of reading the data from the device/socket (more accurately, directing the operating system to do so via a system call) all the way to the user space buffer. Wouldn't it be nice to delegate this job to the operating system to run in the background and let it inform you once it has completed the job by transferring all the data from the device to the kernel buffer and finally to the application level buffer? That is the basic idea behind this mode, usually known as the "Asynchronous I/O" model. For this the operating system must support AIO operations. In Linux this support is present in the POSIX aio API from 2.6, and for Windows it is present in the form of "I/O Completion Ports".

With NIO2 Java has stepped up its support for this mode with its
AsynchronousChannel API.
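As a quick taste of the NIO.2 API, here is a minimal sketch (the temp file and its contents are arbitrary) that reads a file asynchronously via AsynchronousFileChannel; the read proceeds in the background and the returned Future completes when the data has already been placed in the application's buffer:

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AsyncFileRead {
  public static void main(String[] args) throws Exception {
    Path path = Files.createTempFile("aio-demo", ".txt");
    Files.write(path, "hello async world".getBytes("UTF-8"));

    AsynchronousFileChannel channel =
      AsynchronousFileChannel.open(path, StandardOpenOption.READ);

    ByteBuffer buffer = ByteBuffer.allocate(64);
    // The read runs in the background; the Future completes only
    // after the data has been copied into our buffer
    Future<Integer> result = channel.read(buffer, 0);

    int bytesRead = result.get(); // wait for completion
    buffer.flip();
    byte[] data = new byte[buffer.limit()];
    buffer.get(data);
    System.out.println(bytesRead + " bytes: " + new String(data, "UTF-8"));
    channel.close();
  }
}
```

The same API also accepts a CompletionHandler callback instead of returning a Future, which is the style used by the Proactor pattern echo server later in this post.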

Operating System Support

In order to support readiness and completion event notifications, different operating systems provide varying system calls. For readiness events, select() and poll() can be used in Linux based systems. However the newer epoll() variant is preferred due to its efficiency over select() and poll(). select() suffers from the fact that the selection time increases linearly with the number of descriptors monitored. It is also apparently notorious for overwriting the file descriptor set passed to it, so each time it is called the descriptor set has to be repopulated from a separate copy. Not an elegant solution at any rate.

The epoll() variant can be configured in two ways, namely edge-triggered and level-triggered. In the edge-triggered case it emits a notification only when a state change is detected on the associated descriptor. Say that during an edge-triggered notification your application handler only read half of the kernel input buffer. It won't get a notification on this descriptor next time around, even though there is still data to be read, unless the device becomes ready with more data, causing a new file descriptor event. The level-triggered configuration, on the other hand, triggers a notification each time there is data to be read.

The comparable system calls are present in the form of kqueue in BSD flavours
and /dev/poll or “Event Completion” in Solaris depending on the version. The
Windows equivalent is “I/O Completion Ports”.

The situation for the AIO mode, however, is a bit different, at least in the Linux case. The aio support for sockets in Linux seems shady at best, with some suggesting it actually uses readiness events at the kernel level while providing an asynchronous abstraction on completion events at the application level. Windows, however, seems to support this as a first-class citizen, again via "I/O Completion Ports".

I/O Design Patterns 101

There are patterns everywhere when it comes to software development. I/O is no different. There are a couple of I/O patterns associated with the NIO and AIO models, which are described below.

Reactor Pattern

There are several components participating in this pattern. I will go through
them first so it would be easy to understand the diagram.

Reactor Initiator: This is the component which initiates the non-blocking server by configuring and starting the dispatcher. First it binds the server socket and registers it with the demultiplexer for client connection accept readiness events. Then the event handler implementations for each type of readiness event (read/ write/ accept etc..) are registered with the dispatcher. Next the dispatcher event loop is invoked to handle events.

Dispatcher : Defines an interface for registering, removing, and dispatching Event Handlers responsible for reacting to connection events, which include connection acceptance, data input/output and timeout events on a set of connections. For servicing a client connection, the related event handler (e.g: accept event handler) registers the accepted client channel (a wrapper for the underlying client socket) with the demultiplexer, along with the types of readiness events to listen for on that particular channel. Afterwards the dispatcher thread invokes the blocking readiness selection operation on the demultiplexer for the set of registered channels. Once one or more registered channels are ready for I/O, the dispatcher services each returned "Handle" associated with a ready channel one by one using the registered event handlers. It is important that these event handlers don't hold up the dispatcher thread, since that would delay the dispatcher in servicing other ready connections. Since the usual logic within an event handler includes transferring data to/from the ready connection, which would block until all the data is transferred between the user space and kernel space buffers, these handlers are normally run in separate threads from a thread pool.

Handle : A handle is returned once a channel is registered with the demultiplexer; it encapsulates the connection channel and readiness information. A set of ready Handles is returned by the demultiplexer's readiness selection operation. The Java NIO equivalent is SelectionKey.

Demultiplexer : Waits for readiness events on one or more registered connection channels. The Java NIO equivalent is Selector.

Event Handler : Specifies the interface having hook methods for dispatching
connection events. These methods need to be implemented by application specific
event handler implementations.

Concrete Event Handler : Contains the logic to read/write data from underlying
connection and to do the required processing or initiate client connection
acceptance protocol from the passed Handle.

Event handlers are typically run in separate threads from a thread pool, as shown in the diagram below.

A simple echo server implementation of this pattern is as follows (without the event handler thread pool).

public class ReactorInitiator {

  private static final int NIO_SERVER_PORT = 9993;

  public void initiateReactiveServer(int port) throws Exception {

    ServerSocketChannel server = ServerSocketChannel.open();
    server.socket().bind(new InetSocketAddress(port));
    server.configureBlocking(false);

    Dispatcher dispatcher = new Dispatcher();
    dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server);

    dispatcher.registerEventHandler(
      SelectionKey.OP_ACCEPT, new AcceptEventHandler(
        dispatcher.getDemultiplexer()));
    dispatcher.registerEventHandler(
      SelectionKey.OP_READ, new ReadEventHandler(
        dispatcher.getDemultiplexer()));
    dispatcher.registerEventHandler(
      SelectionKey.OP_WRITE, new WriteEventHandler());

    dispatcher.run(); // Run the dispatcher loop
  }

  public static void main(String[] args) throws Exception {
    System.out.println("Starting NIO server at port : " +
      NIO_SERVER_PORT);
    new ReactorInitiator().
      initiateReactiveServer(NIO_SERVER_PORT);
  }
}


public class Dispatcher {

  private Map<Integer, EventHandler> registeredHandlers =
    new ConcurrentHashMap<Integer, EventHandler>();
  private Selector demultiplexer;

  public Dispatcher() throws Exception {
    demultiplexer = Selector.open();
  }

  public Selector getDemultiplexer() {
    return demultiplexer;
  }

  public void registerEventHandler(
    int eventType, EventHandler eventHandler) {
    registeredHandlers.put(eventType, eventHandler);
  }

  // Used to register ServerSocketChannel with the
  // selector to accept incoming client connections
  public void registerChannel(
    int eventType, SelectableChannel channel) throws Exception {
    channel.register(demultiplexer, eventType);
  }

  public void run() {
    try {
      while (true) { // Loop indefinitely
        demultiplexer.select();

        Set<SelectionKey> readyHandles =
          demultiplexer.selectedKeys();
        Iterator<SelectionKey> handleIterator =
          readyHandles.iterator();

        while (handleIterator.hasNext()) {
          SelectionKey handle = handleIterator.next();

          if (handle.isAcceptable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_ACCEPT);
            handler.handleEvent(handle);
           // Note : Here we don't remove this handle from
           // selector since we want to keep listening to
           // new client connections
          }

          if (handle.isReadable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_READ);
            handler.handleEvent(handle);
          }

          if (handle.isWritable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_WRITE);
            handler.handleEvent(handle);
          }

          handleIterator.remove(); // clear from the selected set
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}


public interface EventHandler {

   public void handleEvent(SelectionKey handle) throws Exception;
}

public class AcceptEventHandler implements EventHandler {
  private Selector demultiplexer;

  public AcceptEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }

  public void handleEvent(SelectionKey handle) throws Exception {
    ServerSocketChannel serverSocketChannel =
     (ServerSocketChannel) handle.channel();
    SocketChannel socketChannel = serverSocketChannel.accept();
    if (socketChannel != null) {
      socketChannel.configureBlocking(false);
      socketChannel.register(
        demultiplexer, SelectionKey.OP_READ);
    }
  }
}


public class ReadEventHandler implements EventHandler {

  private Selector demultiplexer;
  private ByteBuffer inputBuffer = ByteBuffer.allocate(2048);

  public ReadEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }

  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
     (SocketChannel) handle.channel();

    inputBuffer.clear();
    socketChannel.read(inputBuffer); // Read data from client

    inputBuffer.flip();
    // Rewind the buffer to start reading from the beginning

    byte[] buffer = new byte[inputBuffer.limit()];
    inputBuffer.get(buffer);

    System.out.println("Received message from client : " +
      new String(buffer));
    inputBuffer.flip();
    // Rewind the buffer to start reading from the beginning

    // Register the interest for writable readiness event for
    // this channel in order to echo back the message
    socketChannel.register(
      demultiplexer, SelectionKey.OP_WRITE, inputBuffer);
  }
}


public class WriteEventHandler implements EventHandler {

  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
      (SocketChannel) handle.channel();
    ByteBuffer inputBuffer = (ByteBuffer) handle.attachment();
    socketChannel.write(inputBuffer); // Echo data back to client
    socketChannel.close(); // Close connection
  }
}


Proactor Pattern

This pattern is based on the asynchronous I/O model. The main components are as follows.

Proactive Initiator : This is the entity which initiates Asynchronous Operation
accepting client connections. This is usually the server application’s main
thread. Registers a Completion Handler along with a Completion Dispatcher to
handle connection acceptance asynchronous event notification.

Asynchronous Operation Processor : This is responsible for carrying out I/O operations asynchronously and providing completion event notifications to the application level Completion Handler. This is usually the asynchronous I/O interface exposed by the operating system.

Asynchronous Operation : Asynchronous Operations are run to completion
by the Asynchronous Operation Processor in separate kernel threads.

Completion Dispatcher : This is responsible for calling back to the application
Completion Handlers when Asynchronous Operations complete. When the Asynchronous
Operation Processor completes an asynchronously initiated operation, the
Completion Dispatcher performs an application callback on its behalf. Usually
delegates the event notification handling to the suitable Completion Handler
according to the type of the event.

Completion Handler : This is the interface implemented by the application to process completion events of asynchronous operations.

Let's look at how this pattern can be implemented (as a simple echo server) using the Java NIO.2 API added in Java 7.

public class ProactorInitiator {
  static int ASYNC_SERVER_PORT = 4333;

  public void initiateProactiveServer(int port)
    throws IOException {

    final AsynchronousServerSocketChannel listener =
      AsynchronousServerSocketChannel.open().bind(
        new InetSocketAddress(port));
     AcceptCompletionHandler acceptCompletionHandler =
       new AcceptCompletionHandler(listener);

     SessionState state = new SessionState();
     listener.accept(state, acceptCompletionHandler);
  }

  public static void main(String[] args) {
    try {
       System.out.println("Async server listening on port : " +
         ASYNC_SERVER_PORT);
       new ProactorInitiator().initiateProactiveServer(
         ASYNC_SERVER_PORT);
    } catch (IOException e) {
      e.printStackTrace();
    }

    // Sleep indefinitely since otherwise the JVM would terminate
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

public class AcceptCompletionHandler implements
    CompletionHandler<AsynchronousSocketChannel, SessionState> {

  private AsynchronousServerSocketChannel listener;

  public AcceptCompletionHandler(
    AsynchronousServerSocketChannel listener) {
    this.listener = listener;
  }

  public void completed(AsynchronousSocketChannel socketChannel,
    SessionState sessionState) {
   // accept the next connection
   SessionState newSessionState = new SessionState();
   listener.accept(newSessionState, this);

   // handle this connection
   ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
   ReadCompletionHandler readCompletionHandler =
     new ReadCompletionHandler(socketChannel, inputBuffer);
   socketChannel.read(
     inputBuffer, sessionState, readCompletionHandler);
  }

  public void failed(Throwable exc, SessionState sessionState) {
   // Handle connection failure...
  }
}


public class ReadCompletionHandler implements
  CompletionHandler<Integer, SessionState> {

   private AsynchronousSocketChannel socketChannel;
   private ByteBuffer inputBuffer;

   public ReadCompletionHandler(
     AsynchronousSocketChannel socketChannel,
     ByteBuffer inputBuffer) {
     this.socketChannel = socketChannel;
     this.inputBuffer = inputBuffer;
   }

   public void completed(
     Integer bytesRead, SessionState sessionState) {

     byte[] buffer = new byte[bytesRead];
     inputBuffer.rewind();
     // Rewind the input buffer to read from the beginning
     inputBuffer.get(buffer);

     String message = new String(buffer);

     System.out.println("Received message from client : " +
       message);

     // Echo the message back to client
     WriteCompletionHandler writeCompletionHandler =
       new WriteCompletionHandler(socketChannel);

     ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);

     socketChannel.write(
       outputBuffer, sessionState, writeCompletionHandler);
   }

  public void failed(Throwable exc, SessionState attachment) {
    // Handle read failure.....
  }
}


public class WriteCompletionHandler implements
  CompletionHandler<Integer, SessionState> {

  private AsynchronousSocketChannel socketChannel;

  public WriteCompletionHandler(
    AsynchronousSocketChannel socketChannel) {
    this.socketChannel = socketChannel;
  }

  public void completed(
    Integer bytesWritten, SessionState attachment) {
    try {
      socketChannel.close(); // Close connection
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public void failed(Throwable exc, SessionState attachment) {
   // Handle write failure.....
  }
}


public class SessionState {

  private Map<String, String> sessionProps =
    new ConcurrentHashMap<String, String>();

   public String getProperty(String key) {
     return sessionProps.get(key);
   }

   public void setProperty(String key, String value) {
     sessionProps.put(key, value);
   }
}


Each type of event completion (accept/ read/ write) is handled by a separate completion handler implementing the CompletionHandler interface (AcceptCompletionHandler, ReadCompletionHandler etc.). The state transitions are managed inside these handlers. The additional SessionState argument can be used to hold client session specific state across a series of completion events.
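To exercise either of the echo servers above you need a client that writes a message and reads the echo back. The following is a minimal blocking client sketch; to keep the example self-contained it talks to a trivial in-process echo thread, which is a hypothetical stand-in for the real servers:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class EchoClientDemo {
  public static void main(String[] args) throws Exception {
    // In-process stand-in for either echo server above
    ServerSocket server = new ServerSocket(0); // ephemeral port
    new Thread(() -> {
      try (Socket s = server.accept()) {
        byte[] buf = new byte[64];
        int n = s.getInputStream().read(buf);
        s.getOutputStream().write(buf, 0, n); // echo back
      } catch (IOException ignored) { }
    }).start();

    // The actual client logic: connect, send, read the echo
    try (Socket client = new Socket("localhost", server.getLocalPort())) {
      client.getOutputStream().write("ping".getBytes("UTF-8"));
      byte[] buf = new byte[64];
      int n = client.getInputStream().read(buf);
      System.out.println("echoed: " + new String(buf, 0, n, "UTF-8"));
    }
    server.close();
  }
}
```

To run it against the reactor or proactor server instead, replace the ephemeral port with 9993 or 4333 respectively and drop the in-process server thread.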

NIO Frameworks (HTTPCore)

If you are thinking of implementing a NIO based HTTP server you are in luck.
Apache HTTPCore library provides excellent support for handling HTTP traffic
with NIO. API provides higher level abstractions on top of NIO layer with HTTP
requests handling built in. A minimal non blocking HTTP server implementation
which returns a dummy output for any GET request is given below.

public class NHttpServer {

  public void start() throws IOReactorException {
    HttpParams params = new BasicHttpParams();
    // Connection parameters
    params
      .setIntParameter(
        HttpConnectionParams.SO_TIMEOUT, 60000)
      .setIntParameter(
        HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
      .setBooleanParameter(
        HttpConnectionParams.STALE_CONNECTION_CHECK, true)
      .setBooleanParameter(
        HttpConnectionParams.TCP_NODELAY, true);

    final DefaultListeningIOReactor ioReactor =
      new DefaultListeningIOReactor(2, params);
    // Spawns an IOReactor having two reactor threads
    // running selectors. Number of threads here is
    // usually matched to the number of processor cores
    // in the system

    // Application specific readiness event handler
    ServerHandler handler = new ServerHandler();

    final IOEventDispatch ioEventDispatch =
      new DefaultServerIOEventDispatch(handler, params);
    // Default IO event dispatcher encapsulating the
    // event handler

    ListenerEndpoint endpoint = ioReactor.listen(
      new InetSocketAddress(4444));

    // start the IO reactor in a new separate thread
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          System.out.println("Listening in port 4444");
          ioReactor.execute(ioEventDispatch);
        } catch (InterruptedIOException ex) {
          System.err.println("Interrupted");
        } catch (IOException e) {
          System.err.println("I/O error: " + e.getMessage());
        }
      }
    });
    t.start();

    // Wait for the endpoint to become ready,
    // i.e. for the listener to start accepting requests.
    try {
      endpoint.waitFor();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args)
    throws IOReactorException {
    new NHttpServer().start();
  }
}


public class ServerHandler implements NHttpServiceHandler {

 private static final int BUFFER_SIZE = 2048;

 // Context attribute key under which the response content
 // buffer is kept for the connection
 private static final String RESPONSE_SOURCE_BUFFER =
   "response-source-buffer";

 // the factory to create HTTP responses
 private final HttpResponseFactory responseFactory;

 // the HTTP response processor
 private final HttpProcessor httpProcessor;

 // the strategy to re-use connections
 private final ConnectionReuseStrategy connStrategy;

 // the buffer allocator
 private final ByteBufferAllocator allocator;

 public ServerHandler() {
   this.responseFactory = new DefaultHttpResponseFactory();
   this.httpProcessor = new BasicHttpProcessor();
   this.connStrategy = new DefaultConnectionReuseStrategy();
   this.allocator = new HeapByteBufferAllocator();
 }

 public void connected(
   NHttpServerConnection nHttpServerConnection) {
   System.out.println("New incoming connection");
 }

 public void requestReceived(
   NHttpServerConnection nHttpServerConnection) {

   HttpRequest request =
     nHttpServerConnection.getHttpRequest();
   if (request instanceof HttpEntityEnclosingRequest) {
     // Handle POST and PUT requests
   } else {

     ContentOutputBuffer outputBuffer =
       new SharedOutputBuffer(
         BUFFER_SIZE, nHttpServerConnection, allocator);

     HttpContext context =
       nHttpServerConnection.getContext();
     context.setAttribute(
       RESPONSE_SOURCE_BUFFER, outputBuffer);
     OutputStream os =
       new ContentOutputStream(outputBuffer);

     // create the default response to this request
     ProtocolVersion httpVersion =
       request.getRequestLine().getProtocolVersion();
     HttpResponse response =
       responseFactory.newHttpResponse(
         httpVersion, HttpStatus.SC_OK,
         nHttpServerConnection.getContext());

     // create a basic HttpEntity using the source
     // channel of the response pipe
     BasicHttpEntity entity = new BasicHttpEntity();
     if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
       entity.setChunked(true);
     }
     response.setEntity(entity);

     String method = request.getRequestLine().
       getMethod().toUpperCase();

     if (method.equals("GET")) {
       try {
         nHttpServerConnection.suspendInput();
         nHttpServerConnection.submitResponse(response);
         os.write(new String("Hello client..").
           getBytes("UTF-8"));
         os.flush();
         os.close();
       } catch (Exception e) {
         e.printStackTrace();
       }
     } // Handle other http methods
   }
 }

 public void inputReady(
    NHttpServerConnection nHttpServerConnection,
    ContentDecoder contentDecoder) {
    // Handle request enclosed entities here by reading
    // them from the channel
 }

 public void responseReady(
    NHttpServerConnection nHttpServerConnection) {
    // Ready to accept a new response; nothing to do here
    // since the response is submitted from requestReceived
 }

 public void outputReady(
   NHttpServerConnection nHttpServerConnection,
   ContentEncoder encoder) {
   HttpContext context = nHttpServerConnection.getContext();
   ContentOutputBuffer outBuf =
    (ContentOutputBuffer) context.getAttribute(
      RESPONSE_SOURCE_BUFFER);

   try {
     outBuf.produceContent(encoder);
   } catch (IOException e) {
     e.printStackTrace();
   }
 }

 public void exception(
   NHttpServerConnection nHttpServerConnection,
   IOException e) {
   e.printStackTrace();
 }

 public void exception(
   NHttpServerConnection nHttpServerConnection,
   HttpException e) {
   e.printStackTrace();
 }

 public void timeout(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }

 public void closed(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
}


The IOReactor class basically wraps the demultiplexer functionality, with the ServerHandler implementation handling readiness events.

Apache Synapse (an open source ESB) contains a good implementation of an NIO based HTTP server, in which NIO is used to scale to a large number of clients per instance with rather constant memory usage over time. The implementation also has good debugging and server statistics collection mechanisms built in, along with Axis2 transport framework integration. It can be found at [1].


There are several options when it comes to doing I/O, which can affect the scalability and performance of servers. Each of the above I/O mechanisms has pros and cons, so the decision should be made based on the expected scalability and performance characteristics and the ease of maintenance of these approaches. This concludes my somewhat long winded article on I/O. Feel free to provide any suggestions, corrections or comments that you may have. Complete source code for the servers outlined in the post, along with clients, can be downloaded from here.



There were many references I went through in the process. Below are some of the interesting ones.

[1] http://www.ibm.com/developerworks/java/library/j-nio2-1/index.html

[2] http://www.ibm.com/developerworks/linux/library/l-async/

[3] http://lse.sourceforge.net/io/aionotes.txt

[4] http://wknight8111.blogspot.com/search/label/AIO

[5] http://nick-black.com/dankwiki/index.php/Fast_UNIX_Servers

[6] http://today.java.net/pub/a/today/2007/02/13/architecture-of-highly-scalable-nio-server.html

[7] Java NIO by Ron Hitchens

[8] http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

[9] http://www.cs.wustl.edu/~schmidt/PDF/proactor.pdf

[10] http://www.kegel.com/c10k.html


JMX : Some Introductory Notes


JMX (Java Management Extensions) is a J2SE technology which enables the management and monitoring of Java applications. The basic idea is to implement a set of management objects and register the implementations with a platform server, from where these implementations can be invoked, either locally or remotely to the JVM, using a set of connectors or adapters.

A management/instrumentation object is called an MBean (which stands for Managed Bean). Once instantiated, an MBean is registered with a unique ObjectName with the platform MBeanServer. The MBeanServer acts as a repository of MBeans, enabling the creation, registering, accessing and removing of MBeans. However the MBeanServer does not persist MBean information, so with a restart of the JVM you would lose all the MBeans in it. The MBeanServer is normally accessed through its MBeanServerConnection API, which works both locally and remotely. The management interface of an MBean would typically consist of [1]

  • Named and typed attributes that can be read/ written
  • Named and typed operations that can be invoked
  • Typed notifications that can be emitted by the MBean

For example, say it is required to manage the thread pool parameters of one of your applications at runtime. With JMX it's a matter of writing an MBean with the logic related to setting and getting these parameters and registering it with the MBeanServer.
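As a sketch of how little code this takes, the following registers a hypothetical PoolConfig standard MBean (for a standard MBean the interface name must be the implementation class name suffixed with "MBean") and then reads and updates its attribute through the platform MBeanServer:

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxDemo {

  // Management interface: one readable/writable attribute
  public interface PoolConfigMBean {
    int getPoolSize();
    void setPoolSize(int size);
  }

  // Hypothetical thread pool configuration MBean
  public static class PoolConfig implements PoolConfigMBean {
    private volatile int poolSize = 10;
    public int getPoolSize() { return poolSize; }
    public void setPoolSize(int size) { this.poolSize = size; }
  }

  public static void main(String[] args) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    ObjectName name = new ObjectName("demo:type=PoolConfig");
    server.registerMBean(new PoolConfig(), name);

    // A local client reads and updates the attribute via the server
    server.setAttribute(name, new Attribute("PoolSize", 20));
    System.out.println("PoolSize = " +
      server.getAttribute(name, "PoolSize"));
  }
}
```

A real thread pool MBean would forward the setter to something like ThreadPoolExecutor.setCorePoolSize(), but the registration and invocation mechanics are exactly these.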

Now the next step is to expose these MBeans to the outside world so that remote clients can invoke them to manage your application. This can be done via various protocols, implemented through protocol connectors and protocol adapters. A protocol connector basically exposes MBeans as they are, so that the remote client sees the same interface (the JMX RMI connector is a good example). So basically the client or the remote management application should be JMX enabled.

A protocol adapter (e.g: HTML, SNMP) adapts the results according to the protocol the client is expecting (e.g: for a browser-based client it sends the results in HTML over HTTP).

Now that the MBeans are properly exposed to the outside world, we need some clients to access them to manage our applications. There are basically two categories of clients, according to whether they use connectors or adapters.
JMX clients use JMX APIs to connect to the MBeanServer and invoke MBeans. Generally JMX clients use an MBeanServerConnection to connect to the MBeanServer and invoke MBeans through it by providing the MBean ID (ObjectName) and the required parameters. There are basically three types of JMX clients.

Local JMX Client : A client that runs in the same JVM as the MBeanServer.
These clients can also use MBeanServer API itself since they are running inside
the same JVM.

Agent : The agent is a local JMX Client which manages the MBeanServer itself.
Remember that MBeanServer does not persist MBean information. So we can use
an Agent to provide this logic which would encapsulate the MBeanServer with
the additional functionality. So the Agent is responsible for initializing
and managing the MBeanServer itself.

Remote JMX Client : A remote client is only different from a local client in that it needs to instantiate a Connector for connecting to a Connector server in order to get an MBeanServerConnection. And of course it would be running in a remote JVM, as the name suggests.
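A remote client interaction can be sketched end to end in a single JVM by starting an RMI connector server in-process (standing in for the managed remote JVM) and connecting to it with JMXConnectorFactory; the service URL and the use of the platform MBeanServer here are illustrative assumptions:

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteJmxDemo {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

    // In-process RMI connector server, standing in for the
    // connector server of a remote managed JVM
    JMXConnectorServer connectorServer =
      JMXConnectorServerFactory.newJMXConnectorServer(
        new JMXServiceURL("service:jmx:rmi://"), null, mbs);
    connectorServer.start();

    // Remote JMX client: connect through the connector and
    // obtain an MBeanServerConnection
    JMXConnector connector =
      JMXConnectorFactory.connect(connectorServer.getAddress());
    MBeanServerConnection connection =
      connector.getMBeanServerConnection();

    System.out.println("MBean count: " + connection.getMBeanCount());

    connector.close();
    connectorServer.stop();
  }
}
```

The client half is the same whether the connector server lives in the same process or in a genuinely remote JVM; only the JMXServiceURL changes.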

The next type of client is the management clients, which use protocol adapters to connect to the MBeanServer. For these to work the respective adapter should be present and running in the JVM being managed. For example, the HTML adapter should be present in the JVM for a browser-based client to connect to it and invoke MBeans. The diagram below summarizes the concepts described so far.



This concludes my quick notes on JMX. An extremely good read on the main JMX
concepts can be found at [2]. The JMX learning trail at Oracle is also a good
starting point for getting up to speed with JMX.
[1] http://docs.oracle.com/javase/6/docs/technotes/guides/jmx/overview/instrumentation.html#wp998816
[2] http://pub.admc.com/howtos/jmx/architecture-chapt.html

Read Full Post »

Recently I wanted to set up a remote desktop sharing session from my home PC to my laptop. While going through the setup guide I came across SSH tunneling. Even though there are many articles on the subject, it still took me a considerable amount of googling, some experimenting and a couple of Wireshark sessions to grasp what’s going on under the hood. Most of the guides were incomplete in terms of explaining the concept, which left me wanting a good article on the subject with some explanatory illustrations. So I decided to write one myself. Here goes…


An SSH tunnel consists of an encrypted tunnel created through an SSH protocol
connection. An SSH tunnel can be used to transfer unencrypted traffic over a
network through an encrypted channel. For example, we can use an SSH tunnel to
securely transfer files between an FTP server and a client even though the FTP
protocol itself is not encrypted. SSH tunnels also provide a means to bypass firewalls that prohibit or filter certain internet services. For example, an organization may block certain sites using its proxy filter, but users may not wish to have their web traffic
monitored or blocked by it. If users can connect to
an external SSH server, they can create an SSH tunnel to forward a given port on
their local machine to port 80 on a remote web server via the external SSH
server. I will describe this scenario in detail in a little while.

To set up an SSH tunnel, a given port on one machine needs to be forwarded (which
I will talk about in a little while) to a port on the other
machine, which will be the other end of the tunnel. Once the SSH tunnel has been
established, the user can connect to the earlier specified port on the first machine to
access the network service.

Port Forwarding

SSH tunnels can be created using several kinds of port forwarding
mechanisms. Ports can be forwarded in three ways.

  1. Local port forwarding
  2. Remote port forwarding
  3. Dynamic port forwarding

I haven’t yet explained what port forwarding is. I found Wikipedia’s definition the most explanatory.

Port forwarding or port mapping is a name given to the combined technique of

  1. translating the address and/or port number of a packet to a new destination
  2. possibly accepting such packet(s) in a packet filter (firewall)
  3. forwarding the packet according to the routing table.

Here the first technique is what is used in creating an SSH tunnel. When a client application connects to the local port (local endpoint) of the SSH tunnel and transfers data, the data is forwarded to the remote end by translating the host and port values to those of the remote end of the channel.
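The translation step can be pictured as a lookup table from a local listening endpoint to the destination it stands in for (a minimal Python sketch; the table entry is a hypothetical example, not part of any real SSH implementation):

```python
# Minimal sketch of the address/port translation behind a forwarded port.
# The forwarding table maps a local listening endpoint to the remote
# destination it stands in for (the entry here is a hypothetical example).
forwards = {
    ("localhost", 9001): ("yahoo.com", 80),  # e.g. set up by a local port forward
}

def translate(host, port):
    """Rewrite a connection's destination according to the forwarding table."""
    return forwards.get((host, port), (host, port))

print(translate("localhost", 9001))  # resolves to the tunnel's remote end
```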

So with that, let’s see how SSH tunnels can be created using forwarded ports, with some examples.

Tunnelling with Local port forwarding

Let’s say that yahoo.com is being blocked using a proxy filter at the University
(just for the sake of this example :). I cannot think of any valid reason why yahoo would be blocked). An SSH tunnel can be used to bypass this restriction. Let’s name my machine at the university ‘work’ and my home machine ‘home’. ‘home’ needs to have a public IP for this to work, and I am running an SSH server on my home machine. The following diagram illustrates the scenario.

To create the SSH tunnel, execute the following from the ‘work’ machine.

ssh -L 9001:yahoo.com:80 home

The ‘-L’ switch indicates that a local port forward needs to be created. The switch syntax is as follows.

-L <local-port-to-listen>:<remote-host>:<remote-port>

Now the SSH client at ‘work’ will connect to the SSH server running at ‘home’ (usually listening on port 22), binding port 9001 of ‘work’ to listen for local requests, thus creating an SSH tunnel between ‘home’ and ‘work’. At the ‘home’ end it will create a connection to ‘yahoo.com’ on port 80. So ‘work’ doesn’t need to know how to connect to yahoo.com; only ‘home’ needs to worry about that. The channel between ‘work’ and ‘home’ will be encrypted while the connection between ‘home’ and ‘yahoo.com’ will be unencrypted.

Now it is possible to browse yahoo.com by visiting http://localhost:9001 in the web browser at the ‘work’ computer. The ‘home’ computer will act as a gateway which accepts requests from the ‘work’ machine, fetches the data and tunnels it back. So the syntax of the full command is as follows.

ssh -L <local-port-to-listen>:<remote-host>:<remote-port> <gateway>

The image below describes the scenario.

Here the ‘home’ to ‘yahoo.com’ connection is only made when the browser makes the
request, not at tunnel setup time.

It is also possible to specify a port on the ‘home’ computer itself instead of
connecting to an external host. This is useful if I were to set up a VNC session
between ‘work’ and ‘home’. Then the command line would be as follows.

ssh -L 5900:localhost:5900 home (Executed from 'work')

So what does localhost refer to here? Is it ‘work’, since the command is executed from ‘work’? It turns out it is not. As explained earlier, the remote host is relative to the gateway (‘home’ in this case), not the machine from which the tunnel is initiated. So this will make a connection to port 5900 of the ‘home’ computer, where the VNC server would be listening.

The created tunnel can be used to transfer all kinds of data, not limited to web browsing sessions. We can tunnel SSH sessions through it as well. Let’s assume there is another computer (‘banned’) to which we need to SSH from within the University, but SSH access to it is blocked. It is possible to tunnel an SSH session to this host using a local port forward. The setup would look like this.

As can be seen, the transferred data between ‘work’ and ‘banned’ is now encrypted end to end. For this we need to create a local port forward as follows.

ssh -L 9001:banned:22 home

Now we need to create an SSH session to local port 9001, from where the session
will get tunneled to ‘banned’ via the ‘home’ computer.

ssh -p 9001 localhost

With that, let’s move on to the next type of SSH tunnelling method: reverse tunnelling.

Reverse Tunnelling with remote port forwarding

Let’s say we need to connect to an internal university website from home.
The university firewall is blocking all incoming traffic, so how can we connect from ‘home’ to the internal network to browse the internal site? A VPN setup would be a good candidate here, but for this example let’s assume we don’t have that facility. Enter SSH reverse tunnelling..

As in the earlier case, we will initiate the tunnel from the ‘work’ computer behind the firewall. This is possible since only incoming traffic is blocked while outgoing traffic is allowed. However, this time the client will be at the ‘home’ computer. Instead of the -L option we now use -R, which specifies
that a reverse tunnel needs to be created.

ssh -R 9001:intra-site.com:80 home (Executed from 'work')

Once executed, the SSH client at ‘work’ will connect to the SSH server running at ‘home’, creating an SSH channel. The server will then bind port 9001 on the ‘home’ machine to listen for incoming requests, which will subsequently be routed through the created SSH channel between ‘home’ and ‘work’. Now it’s possible to browse the internal site
by visiting http://localhost:9001 in the ‘home’ web browser. ‘work’ will then create a connection to the intra-site and relay the response back to ‘home’ via the created SSH channel.

As nice as all of this is, in both cases you still need to create a separate tunnel for each site you want to reach. Wouldn’t it be nice if it were possible to proxy traffic to any site using the single SSH channel created? That’s what dynamic port forwarding is all about.

Dynamic Port Forwarding

Dynamic port forwarding allows a single local port to be configured for tunnelling data to all remote destinations. However, to utilize this the client application connecting to the local port should send its traffic using the SOCKS protocol. At the client side of the tunnel a SOCKS proxy is created, and the application (e.g. a browser) uses the SOCKS protocol to specify where the traffic should be sent when it leaves the other end of the SSH tunnel.

ssh -D 9001 home (Executed from 'work')

Here SSH will create a SOCKS proxy listening for connections on local port
9001, and upon receiving a request will route the traffic via the SSH channel
created between ‘work’ and ‘home’. For this the browser needs to be configured
to point to the SOCKS proxy at port 9001 on localhost.
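For the curious, here is a minimal Python sketch of the SOCKS5 CONNECT request (per RFC 1928) that a client such as a browser sends to the proxy created by `ssh -D`, naming the final destination of its traffic. Only the domain-name address type is shown, and the target host/port are illustrative:

```python
import struct

def socks5_connect_request(host: str, port: int) -> bytes:
    """Build a SOCKS5 CONNECT request for a domain-name target (RFC 1928)."""
    name = host.encode("ascii")
    # VER=5, CMD=1 (CONNECT), RSV=0, ATYP=3 (domain name),
    # then the length-prefixed name and the port in network byte order.
    return (struct.pack("!BBBB", 5, 1, 0, 3)
            + bytes([len(name)]) + name
            + struct.pack("!H", port))

req = socks5_connect_request("yahoo.com", 80)
```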

Read Full Post »

Nowadays we are constantly reminded of the virtues of being proactive, or more
colloquially put, “being one step ahead of the game”, when it comes to handling
our businesses, whether an SME or a multi-national corporation. Quickly
detecting, or in some cases even predicting, trends in activities originating
within and outside the organization and streamlining business activities
accordingly may decide the life or death of the business, it’s said. The
often touted solution to this problem is implementing a proper monitoring
solution which gives the decision makers the relevant information at the correct
time. However most businesses are at a loss as to where to begin or how to properly
implement means of obtaining such insights. This is not surprising given that even
the buzzwords surrounding the monitoring concepts tend to be fuzzy.


Whoa.. that’s some pretty serious language (OK it is, at least to me :). I
consider myself linguistically challenged when it comes to English.). Well, I
wanted to start on a serious note since we are dealing with a serious subject
here, right?? :). Anyway, this tells part of the story when it comes to the
business monitoring domain. Sometimes the monitoring solutions forced on
businesses are just like this: some serious mumbo jumbo with hundreds of bells
and whistles which most of us don’t care to understand, and of course sometimes
not capturing what really needs to be monitored in the business either.
On top of that there is a buzzword soup surrounding the monitoring products,
with each vendor coming up with different interpretations according to their
implementations. Anyway, let’s get some perspective on some business monitoring
keywords, as I see them.


Let’s monitor some activities


“Business Activity Monitoring” (BAM) is a term coined by Gartner Inc., defined as
“the aggregation, analysis and presentation of real-time information about
activities inside organizations and involving customers and partners”. However,
the term is used in different contexts meaning different things to different
people, especially when it comes to vendor solutions. The confusion tends to be
mostly around what can be considered a business activity. For example, for a
business executive a sale of a product is a perfectly valid business activity
which needs to be monitored, while a tech ops guy would want monitoring on the
load of the server hosting the sales application. I have heard some people say
the latter does not really fall under the term “business activity” since that
level of monitoring is of no importance to the strategic decision-making of the
business. But as far as I believe it is no less important, and it should be part
of a comprehensive monitoring solution, since any high level decisions made
depend on the smooth functioning of daily operations supported by a properly
functioning infrastructure (if servers are down, sales numbers are going to get
hurt, and so will the sales projections; simple as that). It’s a matter of
providing a suitable view to each intended user group according to the type of
monitoring information they are interested in.


Anyway, the latter kind of monitoring may fit better under the “Operational
Intelligence” category, which I will talk about in a bit. In that sense we can
think of “Business Activity Monitoring” as a subset of “Business Monitoring”,
so that it fulfills part of a holistic approach to the monitoring problem in
which everything that needs to be monitored in the business comes under one
comprehensive monitoring solution. This is one major point where the vendors
differ in their solutions: some monitoring solutions focus on a mixture of
monitoring aspects, and so their definitions of BAM vary.



BPM – A side kick??


Another difference between various BAM solutions is in the way they are
implemented. Some assume the existence of a Business Process Management (BPM)
solution, mostly from the same vendor, and so the monitoring solution is
tightly coupled to it. While these kinds of solutions may provide better
integration between the products, in my opinion they lack the flexibility to
monitor most business scenarios, where no business process management solution
is in place. If the monitoring framework is generic enough, it’s a matter of
putting the required data capturing agents at the points of interest to capture
and send data to the BAM solution, which should be able to correlate the
incoming events. However, if there is a BPM solution already present from the
same vendor, it should be able to leverage that as well. This way it provides
the most flexibility in terms of monitoring.


Key to success – KPI


Another term mentioned side by side with BAM is key performance indicators
(KPIs). A BAM solution monitors a set of predefined KPIs and makes sure that
necessary actions are taken (which may be firing alerts to the relevant
parties, or even automatically triggering some corrective action if possible)
when KPIs do not meet their desired values. A good definition I found of what
constitutes a KPI is as follows.

Key Performance Indicators are quantifiable measurements that reflect the critical success factors of an organization. They will differ depending on the organization.

So these are highly specific to the organization. Let me give a couple of simple examples of KPIs.

  1. For a retail store a valid KPI would be the percentage of days where daily sales revenue target was not met.
  2. For a delivery service a KPI would monitor the number of deliveries that went 10% overtime than their expected delivery times.
  3. A KPI for a call center would monitor the number of calls which took less than 2 minutes to resolve the problem.
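As a toy illustration, the first KPI above could be computed as follows (a Python sketch; the sales figures and target are made-up sample data):

```python
# Sketch of evaluating KPI 1: the percentage of days on which the
# daily sales revenue target was missed. All figures are made up.
def pct_days_target_missed(daily_sales, target):
    missed = sum(1 for s in daily_sales if s < target)
    return 100.0 * missed / len(daily_sales)

sales = [1200, 950, 1100, 800, 1300]  # hypothetical revenue per day
print(pct_days_target_missed(sales, target=1000))  # 2 of 5 days missed -> 40.0
```

A real BAM deployment would evaluate such a definition continuously over incoming events rather than over a static list, but the KPI itself reduces to a check like this.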

Here we can see the importance of being able to customize KPI definitions
according to the nature of the business. While properly identifying the
necessary KPIs should be done with the involvement of business management, the
BAM solution should facilitate defining business specific KPI definitions.


Intelligence in Operations – OI


Next comes the “Operational Intelligence” aspect of business monitoring. It is
more or less similar to “Business Activity Monitoring”, except that
“Operational Intelligence” is more oriented towards monitoring day-to-day
business activities and geared to finding issues in the system in real time, in
order to take corrective actions. I believe technical operations monitoring
fits under this description, since it involves the day-to-day aspect and the
required response times to any issue found should be closer to real time. But
business metrics requiring close monitoring may well be included under
“Operational Intelligence” as well. So here comes another term (“real time”)
into the mix, which means different things to different people. There are
several levels of real-timeness among the products we see in the market. Some
position themselves as real-time monitoring solutions while others support near
real-time monitoring, and the boundary between these is blurry at best. As with
anything else when it comes to monitoring, the required response time of the
solution depends on the context. A solution monitoring a business critical
application server may require response times within several seconds, while a
low volume internal application server may not need such real-time monitoring.
A good rule of thumb would be that if it’s real time, expect a sub-minute
response time, while if it’s near real time a lag of a couple of minutes at
times may be acceptable. Of course vendors can stretch these either way
according to their implementations, so always try to read between the lines of
the marketing terms to see whether the solution a vendor is proposing really
matches what is required.


CEP to the rescue


Often the response times required by “Operational Intelligence” monitoring
necessitate the use of a Complex Event Processing (CEP) solution underneath,
which monitors incoming event streams on entry and triggers certain actions
when anomalies are detected. So the real-timeness of the product directly
depends on the performance characteristics and scalability of the CEP solution
used underneath.
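The CEP idea can be sketched with a toy stream monitor (a hedged Python sketch, not a real CEP engine; the window size, threshold and sample readings are arbitrary illustration values):

```python
from collections import deque

# Toy sketch of the CEP idea: watch an incoming event stream and fire an
# alert as soon as a window-level condition is met, without storing the
# full history. Window size and threshold are arbitrary.
def monitor(events, window=3, threshold=100):
    """Yield (timestamp, avg) whenever the average of the last
    `window` readings exceeds `threshold`."""
    recent = deque(maxlen=window)
    for ts, value in events:
        recent.append(value)
        if len(recent) == window and sum(recent) / window > threshold:
            yield (ts, sum(recent) / window)

stream = [(1, 90), (2, 95), (3, 120), (4, 130), (5, 80)]
alerts = list(monitor(stream))  # alerts fire from timestamp 3 onwards
```

A production CEP engine would express such rules in a query language over many streams, but the essential shape, a standing query evaluated as each event arrives, is the same.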


Another type of Intelligence – BI


The next type of “intelligence” a business wants is “Business Intelligence”.
Yeah, I know, there are so many types of “intelligence” floating around, and
this is one of the important ones. This is geared towards finding trends in
business operations and the market environment, and coming up with predictions
on business conditions. This is basically historical data analysis, which may
pull data out of a data warehouse, do some ETL operations and run data mining
operations on the data to gain new insights into business operations. So these
jobs are not real time; rather, they are batch jobs scheduled at suitable
intervals.


OK, I think that’s enough for one day. Hope I made some sense out of the
monitoring buzzword fiesta. Hopefully this post will be a good base for a post
I plan to write some time soon, in which I will outline some practical
experiences our team and I had while implementing a business monitoring
solution ourselves.

Read Full Post »

I have been reading about the join implementations available for Hadoop for the past few days. In this post I recap some techniques I learnt during the process. Joins can be done at either the map side or the reduce side, according to the nature of the data sets to be joined.

Reduce Side Join

Let’s take the following tables containing employee and department data.

Let’s see how the join query below can be achieved using a reduce side join.

SELECT Employees.Name, Employees.Age, Department.Name  FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id

The map side is responsible for emitting the join predicate value along with the corresponding record from each table, so that records having the same department id in both tables end up at the same reducer, which then does the joining of records having the same department id. However it is also required to tag each record to indicate which table the record originated from, so that the joining happens between records of the two tables. The following diagram illustrates the reduce side join process.

Here is the pseudo code for map function for this scenario.

map (K table, V rec) {
   dept_id = rec.Dept_Id
   tagged_rec.tag = table
   tagged_rec.rec = rec
   emit(dept_id, tagged_rec)
}


At the reduce side, the join happens between records having different tags.

reduce (K dept_id, list<tagged_rec> tagged_recs) {
   for (tagged_rec : tagged_recs) {
      for (tagged_rec1 : tagged_recs) {
         if (tagged_rec.tag != tagged_rec1.tag) {
            joined_rec = join(tagged_rec, tagged_rec1)
            emit(tagged_rec.rec.Dept_Id, joined_rec)
         }
      }
   }
}
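To make the flow concrete, here is a Python sketch that mimics the map, shuffle and reduce phases with plain dictionaries (the sample tables are made up; this is not the Hadoop API):

```python
from collections import defaultdict

# Simulate a reduce side join: map() emits (dept_id, tagged record); the
# "shuffle" groups tagged records by dept_id; reduce() joins records
# carrying different table tags. Sample data is made up.
employees = [{"Name": "Ann", "Age": 30, "Dept_Id": 1},
             {"Name": "Bob", "Age": 40, "Dept_Id": 2}]
departments = [{"Name": "Eng", "Dept_Id": 1},
               {"Name": "Sales", "Dept_Id": 2}]

shuffle = defaultdict(list)                      # dept_id -> [(tag, record)]
for rec in employees:
    shuffle[rec["Dept_Id"]].append(("Employees", rec))    # map() emit
for rec in departments:
    shuffle[rec["Dept_Id"]].append(("Department", rec))   # map() emit

joined = []
for dept_id, tagged_recs in shuffle.items():     # reduce() per dept_id
    for tag1, r1 in tagged_recs:
        for tag2, r2 in tagged_recs:
            if tag1 == "Employees" and tag2 == "Department":
                joined.append((r1["Name"], r1["Age"], r2["Name"]))
```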



Map Side Join (Replicated Join)

Using Distributed Cache on Smaller Table

For this implementation to work, one relation has to fit into memory. The smaller table is replicated to each node and loaded into memory. The join happens at the map side without reducer involvement, which significantly speeds up the process since it avoids shuffling all the data across the network, even though most of the non-matching records would have been dropped later anyway. The smaller table can be loaded into a hash table so that lookups by Dept_Id can be done. The pseudo code is outlined below.

map (K table, V rec) {
   list recs = lookup(rec.Dept_Id) // get smaller table records having this Dept_Id
   for (small_table_rec : recs) {
      joined_rec = join(small_table_rec, rec)
      emit(rec.Dept_Id, joined_rec)
   }
}
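The same idea in a runnable Python sketch, with a dict standing in for the replicated in-memory hash table (sample data is made up):

```python
# Sketch of the replicated (map side) join: the small Department table is
# held in an in-memory hash table keyed by Dept_Id, and each Employees
# record is joined during the map with no shuffle. Sample data is made up.
departments = {1: {"Name": "Eng"}, 2: {"Name": "Sales"}}  # replicated table

def map_join(employee_recs, small_table):
    out = []
    for rec in employee_recs:
        dept = small_table.get(rec["Dept_Id"])  # hash table lookup
        if dept is not None:                    # inner join: drop non-matches
            out.append((rec["Name"], rec["Age"], dept["Name"]))
    return out

employees = [{"Name": "Ann", "Age": 30, "Dept_Id": 1}]
result = map_join(employees, departments)
```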


Using Distributed Cache on Filtered Table

If the smaller table doesn’t fit in memory, it may be possible to prune its contents if a filtering expression has been specified in the query. Consider the following query.

SELECT Employees.Name, Employees.Age, Department.Name  FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id WHERE Department.Name="Eng"

Here a smaller data set can be derived from the Department table by filtering out records having department names other than “Eng”. Now it may be possible to do a replicated map side join with this smaller data set.

Replicated Semi-Join

Reduce Side Join with Map Side Filtering

Even if the filtered data of the small table doesn’t fit into memory, it may be possible to include just the Dept_Ids of the filtered records in the replicated data set. Then at the map side this cache can be used to decide which records should be sent over to the reduce side, thus reducing the amount of data moved between the mappers and reducers.

The map side logic would look as follows.

map (K table, V rec) {
   // check if this record needs to be sent to the reducer
   boolean sendToReducer = check_cache(rec.Dept_Id)
   if (sendToReducer) {
      dept_id = rec.Dept_Id
      tagged_rec.tag = table
      tagged_rec.rec = rec
      emit(dept_id, tagged_rec)
   }
}

The reducer side logic would be the same as in the reduce side join case.

Using a Bloom Filter

A Bloom filter is a construct which can be used to test whether a given element is contained in a set. A smaller representation of the filtered Dept_Ids can be derived by adding the Dept_Id values to a Bloom filter. This Bloom filter can then be replicated to each node. At the map side, for each record the Bloom filter is used to check whether the Dept_Id in the record is present in the filter, and only if so is that record emitted to the reduce side. Since a Bloom filter is guaranteed not to give false negatives, no matching record is missed; false positives may let a few extra records through to the reducers, but the final join result remains accurate.
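A hedged Python sketch of the idea follows. A real job would use Hadoop's built-in Bloom filter utilities; the bit-array size, hash count and Dept_Id values here are illustrative only:

```python
import hashlib

# Toy Bloom filter for Dept_Id membership tests. Sizes are tiny
# illustration values; a production filter is sized for the expected
# number of elements and target false-positive rate.
class BloomFilter:
    def __init__(self, size=1024, hashes=3):
        self.size, self.hashes = size, hashes
        self.bits = bytearray(size)

    def _positions(self, item):
        # Derive `hashes` bit positions by salting a digest of the item.
        for i in range(self.hashes):
            h = hashlib.md5(f"{i}:{item}".encode()).hexdigest()
            yield int(h, 16) % self.size

    def add(self, item):
        for p in self._positions(item):
            self.bits[p] = 1

    def might_contain(self, item):
        return all(self.bits[p] for p in self._positions(item))

bf = BloomFilter()
for dept_id in (3, 7):        # Dept_Ids of the filtered Department records
    bf.add(dept_id)

assert bf.might_contain(3)    # no false negatives, ever
# might_contain(5) is very likely False, but false positives are possible
```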


[1] Hadoop In Action

[2] Hadoop : The Definitive Guide

Read Full Post »

Cassandra pagination has been the subject of several blogs elsewhere as well. In particular, this excellent blog by Michael Kopp details how this can generally be handled using the Cassandra API. We also had our own use case where we needed paginated access. However, for our use case the scheme outlined by Michael presents several shortcomings.

1. What if we want to fetch rows batch wise instead of columns?

2. If there are updates during the paged retrieval, there is a chance that some items will be missed out. For example, let’s say the last access was at the column with column key “Florence”. The next retrieval would fetch a batch starting from “Florence” onwards. What if a column with key “Cologne” has been newly added? It would not get included in any of the future retrievals.

3. There may also be a use case where it is required to paginate the results obtained by filtering with a range query, rather than fetching all the rows page-wise from the column family. (This was actually our use case.)


So let’s have a look at how we took a stab at the beast that is Cassandra pagination. Before that, let me explain our use case fully so that it’s easier to grasp what we did and why. Our main use case was to paginate access to the results returned by a range query, which can be cleanly expressed in SQL lingo as follows.


SELECT * FROM <column_family> WHERE <column_name_1> BETWEEN <from_1> AND <to_1> AND <column_name_2> BETWEEN <from_2> AND <to_2> ... AND <column_name_n> BETWEEN <from_n> AND <to_n>


Here each column_name is an index (actually a sub-index of a composite index; for a description of our indexing scheme refer to my earlier blog, Cassandra: Lessons Learnt). So our use case is a bit complicated in that it’s required to paginate access to the result set obtained from a range query. Our requirement was also to fetch all the rows satisfying this criteria without missing any row, given that there may be new additions while we are retrieving rows in batches. In fact there may be a considerable time lapse between two fetches, since in our use case the retrieved data is processed by a scheduled task with a configurable interval. Additionally, we had to leave room for non-batched access to the range query result as well. And of course we were not using the OrderedPartitioner (the evils of the OrderedPartitioner are well documented elsewhere: sub-optimal load balancing, creation of hot spots, etc.). Had we used the OrderedPartitioner our life would have been a bit easier, since we would have been able to do a range query on the rows. But since we were using the RandomPartitioner, no ordering of rows by row key can be assumed.


OK, that’s enough about the predicament we were in a couple of months back when faced with the task of ‘Cassandrafying’ our data layer. Hope you got the idea.. Now let’s see what we did to improve the situation.


First we had to deal with our inability to do range queries on rows. Cassandra has the nice property that the columns of a particular row are always sorted by their column keys. So we utilized this to impose an ordering on rows. We always maintain a meta row in which all the row keys are stored as columns (each row key becomes a column key, and the column value is empty). Let’s say this row is ‘RowIndex’ (see figure 1). Now when querying the column family, we first query this row using a range query to get the rows matching the criteria, and then do the real row fetching one by one using the fetched row keys. You might be wondering how the range query is constructed to match the WHERE clauses in the SQL given above. In our scheme the row key is constituted by concatenating the value for each index. (An index is in fact a column in a particular row, and we use the column value as the index value. This will become clearer by having a look at the first step of the illustration given in figure 2.) So this is the scheme we used for non-batched retrieval of rows satisfying a particular query.


Figure 1 : Column family with meta row ‘RowIndex’


But for the paginated use case this proved to be insufficient, due to the second shortcoming outlined earlier. We realized there needs to be an ordering by timestamp, to catch a newly added row even if its row key puts it at a position in the sorted order before the last accessed row key. So we introduced another meta row storing the timestamp of insertion of each row. Let’s say the row key of this meta row is ‘TimeStampIndex’. Each column of this row holds the insertion timestamp as the column key and the row key of the row inserted at that particular timestamp as the column value. So now we need to do four things when we add a row to the column family.


Figure 2 : Row insertion algorithm

1. Create the row key using the defined indexes. Here we use ‘server’ and ‘time’ as the indexes.

2. Insert the row key into the ‘RowIndex’ as a column.

3. Insert the row insertion timestamp along with the row key as a column into the ‘TimeStampIndex’.

4. Add the row itself to the column family.
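The four steps can be simulated with plain dicts standing in for Cassandra column families (a sketch only; ‘RowIndex’ and ‘TimeStampIndex’ mirror the meta rows described above, and the sample row and delimiter usage are illustrative):

```python
import time

# Sketch of the four insertion steps, with dicts standing in for
# Cassandra column families. Sample row contents are made up.
DELIM = "—"  # the constant row-key-part delimiter
column_family = {"RowIndex": {}, "TimeStampIndex": {}}

def insert(row, indexes=("server", "time"), now=None):
    # 1. Build the row key by concatenating the index column values.
    row_key = DELIM.join(str(row[i]) for i in indexes)
    # 2. Record the row key as a column of the 'RowIndex' meta row.
    column_family["RowIndex"][row_key] = ""
    # 3. Record insertion timestamp -> row key in 'TimeStampIndex'.
    ts = now if now is not None else time.time()
    column_family["TimeStampIndex"][ts] = row_key
    # 4. Add the row itself.
    column_family[row_key] = row
    return row_key

key = insert({"server": "esb", "time": "08:23", "payload": "..."}, now=1)
```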


‘RowIndex’ is to be used for non batched access of the range query result while ‘TimeStampIndex’ is to be used for batched access of the range query result.


Now when we want to fetch the rows satisfying the range query criteria in batches, we first get a batch-size chunk of timestamps from ‘TimeStampIndex’. Then for each row associated with those timestamps, we check whether the row matches the filter criteria. This is a simple string comparison checking whether the row key falls between the range-first and range-last values.


Say, for example, the filter criteria for the above illustration is the following WHERE clause.

WHERE 'server' BETWEEN 'esb' AND 'esb' AND 'time' BETWEEN '08:00' AND '09:00'

Now the range-first value of the query would be ‘esb—08:00’ and the range-last value would be ‘esb—09:00’. This will select events for server ‘esb’ during the hours from ’08:00′ to ’09:00′. So if the row key is ‘esb—08:23’ it will get picked, and if it is ‘esb—09:23’ it won’t.
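The comparison itself is just lexicographic string ordering, as this small Python sketch shows (the keys are the examples from the text):

```python
# The filter check is a plain lexicographic comparison of the composite
# row key against the range-first and range-last values.
def in_range(row_key, first, last):
    return first <= row_key <= last

assert in_range("esb—08:23", "esb—08:00", "esb—09:00") is True
assert in_range("esb—09:23", "esb—08:00", "esb—09:00") is False
```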


As can be seen, for this scenario we didn’t use the ‘RowIndex’ meta row; it’s for non-batched use only. In this way, using ‘TimeStampIndex’, we can catch newly added rows without missing out on any row.


However it’s not without its own drawbacks.

1. The batch size is not consistent. Even though a batch-size chunk of timestamps is picked by the query, some of these rows will be discarded since they do not match the filtering criteria. The solution would be to fetch multiple batches until the batch size number of rows fulfilling the filter criteria is found, but for now we are OK with inconsistent batch sizes.

2. What if an existing row is updated? It will get fetched a second time, since the algorithm does not miss any newly added or updated row. This may or may not be desirable according to the use case. For us this is in fact the needed behavior, since we need any new updates to an already fetched row, so we are OK with that too.


So that concludes our escapade with Cassandra pagination. The (love) story continues.. (Hope you saw the sarcasm sign unlike Sheldon.. :))

Read Full Post »

After working with Cassandra for a couple of months I thought of jotting down my Cassandra experience, including what we did, what I liked and what I liked less. But I need to put a little disclaimer up front: we are currently using Cassandra 0.7 (“What !#$*?? Have you been living under a rock? Get yourself a life and upgrade your darn installation” you would say. I know. I know.. :). We are working on it.). So some of the stuff mentioned here may not relate to later versions, although the concepts we used are still mostly applicable. (I think kudos should go to the Cassandra people for churning out several quick releases with some cool new features.)

That being said, let me go through what we have been doing with Cassandra. Our use case is roughly as follows. We are using Cassandra as the persistence layer of our Business Activity Monitoring solution at WSO2, which has been rewritten to scale better (about which I am hoping to blog, somewhere along the lines of “A scalable big data ready BAM solution – WSO2 BAM”, some time soon). The data is pumped from the servers being monitored as events consisting of key value pairs, and gets written to the Cassandra store after some minimal required processing at the receiver.

Figure 1 : Event format

These data are then indexed to facilitate data analysis. Here our use case differs from normal Cassandra use cases, where the queries are known in advance and the Cassandra data model is built starting from the queries. Since what we are building is a generic data analytics framework, we don’t know what queries users will want to perform on their data stored in Cassandra. But then we thought, if we let users build a set of indexes up front according to the queries to be done, it would solve our dilemma. So we decided to give users the ability to define which indexes are to be built, using an XML configuration which is roughly as follows.

<index name="serverByTime">
   <part name="server"/>
   <part name="time"/>
</index>

Here we are building a composite index which can serve queries such as “give me the events that came from server ‘esb12′ during the time period ’08:00’ to ’09:00′”. The indexing happens as described below.

Figure 2 : Indexing Model

1. Read an event from the primary column family (basically a row of the primary column family).

2. If the event contains the server and time keys, get the corresponding values of these keys and concatenate them to create the row key for the index column family (in this case serverByTime).

3. Create the index column family if not present. Add a new row with the created row key if one does not already exist in the index column family, and add a column containing the event id to it.

4. Add the newly created row key as a column in the index row of the index column family.
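The four steps above can be sketched as a small in-memory simulation (hypothetical Python, not our actual code; column families are modeled as nested dicts, "---" stands in for the delimiter, and the index row name is made up):

```python
# In-memory sketch of the indexing steps above. A column family is a
# dict of rows; each row is a dict of columns (column name -> value).
DELIM = "---"                  # stand-in for the row key part delimiter
INDEX_ROW_KEY = "index_row"    # the special row listing all row keys

def index_event(index_cf, index_parts, event_id, event):
    """Index one event (a dict of key/value pairs) into index_cf."""
    # Step 2: concatenate the values of the configured parts.
    row_key = DELIM.join(event[part] for part in index_parts)
    # Step 3: add a column holding the event id to that row.
    index_cf.setdefault(row_key, {})[event_id] = ""
    # Step 4: record the row key as a column in the index row.
    index_cf.setdefault(INDEX_ROW_KEY, {})[row_key] = ""

serverByTime = {}
index_event(serverByTime, ["server", "time"],
            "evt1", {"server": "esb12", "time": "08:30"})
# serverByTime now has a row "esb12---08:30" holding "evt1",
# plus an entry for that row key in the index row
```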

So, as can be seen, we are using a column family per index. Cassandra doesn't sort rows on row keys if you are using RandomPartitioner, which is the recommended partitioner for better performance. But columns within a row are always sorted by column key. We use this property to store the index so that it can be queried using range queries. Each index column family has a special row called the "index row" which stores all the row keys of the column family. When the index needs to be queried, a range query on this row is done first to fetch the required row keys. Let's say we wanted to fetch all the events from servers "esb12" to "esb15" during the time period "08:00" to "09:00". Let's see how this is done using our indexing model.

1. Get the range values for the required sub indexes. In this case "esb12" to "esb15" for server and "08:00" to "09:00" for time.

2. Create the range first and last values by concatenating the values of the sub indexes. The range first value would be "esb12—08:00" and the range last value would be "esb15—09:00". Here '—' is the constant delimiter we use when concatenating the row key parts.

3. Do the range query on the index row and get the row keys within the range.

4. Fetch each row using its row key to get the results. Actually, what we get here is a set of event ids. To get the actual event details, another lookup in the primary column family using the event id is required.
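Under the same in-memory model, the query steps can be sketched as follows (again hypothetical Python; since columns within a row are kept sorted by column key, the range query on the index row becomes a slice over its sorted column names):

```python
import bisect

DELIM = "---"                  # stand-in for the row key part delimiter
INDEX_ROW_KEY = "index_row"    # made-up name for the special row

def range_query(index_cf, first_parts, last_parts):
    """Return event ids for index rows within the given key range."""
    # Steps 1-2: build the range bounds by concatenating sub index values.
    first = DELIM.join(first_parts)   # e.g. "esb12---08:00"
    last = DELIM.join(last_parts)     # e.g. "esb15---09:00"
    # Step 3: range query over the index row's sorted column names.
    keys = sorted(index_cf[INDEX_ROW_KEY])
    lo = bisect.bisect_left(keys, first)
    hi = bisect.bisect_right(keys, last)
    # Step 4: fetch each matching row; its columns are the event ids.
    event_ids = []
    for row_key in keys[lo:hi]:
        event_ids.extend(index_cf[row_key])
    return event_ids
```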

Using this model we are able to do range queries over multiple indexes. One advantage of this over the native secondary index facility, present from version 0.8 onward, is that the first index query isn't restricted to an equality operation (e.g. server equals 'esb12'). But the order of the sub indexes making up the composite index (i.e. the row key part order, in this case server—time) determines the types of queries which can be made. For instance, we cannot query using time alone. For that we would need to create another composite index with time as the first sub index.

We also had the requirement of fetching rows from column families in batches, with each row being fetched exactly once for processing. We achieved this using another special row holding all the row insertion times, ordering rows according to their time of storage and then fetching rows in batches by doing range queries on this timestamp row with a batch size limit.
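That batching idea can be sketched like this (hypothetical Python; the timestamp row holds insertion-time to row-key columns, and each call resumes from the timestamp where the previous batch ended, so no row is fetched twice):

```python
import bisect

TS_ROW = "timestamp_row"   # made-up name for the special row

def fetch_batch(cf, after_ts, batch_size):
    """Return the next batch of row keys inserted after after_ts."""
    # Columns of the timestamp row map insertion time -> row key and
    # are iterated in sorted (time) order, as Cassandra would return them.
    cols = sorted(cf[TS_ROW].items())
    times = [t for t, _ in cols]
    start = bisect.bisect_right(times, after_ts)
    batch = cols[start:start + batch_size]
    # Also return the timestamp to resume from on the next call.
    last_ts = batch[-1][0] if batch else after_ts
    return [rk for _, rk in batch], last_ts
```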

All in all, it's a rather involved scheme to get what we wanted out of Cassandra. Not a big selling point, I would say. Also, concatenating values to create row keys seemed a little hackish to me, and we had to come up with some tricks to make batching work with range queries. Maybe it's not that strange that Cassandra is also fondly known as "Hackendra" among some of us (it has a kind of nice ring to it, I feel :)), since sometimes it's hard not to have that pesky feeling "Am I doing a hack here???" at the back of your mind when working with Cassandra. But that by no means makes Cassandra evil. We like Cassandra for its fast writes and scalable architecture. In real life, when it comes to relationships, not everything is perfect. But that doesn't mean we don't care. Our relationship with Cassandra is no different. :)

Read Full Post »

Let's see how we can model some commonly used SQL queries using map reduce.

  • select … from … where …
Take the following example.
select f1, f2 from relation where f1 > 500

For this example let's assume a suitable InputFormat (in the case of Hadoop) does the reading from the database and emits key-value pairs in the form (k, rec), where k is the primary key and rec is the entire record. Pseudo code using map reduce is given below.

map (k, rec)  {
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2>
      collect (k, rec1)
   }
}

As can be seen, this is implemented using only a map function. Output is emitted only if the predicate is satisfied.
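The same map can be written as runnable Python (a toy simulation with the relation as a dict keyed by primary key; all field and variable names are illustrative):

```python
def select_map(k, rec):
    """map for: select f1, f2 from relation where f1 > 500"""
    if rec["f1"] > 500:   # the where predicate
        # project only f1 and f2, keyed by the primary key
        yield k, {"f1": rec["f1"], "f2": rec["f2"]}

relation = {1: {"f1": 600, "f2": "a"},
            2: {"f1": 100, "f2": "b"}}
result = [kv for k, rec in relation.items() for kv in select_map(k, rec)]
# only the record with f1 = 600 survives the predicate
```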

  • select aggregate_func() from … where … groupby …
Let’s take the following example.
select f3, sum(f1), avg(f2) from relation where f1 > 500 groupby f3
The pseudo-code below describes how this is achieved using map reduce.
map (k, rec)  {
   if (rec.f1 > 500) {
      rec1 = <rec.f1, rec.f2, rec.f3>
      collect (rec.f3, rec1)
   }
}

reduce (v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <rec1.f3, sum, avg>
   store (v, rec2)
}

Here, each v that reduce gets corresponds to a unique value in the rec1.f3 field. The group by is done implicitly by the shuffle phase between the map and reduce functions.
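To make the implicit shuffle concrete, here is a toy Python simulation of the whole pipeline (illustrative names only; a real framework such as Hadoop does the grouping between map and reduce for you):

```python
from collections import defaultdict

def group_map(k, rec):
    if rec["f1"] > 500:                            # where f1 > 500
        yield rec["f3"], (rec["f1"], rec["f2"])    # key on f3

def group_reduce(v, pairs):
    total = sum(f1 for f1, _ in pairs)             # sum(f1)
    avg = sum(f2 for _, f2 in pairs) / len(pairs)  # avg(f2)
    return total, avg

def run(relation):
    groups = defaultdict(list)   # the shuffle: gather values by key
    for k, rec in relation.items():
        for key, val in group_map(k, rec):
            groups[key].append(val)
    return {v: group_reduce(v, pairs) for v, pairs in groups.items()}

relation = {1: {"f1": 600, "f2": 10, "f3": "x"},
            2: {"f1": 700, "f2": 30, "f3": "x"},
            3: {"f1": 400, "f2": 50, "f3": "y"}}
# row 3 fails the predicate, so only group "x" remains
```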

  • select aggregate_func() from … where … groupby … having …

Here an additional having clause is used to filter the grouped results. Let's take an extended version of the earlier example.

select f3, sum(f1), avg(f2) from relation where f1 > 500 groupby f3 having avg(f2) > 50

No change is required to the map function. The modified reduce function is given below.

reduce (v, list<rec1>) {
   sum := 0
   avg := 0
   for each rec1 in list {
      sum += rec1.f1
      avg += rec1.f2
   }
   avg := avg / size(list)
   rec2 = <rec1.f3, sum, avg>
   if (avg > 50) {
      store (v, rec2)
   }
}
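In the toy Python simulation the having clause is just a post-aggregation filter inside reduce (illustrative; pairs is the shuffled list of (f1, f2) values for one f3 group):

```python
def having_reduce(v, pairs):
    total = sum(f1 for f1, _ in pairs)             # sum(f1)
    avg = sum(f2 for _, f2 in pairs) / len(pairs)  # avg(f2)
    if avg > 50:                # the having clause
        return v, (total, avg)
    return None                 # group filtered out
```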

Read Full Post »

I have been reading up on JMS of late. The more I read, the more confused I got about the usage of the terms persistent and durable in the context of JMS. Then I found this blog, which cleared up my confusion. Thanks Nigel for the great post. :) Anyway, here is the gist of that blog that I committed to memory.

  • Persistent applies to messages, while durable applies to subscriptions.
  • Queues don’t have the durable/non-durable distinction because a queue consumer effectively always gets a durable subscription.
  • For queues:
  1. Persistent : Messages will be saved on disk and sent later if the consumer is inactive.
  2. Non-persistent : Messages will be kept in memory and sent later if the consumer is inactive, but they will not survive a broker restart.
  • For topics:
  1. Persistent & Durable : Messages will be saved both on disk and in memory and sent later if the subscriber is inactive.
  2. Persistent & Non-Durable : Messages will not be saved on disk or in memory, and any subscriber inactive at the moment the broker receives the message will not get it.
  3. Non-persistent & Durable : Messages will be kept in memory and sent later if the subscriber is inactive, so they will not survive a broker restart.
  4. Non-persistent & Non-Durable : Messages will not be saved on disk or in memory, and any subscriber inactive at the moment the broker receives the message will not get it. Similar to the Persistent & Non-Durable case.
  • In all these cases the message is delivered immediately if the subscriber is active.

Read Full Post »

When I was first introduced to the free and open source software concept, not so long ago, the first impression I got was that the software produced ought to be freely distributed with its sources publicly available, so that it comes free of charge. The immediate question that followed in my mind, as is usually the case for others being introduced to the concept, was: "Well, how do the guys in open source earn money?".

Then I was told, "Well, people can provide support services for the users of the software by means of customer service, training, consulting etc. and earn money from that." So that settled the score for me on the open source business model, though I was skeptical about the amount of returns companies would gain without the bulky initial earnings that proprietary software brings in for the software itself. Anyway, I didn't give it much thought afterwards, though I got involved in some open source software projects. That was until I stumbled upon this lecture by Richard Stallman, the founder of the Free Software movement. Only then did I realize that this concept is based on a set of profound philosophical ideologies, and that the term itself is not immune to controversy.

Anyway, here is what I learned from him in that lecture.

Moral Dilemma

In his talk he describes the freedoms that he thinks should be present in software usage for a "community of sharing and cooperation" to be built around it, free of the evils of the "moral dilemma" that users of proprietary software face. The "moral dilemma" he describes goes like this: "Think that you installed a copyrighted software product, which you bought recently, on your machine. You wanted to show it off to your friend, and after using it on your machine he says it is way too cool for you to have it alone, and asks for a copy of it for his own use. Right then you are pushed into a dilemma. You are tortured by the conflicting moral evils of not being able to help out a friend and being selfish if you reject his request based on the software copyright, and on the other side violating the copyright of the software if you were to give it to him, making you what they call a "pirate"."

Four Freedoms of software

"Saving software users from this dilemma", he says, is what made him start the GNU project. He goes on to say that he believes there are four freedoms that any software should offer its users in order to achieve this goal.

Freedom zero: The freedom to run the program as you wish.

The user has the choice of running the software to fulfill his purposes, not any purpose pushed onto him by the developer or anyone else for that matter. So he should be able to set up and run the program the way he wants.

Freedom one: The freedom to study the source code and change it so that it does what you wish.

This empowers the user to make the software work in a way that is optimally suited to his requirements. Even if the user is not a programmer, he can still hire a knowledgeable professional to do the job; nobody can object, since this freedom is granted with the software itself. Another alternative is to make his requirements known to the software developer community, and if the community finds them useful to a broader user base and to the project itself, they will add or modify the features of the software according to the request. This is achievable partly due to Freedom three mentioned below, which allows software modifications without restrictions.

Freedom two: The freedom to help your neighbor.

You should be able to sell, or give away free of charge, an exact copy of the software to anyone requiring it. This saves the software user from the above "moral dilemma". Another point worth noting is that in this regard he doesn't seem to object to making money out of selling the software itself. But common sense implies this would not be practically feasible, since nothing hinders buyers of your software from giving away free copies of it, bringing the price level down to zero. This cleared up a misconception I had about free software: the notion that it is called free because it is zero cost. In fact, I now realise there is zero cost software that is not free according to this definition, the best examples being the Internet Explorer and early Netscape Navigator browsers. Both were free of charge, but the source code was not available to the public, so users had to depend on the respective companies for bug fixes and enhancements. The ambiguity of the word "free" led people to introduce a new phrase to describe the concept: "Free as in free speech, not as in free beer". So it is a matter of liberty, not price. Zero cost software is just a coincidental side effect.

Freedom three: The freedom to contribute to your community.

You should be able to make modifications and contribute them back to the community. This keeps the spirit of sharing alive, allowing both programmer and non-programmer software users to reap the benefits of each other's work.

Open Source

The officially coined term "Free Software" didn't gain much popularity with the corporate world, due to the fact that "free" was associated with being lower grade, cheap or unethical. So the term "Open Source" was introduced, describing another facet of free software. The marketing gimmick paid off big time, and corporate acceptance of open source software is now much higher than it used to be, to which the Apache Web Server bears fine enough testimony.

A matter of perspective

No doubt this is just one person's perspective on what software should be, and it is not immune to criticism from other ideologies. It seems there are disagreements among different factions of open source software developers about what open source software should be, and it is partly a matter of personal judgment. Anyhow, the lecture left me with a much broader perspective on what Free Software is.


Read Full Post »