
I/O Demystified

With all the hype around highly scalable server design and the rage behind node.js, I have been meaning to do some focused reading on I/O design patterns, but until now I couldn't find enough time to invest. Having finally done some research, I thought it's best to jot down the stuff I came across as a future reference for me and anyone else who may come across this post. OK then.. let's hop on the I/O bus and go for a ride.

Types of I/O

I/O can be done in four different ways, according to the blocking or non-blocking nature of the operations and the synchronous or asynchronous nature of I/O readiness/completion event notifications.

Synchronous Blocking I/O

This is where the I/O operation blocks the application until its completion, which forms the basis of the typical thread-per-connection model found in most web servers.

When the blocking read() or write() is called there will be a context switch to the kernel, where the I/O operation happens and data is copied to a kernel buffer. Afterwards, the kernel buffer is transferred to the user space application level buffer and the application thread is marked as runnable, upon which the application unblocks and reads the data in the user space buffer.

The thread-per-connection model tries to limit the effect of this blocking by confining each connection to a thread, so that handling other concurrent connections is not blocked by an I/O operation on one connection. This is fine as long as the connections are short lived and data link latencies are not that bad. However, in the case of long lived or high latency connections, chances are that threads will be held up by these connections for a long time. If a fixed size thread pool is used, this causes starvation for new connections, since blocked threads cannot be reused to service new connections while they remain blocked. Alternatively, if each connection is serviced by a new thread, a large number of threads will be spawned within the system, which can become pretty resource intensive with high context switching costs under a highly concurrent load.


  ServerSocket server = new ServerSocket(port);
  while (true) {
    Socket connection = server.accept();          // blocks until a client connects
    new Thread(() -> handle(connection)).start(); // handle(connection) is an application-specific handler
  }

 Simple thread per connection server

Synchronous Non Blocking I/O

In this mode the device or the connection is configured as non-blocking, so that read() and write() operations will not block. This usually means that if the operation cannot be satisfied immediately it returns with an error code indicating that the operation would block (EWOULDBLOCK in POSIX) or that the device is temporarily unavailable (EAGAIN in POSIX). It is up to the application to poll until the device is ready and all the data has been read. However, this is not very efficient since each of these calls causes a context switch to the kernel and back irrespective of whether any data was read or not.
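To make this concrete, here is a minimal sketch in Java NIO of what such a polling loop looks like; a non-blocking read() simply returns 0 when no data is available yet, so the application keeps retrying (the host, port and sleep interval are arbitrary illustration values).

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class PollingReader {
  public static void main(String[] args) throws Exception {
    SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9000));
    channel.configureBlocking(false);     // read()/write() will no longer block

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int bytesRead = 0;
    while (bytesRead == 0) {              // busy wait until some data arrives (or -1 on EOF)
      bytesRead = channel.read(buffer);   // returns 0 immediately if nothing is available
      Thread.sleep(100);                  // back off a little to avoid spinning at full speed
    }
    System.out.println("Read " + bytesRead + " bytes");
    channel.close();
  }
}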

Asynchronous Non Blocking I/O with Readiness Events

The problem with the earlier mode was that the application had to poll and busy wait to get the job done. Wouldn't it be better if the application was somehow notified when the device is ready to be read from or written to? That is exactly what this mode provides. Using a special system call (which varies according to the platform – select()/poll()/epoll() for Linux, kqueue() for BSD, /dev/poll for Solaris) the application registers its interest in getting I/O readiness information for a certain I/O operation (read or write) on a certain device (a file descriptor in Linux parlance, since all sockets are abstracted using file descriptors). Afterwards this system call is invoked, and it blocks until at least one of the registered file descriptors becomes ready. Once that happens, the file descriptors ready for I/O are fetched as the return of the system call and can be serviced sequentially in a loop in the application thread.

The ready connection processing logic is usually contained within a user provided event handler, which still has to issue non-blocking read()/write() calls to fetch data from the device to the kernel and ultimately to the user space buffer, incurring a context switch to the kernel. Moreover, there is usually no absolute guarantee that it will be possible to do the intended I/O with the device, since what the operating system provides is only an indication that the device might be ready for the I/O operation of interest; the non-blocking read() or write() can bail you out in such situations. However, this should be the exception rather than the norm.

So the overall idea is to get readiness events in an asynchronous fashion and register event handlers to be invoked when such event notifications are triggered. As you can see, all of this can be done in a single thread while multiplexing among different connections, primarily due to the nature of select() (here I chose a representative system call), which can return the readiness of multiple sockets at a time. This is part of the appeal of this mode of operation, where one thread can serve a large number of connections at a time. This mode is what is usually known as the "Non Blocking I/O" model.

Java has abstracted away the differences between platform specific system call implementations with its NIO API. The socket/file descriptors are abstracted using Channels, and the Selector encapsulates the selection system call. Applications interested in getting readiness events register a Channel (usually a SocketChannel obtained by an accept() on a ServerSocketChannel) with the Selector and get back a SelectionKey, which acts as a handle holding the Channel and registration information. Then the blocking select() call is made on the Selector, which returns a set of SelectionKeys which can then be processed one by one using the application specified event handlers.


Selector selector = Selector.open();

// 'channel' is assumed to be an already created SelectableChannel
// (e.g. a connected SocketChannel)
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);

while(true) {

  int readyChannels = selector.select();
  if(readyChannels == 0) continue;

  Set<SelectionKey> selectedKeys = selector.selectedKeys();
  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

  while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
  }
}

Simple non blocking server

Asynchronous and Non Blocking I/O with Completion Events

Readiness events only go so far as to notify you that the device/socket is ready to do something. The application still has to do the dirty work of reading the data from the device/socket (more accurately, directing the operating system to do so via a system call) all the way to the user space buffer. Wouldn't it be nice to delegate this job to the operating system to run in the background and let it inform you once it has completed the job by transferring all the data from the device to the kernel buffer and finally to the application level buffer? That is the basic idea behind this mode, usually known as the "Asynchronous I/O" mode. For this the operating system is required to support AIO operations. In Linux this support is present in the POSIX aio API since kernel 2.6, and in Windows it is present in the form of "I/O Completion Ports".

With NIO2 Java has stepped up its support for this mode with its
AsynchronousChannel API.
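As a quick taste of that API before the full Proactor example later in this post, here is a minimal sketch (the file name and buffer sizes are arbitrary) showing the two styles NIO.2 offers for consuming completion events: polling a Future, or registering a CompletionHandler callback.

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class Nio2Sample {
  public static void main(String[] args) throws Exception {
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(
        Paths.get("data.txt"), StandardOpenOption.READ);

    // Style 1: Future based - the read is initiated and we block later on get()
    ByteBuffer buf1 = ByteBuffer.allocate(1024);
    Future<Integer> result = channel.read(buf1, 0);
    System.out.println("Read (future style): " + result.get() + " bytes");

    // Style 2: CompletionHandler based - the callback fires once the OS has
    // finished transferring the data into our buffer
    ByteBuffer buf2 = ByteBuffer.allocate(1024);
    channel.read(buf2, 0, null, new CompletionHandler<Integer, Void>() {
      @Override
      public void completed(Integer bytesRead, Void attachment) {
        System.out.println("Read (callback style): " + bytesRead + " bytes");
      }
      @Override
      public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
      }
    });

    Thread.sleep(1000); // give the callback a chance to run before exiting
    channel.close();
  }
}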

Operating System Support

In order to support readiness and completion event notifications, different operating systems provide varying system calls. For readiness events, select() and poll() can be used in Linux based systems. However, the newer epoll() variant is preferred due to its efficiency over select() and poll(). select() suffers from the fact that the selection time increases linearly with the number of descriptors monitored. It is also notorious for modifying the passed file descriptor sets in place, so each time it is called the descriptor set has to be repopulated from a separate copy. Not an elegant solution at any rate.

The epoll() variant can be configured in two ways, namely edge-triggered and level-triggered. In the edge-triggered case it will emit a notification only when a state change is detected on the associated descriptor. Say during an edge-triggered notification your application handler only read half of the kernel input buffer. It won't get a notification on this descriptor next time around, even though there is still data to be read, unless the device sends more data and causes a new file descriptor event. The level-triggered configuration, on the other hand, will trigger a notification each time there is data to be read.

The comparable system calls are present in the form of kqueue in BSD flavours
and /dev/poll or “Event Completion” in Solaris depending on the version. The
Windows equivalent is “I/O Completion Ports”.

The situation for the AIO mode, however, is a bit different, at least in the Linux case. The aio support for sockets in Linux seems shady at best, with some suggesting that it actually uses readiness events at the kernel level while providing an asynchronous abstraction over completion events at the application level. Windows, however, supports this as a first class citizen via "I/O Completion Ports".

I/O Design Patterns 101

There are patterns everywhere when it comes to software development, and I/O is no different. There are a couple of I/O patterns associated with the NIO and AIO models, which are described below.

Reactor Pattern

There are several components participating in this pattern. I will go through them first so it will be easier to understand the diagram.

Reactor Initiator : This is the component which initiates the non-blocking server by configuring and starting the dispatcher. First it binds the server socket and registers it with the demultiplexer for client connection accept readiness events. Then the event handler implementations for each type of readiness event (read/write/accept etc.) are registered with the dispatcher. Next the dispatcher event loop is invoked to handle event notifications.

Dispatcher : Defines an interface for registering, removing, and dispatching Event Handlers responsible for reacting to connection events, which include connection acceptance, data input/output and timeout events on a set of connections. To service a client connection, the related event handler (e.g. the accept event handler) registers the accepted client channel (a wrapper for the underlying client socket) with the demultiplexer, along with the type of readiness events to listen for on that particular channel. Afterwards the dispatcher thread invokes the blocking readiness selection operation on the demultiplexer for the set of registered channels. Once one or more registered channels are ready for I/O, the dispatcher services each returned "Handle" associated with a ready channel one by one using the registered event handlers. It is important that these event handlers don't hold up the dispatcher thread, since that would delay the dispatcher in servicing other ready connections. Since the usual logic within an event handler includes transferring data to/from the ready connection, which would block until all the data is transferred between the user space and kernel space buffers, these handlers are normally run in separate threads from a thread pool.

Handle : A handle is returned once a channel is registered with the demultiplexer; it encapsulates the connection channel and readiness information. A set of ready Handles is returned by the demultiplexer's readiness selection operation. The Java NIO equivalent is SelectionKey.

Demultiplexer : Waits for readiness events on one or more registered connection channels. The Java NIO equivalent is Selector.

Event Handler : Specifies the interface having hook methods for dispatching
connection events. These methods need to be implemented by application specific
event handler implementations.

Concrete Event Handler : Contains the logic to read/write data from the underlying connection and do the required processing, or to initiate the client connection acceptance protocol, from the passed Handle.

Event handlers are typically run in separate threads from a thread pool, as shown in the diagram below.


A simple echo server implementation for this pattern is as follows (without event handler thread pool).


public class ReactorInitiator {

private static final int NIO_SERVER_PORT = 9993;

  public void initiateReactiveServer(int port) throws Exception {

    ServerSocketChannel server = ServerSocketChannel.open();
    server.socket().bind(new InetSocketAddress(port));
    server.configureBlocking(false);

    Dispatcher dispatcher = new Dispatcher();
    dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server);

    dispatcher.registerEventHandler(
      SelectionKey.OP_ACCEPT, new AcceptEventHandler(
      dispatcher.getDemultiplexer()));

    dispatcher.registerEventHandler(
      SelectionKey.OP_READ, new ReadEventHandler(
      dispatcher.getDemultiplexer()));

    dispatcher.registerEventHandler(
    SelectionKey.OP_WRITE, new WriteEventHandler());

    dispatcher.run(); // Run the dispatcher loop

 }

  public static void main(String[] args) throws Exception {
    System.out.println("Starting NIO server at port : " +
      NIO_SERVER_PORT);
    new ReactorInitiator().
      initiateReactiveServer(NIO_SERVER_PORT);
  }

}

public class Dispatcher {

  private Map<Integer, EventHandler> registeredHandlers =
    new ConcurrentHashMap<Integer, EventHandler>();
  private Selector demultiplexer;

  public Dispatcher() throws Exception {
    demultiplexer = Selector.open();
  }

  public Selector getDemultiplexer() {
    return demultiplexer;
  }

  public void registerEventHandler(
    int eventType, EventHandler eventHandler) {
    registeredHandlers.put(eventType, eventHandler);
  }

  // Used to register ServerSocketChannel with the
  // selector to accept incoming client connections
  public void registerChannel(
    int eventType, SelectableChannel channel) throws Exception {
    channel.register(demultiplexer, eventType);
  }

  public void run() {
    try {
      while (true) { // Loop indefinitely
        demultiplexer.select();

        Set<SelectionKey> readyHandles =
          demultiplexer.selectedKeys();
        Iterator<SelectionKey> handleIterator =
          readyHandles.iterator();

        while (handleIterator.hasNext()) {
          SelectionKey handle = handleIterator.next();

          if (handle.isAcceptable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_ACCEPT);
            handler.handleEvent(handle);
            // Note : removing the handle from the selected key set
            // does not unregister the channel, so the server channel
            // keeps listening for new client connections
            handleIterator.remove();
          }

          if (handle.isReadable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_READ);
            handler.handleEvent(handle);
            handleIterator.remove();
          }

          if (handle.isWritable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_WRITE);
            handler.handleEvent(handle);
            handleIterator.remove();
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

public interface EventHandler {

   public void handleEvent(SelectionKey handle) throws Exception;

}

public class AcceptEventHandler implements EventHandler {
  private Selector demultiplexer;
  public AcceptEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }

  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    ServerSocketChannel serverSocketChannel =
     (ServerSocketChannel) handle.channel();
    SocketChannel socketChannel = serverSocketChannel.accept();
    if (socketChannel != null) {
      socketChannel.configureBlocking(false);
      socketChannel.register(
        demultiplexer, SelectionKey.OP_READ);
    }
  }

}

public class ReadEventHandler implements EventHandler {

  private Selector demultiplexer;
  private ByteBuffer inputBuffer = ByteBuffer.allocate(2048);

  public ReadEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }

  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
     (SocketChannel) handle.channel();

    int bytesRead = socketChannel.read(inputBuffer); // Read data from client
    if (bytesRead == -1) { // Client closed the connection
      socketChannel.close();
      return;
    }

    inputBuffer.flip();
    // Flip the buffer to switch from writing mode to reading mode

    byte[] buffer = new byte[inputBuffer.limit()];
    inputBuffer.get(buffer);

    System.out.println("Received message from client : " +
      new String(buffer));
    inputBuffer.flip();
    // Rewind the buffer so the same data can be written back
    // Register interest in the writable readiness event for
    // this channel in order to echo back the message

    socketChannel.register(
      demultiplexer, SelectionKey.OP_WRITE, inputBuffer);
  }

}

public class WriteEventHandler implements EventHandler {

  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
      (SocketChannel) handle.channel();
    ByteBuffer inputBuffer = (ByteBuffer) handle.attachment();
    socketChannel.write(inputBuffer);
    socketChannel.close(); // Close connection
  }

}
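The echo server above runs the event handlers directly on the dispatcher thread, which is exactly what the Reactor description warns against for handlers that do real work. As a rough sketch (not part of the original sample), the dispatcher could hand each ready handle off to an ExecutorService instead; the pool size of 10 is arbitrary.

import java.nio.channels.SelectionKey;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of offloading event handling to a worker pool while the
// dispatcher thread keeps selecting. Names mirror the sample above, but this
// class is not part of it.
public class PooledDispatch {

  private final ExecutorService handlerPool = Executors.newFixedThreadPool(10);

  public void dispatch(final EventHandler handler, final SelectionKey handle) {
    // Clear interest ops so the selector does not keep reporting this key
    // while a worker thread is still processing it.
    final int interestOps = handle.interestOps();
    handle.interestOps(0);

    handlerPool.submit(() -> {
      try {
        handler.handleEvent(handle);
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        // Restore interest and wake the selector so it picks up the change.
        handle.interestOps(interestOps);
        handle.selector().wakeup();
      }
    });
  }
}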

Proactor Pattern

This pattern is based on asynchronous I/O model. Main components are as follows.

Proactive Initiator : This is the entity which initiates Asynchronous Operation
accepting client connections. This is usually the server application’s main
thread. Registers a Completion Handler along with a Completion Dispatcher to
handle connection acceptance asynchronous event notification.

Asynchronous Operation Processor : This is responsible for carrying out I/O operations asynchronously and providing completion event notifications to the application level Completion Handler. This is usually the asynchronous I/O interface exposed by the operating system.

Asynchronous Operation : Asynchronous Operations are run to completion
by the Asynchronous Operation Processor in separate kernel threads.

Completion Dispatcher : This is responsible for calling back to the application's Completion Handlers when Asynchronous Operations complete. When the Asynchronous Operation Processor completes an asynchronously initiated operation, the Completion Dispatcher performs an application callback on its behalf. It usually delegates the event notification handling to the suitable Completion Handler according to the type of the event.

Completion Handler : This is the interface implemented by the application to process asynchronous completion events.

Let's look at how this pattern can be implemented (as a simple echo server) using the new Java NIO.2 API added in Java 7.


public class ProactorInitiator {
  static int ASYNC_SERVER_PORT = 4333;

  public void initiateProactiveServer(int port)
    throws IOException {

    final AsynchronousServerSocketChannel listener =
      AsynchronousServerSocketChannel.open().bind(
        new InetSocketAddress(port));
     AcceptCompletionHandler acceptCompletionHandler =
       new AcceptCompletionHandler(listener);

     SessionState state = new SessionState();
     listener.accept(state, acceptCompletionHandler);
  }

  public static void main(String[] args) {
    try {
       System.out.println("Async server listening on port : " +
         ASYNC_SERVER_PORT);
       new ProactorInitiator().initiateProactiveServer(
         ASYNC_SERVER_PORT);
    } catch (IOException e) {
     e.printStackTrace();
    }

    // Sleep indefinitely since otherwise the JVM would terminate
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

public class AcceptCompletionHandler
  implements
    CompletionHandler<AsynchronousSocketChannel, SessionState> {

  private AsynchronousServerSocketChannel listener;

  public AcceptCompletionHandler(
    AsynchronousServerSocketChannel listener) {
    this.listener = listener;
  }

  @Override
  public void completed(AsynchronousSocketChannel socketChannel,
    SessionState sessionState) {
   // accept the next connection
   SessionState newSessionState = new SessionState();
   listener.accept(newSessionState, this);

   // handle this connection
   ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
   ReadCompletionHandler readCompletionHandler =
     new ReadCompletionHandler(socketChannel, inputBuffer);
   socketChannel.read(
     inputBuffer, sessionState, readCompletionHandler);
  }

  @Override
  public void failed(Throwable exc, SessionState sessionState) {
   // Handle connection failure...
  }

}

public class ReadCompletionHandler implements
  CompletionHandler<Integer, SessionState> {

   private AsynchronousSocketChannel socketChannel;
   private ByteBuffer inputBuffer;

   public ReadCompletionHandler(
     AsynchronousSocketChannel socketChannel,
     ByteBuffer inputBuffer) {
     this.socketChannel = socketChannel;
     this.inputBuffer = inputBuffer;
   }

   @Override
   public void completed(
     Integer bytesRead, SessionState sessionState) {

     if (bytesRead < 0) { // Client closed the connection before sending data
       try {
         socketChannel.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
       return;
     }

     byte[] buffer = new byte[bytesRead];
     inputBuffer.rewind();
     // Rewind the input buffer to read from the beginning

     inputBuffer.get(buffer);
     String message = new String(buffer);

     System.out.println("Received message from client : " +
       message);

     // Echo the message back to client
     WriteCompletionHandler writeCompletionHandler =
       new WriteCompletionHandler(socketChannel);

     ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);

     socketChannel.write(
       outputBuffer, sessionState, writeCompletionHandler);
  }

  @Override
  public void failed(Throwable exc, SessionState attachment) {
    //Handle read failure.....
   }

}

public class WriteCompletionHandler implements
  CompletionHandler<Integer, SessionState> {

  private AsynchronousSocketChannel socketChannel;

  public WriteCompletionHandler(
    AsynchronousSocketChannel socketChannel) {
    this.socketChannel = socketChannel;
  }

  @Override
  public void completed(
    Integer bytesWritten, SessionState attachment) {
    try {
      socketChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void failed(Throwable exc, SessionState attachment) {
   // Handle write failure.....
  }

}

public class SessionState {

  private Map<String, String> sessionProps =
    new ConcurrentHashMap<String, String>();

   public String getProperty(String key) {
     return sessionProps.get(key);
   }

   public void setProperty(String key, String value) {
     sessionProps.put(key, value);
   }

}

Each type of completion event (accept/read/write) is handled by a separate completion handler implementing the CompletionHandler interface (AcceptCompletionHandler, ReadCompletionHandler, WriteCompletionHandler etc.). The state transitions are managed inside these handlers. The additional SessionState argument can be used to hold client session specific state across a series of completion events.

NIO Frameworks (HTTPCore)

If you are thinking of implementing an NIO based HTTP server, you are in luck. The Apache HTTPCore library provides excellent support for handling HTTP traffic with NIO. The API provides higher level abstractions on top of the NIO layer, with HTTP request handling built in. A minimal non-blocking HTTP server implementation which returns a dummy output for any GET request is given below.

public class NHttpServer {

  public void start() throws IOReactorException {
    HttpParams params = new BasicHttpParams();
    // Connection parameters
    params.
      setIntParameter(
        HttpConnectionParams.SO_TIMEOUT, 60000)
     .setIntParameter(
       HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
     .setBooleanParameter(
       HttpConnectionParams.STALE_CONNECTION_CHECK, true)
     .setBooleanParameter(
       HttpConnectionParams.TCP_NODELAY, true);

    final DefaultListeningIOReactor ioReactor =
      new DefaultListeningIOReactor(2, params);
    // Spawns an IOReactor having two reactor threads
    // running selectors. Number of threads here is
    // usually matched to the number of processor cores
    // in the system

    // Application specific readiness event handler
    ServerHandler handler = new ServerHandler();

    final IOEventDispatch ioEventDispatch =
      new DefaultServerIOEventDispatch(handler, params);
    // Default IO event dispatcher encapsulating the
    // event handler

    ListenerEndpoint endpoint = ioReactor.listen(
      new InetSocketAddress(4444));

    // start the IO reactor in a new separate thread
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          System.out.println("Listening in port 4444");
          ioReactor.execute(ioEventDispatch);
        } catch (InterruptedIOException ex) {
          ex.printStackTrace();
        } catch (IOException e) {
          e.printStackTrace();
        } catch (Exception e) {
          e.printStackTrace();
        }
    }
    });
    t.start();

    // Wait for the endpoint to become ready,
    // i.e. for the listener to start accepting requests.
    try {
      endpoint.waitFor();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args)
    throws IOReactorException {
    new NHttpServer().start();
  }

}

public class ServerHandler implements NHttpServiceHandler {

 private static final int BUFFER_SIZE = 2048;

 private static final String RESPONSE_SOURCE_BUFFER =
 "response-source-buffer";

 // the factory to create HTTP responses
 private final HttpResponseFactory responseFactory;

 // the HTTP response processor
 private final HttpProcessor httpProcessor;

 // the strategy to re-use connections
 private final ConnectionReuseStrategy connStrategy;

 // the buffer allocator
 private final ByteBufferAllocator allocator;

 public ServerHandler() {
   super();
   this.responseFactory = new DefaultHttpResponseFactory();
   this.httpProcessor = new BasicHttpProcessor();
   this.connStrategy = new DefaultConnectionReuseStrategy();
   this.allocator = new HeapByteBufferAllocator();
 }

 @Override
 public void connected(
   NHttpServerConnection nHttpServerConnection) {
   System.out.println("New incoming connection");
 }

 @Override
 public void requestReceived(
   NHttpServerConnection nHttpServerConnection) {

   HttpRequest request =
     nHttpServerConnection.getHttpRequest();
   if (request instanceof HttpEntityEnclosingRequest) {
     // Handle POST and PUT requests
   } else {

     ContentOutputBuffer outputBuffer =
       new SharedOutputBuffer(
         BUFFER_SIZE, nHttpServerConnection, allocator);

     HttpContext context =
       nHttpServerConnection.getContext();
     context.setAttribute(
       RESPONSE_SOURCE_BUFFER, outputBuffer);
     OutputStream os =
       new ContentOutputStream(outputBuffer);

     // create the default response to this request
     ProtocolVersion httpVersion =
     request.getRequestLine().getProtocolVersion();
     HttpResponse response =
       responseFactory.newHttpResponse(
         httpVersion, HttpStatus.SC_OK,
         nHttpServerConnection.getContext());

     // create a basic HttpEntity using the source
     // channel of the response pipe
     BasicHttpEntity entity = new BasicHttpEntity();
     if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
       entity.setChunked(true);
     }
     response.setEntity(entity);

     String method = request.getRequestLine().
       getMethod().toUpperCase();

     if (method.equals("GET")) {
       try {
         nHttpServerConnection.suspendInput();
         nHttpServerConnection.submitResponse(response);
         os.write(new String("Hello client..").
           getBytes("UTF-8"));

         os.flush();
         os.close();
     } catch (Exception e) {
       e.printStackTrace();
     }
    } // Handle other http methods
   }
 }

 @Override
 public void inputReady(
    NHttpServerConnection nHttpServerConnection,
    ContentDecoder contentDecoder) {
    // Handle request enclosed entities here by reading
    // them from the channel
 }

 @Override
 public void responseReady(
    NHttpServerConnection nHttpServerConnection) {

   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }

 @Override
 public void outputReady(
   NHttpServerConnection nHttpServerConnection,
   ContentEncoder encoder) {
   HttpContext context = nHttpServerConnection.getContext();
   ContentOutputBuffer outBuf =
    (ContentOutputBuffer) context.getAttribute(
      RESPONSE_SOURCE_BUFFER);

   try {
     outBuf.produceContent(encoder);
   } catch (IOException e) {
     e.printStackTrace();
   }
 }

 @Override
 public void exception(
   NHttpServerConnection nHttpServerConnection,
   IOException e) {
   e.printStackTrace();
 }

 @Override
 public void exception(
   NHttpServerConnection nHttpServerConnection,
   HttpException e) {
   e.printStackTrace();
 }

 @Override
 public void timeout(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }

 @Override
 public void closed(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }

}

The IOReactor class basically wraps the demultiplexer functionality, with the ServerHandler implementation handling readiness events.

Apache Synapse (an open source ESB) contains a good implementation of an NIO based HTTP server, in which NIO is used to scale to a large number of clients per instance with fairly constant memory usage over time. The implementation also has good debugging and server statistics collection mechanisms built in, along with Axis2 transport framework integration. It can be found at [1].

Conclusion

There are several options when it comes to doing I/O, and they can affect the scalability and performance of servers. Each of the above I/O mechanisms has pros and cons, so decisions should be based on the expected scalability and performance characteristics and the ease of maintenance of each approach. This concludes my somewhat long winded article on I/O. Feel free to provide suggestions, corrections or comments that you may have. The complete source code for the servers outlined in the post, along with clients, can be downloaded from here.

 

References

There were many references I went through in the process. Below are some of the interesting ones.

[1] http://www.ibm.com/developerworks/java/library/j-nio2-1/index.html

[2] http://www.ibm.com/developerworks/linux/library/l-async/

[3] http://lse.sourceforge.net/io/aionotes.txt

[4] http://wknight8111.blogspot.com/search/label/AIO

[5] http://nick-black.com/dankwiki/index.php/Fast_UNIX_Servers

[6] http://today.java.net/pub/a/today/2007/02/13/architecture-of-highly-scalable-nio-server.html

[7] Java NIO by Ron Hitchens

[8] http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

[9] http://www.cs.wustl.edu/~schmidt/PDF/proactor.pdf

[10] http://www.kegel.com/c10k.html

My review on Hadoop : The Definitive Guide 3rd ed. is now online at DZone. Here is the “One Minute Bottom Line” of the review.

Overall the writing style is accessible though a bit verbose at times. The logical progression of content is OK for the most part, though some sections are buried deep inside chapters, making them a bit hard to find (a good example would be the section on Cascading). Also I think the section on writing a MapReduce application comes a bit too deep into the book. With those aside, I think the book is a good reference, capturing new developments in Hadoop and its ecosystem of projects sufficiently across its breadth.

Thanks Mitch for giving me the opportunity to review the book. You can read the full story from here.

 

JMX (Java Management Extensions) is a J2SE technology which enables management and monitoring of Java applications. The basic idea is to implement a set of management objects and register the implementations with a platform server, from where these implementations can be invoked either locally or remotely (with respect to the JVM) using a set of connectors or adapters.

A management/instrumentation object is called an MBean (Managed Bean). Once instantiated, an MBean is registered with a unique ObjectName with the platform MBeanServer. The MBeanServer acts as a repository of MBeans, enabling the creation, registration, access and removal of MBeans. However, the MBeanServer does not persist MBean information, so with a restart of the JVM you would lose all the MBeans in it. The MBeanServer is normally accessed through its MBeanServerConnection API, which works both locally and remotely. The management interface of an MBean would typically consist of [1]

  • Named and typed attributes that can be read/ written
  • Named and typed operations that can be invoked
  • Typed notifications that can be emitted by the MBean

For example, say it is required to manage the thread pool parameters of one of your applications at runtime. With JMX it's a matter of writing an MBean with the logic for setting and getting these parameters and registering it with the MBeanServer.
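As a minimal sketch of that thread pool example (the class names, attribute and ObjectName domain here are hypothetical, not from any particular product), a standard MBean consists of an interface with the MBean suffix plus an implementation, registered with the platform MBeanServer under an ObjectName:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Management interface: named and typed attributes (getters/setters) and operations
public interface ThreadPoolConfigMBean {
  int getPoolSize();
  void setPoolSize(int poolSize);
}

// Implementation holding the actual management logic
public class ThreadPoolConfig implements ThreadPoolConfigMBean {
  private volatile int poolSize = 10;

  public int getPoolSize() { return poolSize; }

  public void setPoolSize(int poolSize) {
    this.poolSize = poolSize;
    // ... resize the application's thread pool here ...
  }
}

// Registration with the platform MBeanServer
public class JmxRegistration {
  public static void main(String[] args) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    // "com.example" is a placeholder domain for illustration
    ObjectName name = new ObjectName("com.example:type=ThreadPoolConfig");
    server.registerMBean(new ThreadPoolConfig(), name);
    Thread.sleep(Long.MAX_VALUE); // keep the JVM alive so clients can connect
  }
}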

Now the next step is to expose these MBeans to the outside world so that remote clients can invoke them to manage your application. This can be done via various protocols implemented by protocol connectors and protocol adapters. A protocol connector basically exposes MBeans as they are, so that the remote client sees the same interface (the JMX RMI connector is a good example). So basically the client or the remote management application should itself be JMX enabled.

A protocol adapter (e.g. HTML, SNMP) adapts the results according to the protocol the client is expecting (e.g. for a browser-based client it sends the results as HTML over HTTP).

Now that MBeans are properly exposed to the outside we need some clients to
access these MBeans to manage our applications. There are basically two
categories of clients available according to whether they use connectors or
adapters.

JMX clients use the JMX API to connect to the MBeanServer and invoke MBeans. Generally JMX clients use an MBeanServerConnection to connect to the MBeanServer and invoke MBeans through it by providing the MBean ID (ObjectName) and the required parameters. There are basically three types of JMX clients.

Local JMX Client : A client that runs in the same JVM as the MBeanServer. These clients can also use the MBeanServer API itself since they are running inside the same JVM.

Agent : The agent is a local JMX Client which manages the MBeanServer itself.
Remember that MBeanServer does not persist MBean information. So we can use
an Agent to provide this logic which would encapsulate the MBeanServer with
the additional functionality. So the Agent is responsible for initializing
and managing the MBeanServer itself.

Remote JMX Client : A remote client differs from a local client only in that it needs to instantiate a Connector to connect to a Connector server in order to get an MBeanServerConnection. And of course it would be running in a remote JVM, as the name suggests.
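A minimal sketch of such a remote client using the standard RMI connector might look like the following (the host, port and ObjectName are placeholder values, and the target JVM would have to expose a connector server, e.g. via the com.sun.management.jmxremote options, for this to work):

import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteJmxClient {
  public static void main(String[] args) throws Exception {
    // Standard JMX RMI service URL (host/port are placeholders)
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");

    JMXConnector connector = JMXConnectorFactory.connect(url);
    try {
      MBeanServerConnection connection = connector.getMBeanServerConnection();

      // Read an attribute of the hypothetical MBean registered earlier
      ObjectName name = new ObjectName("com.example:type=ThreadPoolConfig");
      Object poolSize = connection.getAttribute(name, "PoolSize");
      System.out.println("Current pool size : " + poolSize);

      // Change it remotely
      connection.setAttribute(name, new Attribute("PoolSize", 20));
    } finally {
      connector.close();
    }
  }
}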

The next type of client is the management client, which uses protocol adapters to connect to the MBeanServer. For these to work the respective adapter should be present and running in the JVM being managed. For example, the HTML adapter should be present in the JVM for a browser-based client to connect to it and invoke MBeans. The diagram below summarizes the concepts described so far.

 

 

This concludes my quick notes on JMX. An extremely good read on the main JMX concepts can be found at [2]. The JMX learning trail at Oracle is also a good starting point for getting good with JMX.
[1] http://docs.oracle.com/javase/6/docs/technotes/guides/jmx/overview/instrumentation.html#wp998816
[2] http://pub.admc.com/howtos/jmx/architecture-chapt.html

This is my log of several mistakes (some pretty dumb in hindsight :)) that I made while getting started with Hadoop and Hive some time back, along with some tricks for debugging Hadoop and Hive. I am using Hadoop 0.20.203 and Hive 0.8.1.

localhost: Error: JAVA_HOME is not set

This almost undecipherable and cryptic error message 🙂 during Hadoop startup (namenode/jobtracker etc.) says that Hadoop cannot find the Java installation. Wait!! I have already set the JAVA_HOME environment variable?? Seems that's not enough. So where else to set it? Turns out you have to set JAVA_HOME in hadoop-env.sh, present in the conf folder, to get the elephant moving.
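For example, the relevant line in conf/hadoop-env.sh would look something like the following (the JDK path is just an illustration; use whatever your installation actually is).

export JAVA_HOME=/usr/lib/jvm/java-6-sun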

Name node mysteriously fails to start

When you start the namenode things seem fine, except for the fact that the server is not up and running. And of course I hadn't formatted the HDFS namenode. So why should it work, right? 🙂 So there goes. Format the darn namenode before doing anything distributed with Hadoop.


bin/hadoop namenode -format

java.io.IOException Call to localhost/127.0.0.1:9000 failed on local exception: java.io.EOFException

This one was a bit tricky. After fiddling and struggling for some time, I found out that the Hadoop dependency version used by the JobClient to communicate with the JobTracker was different from the version running inside the Hadoop instance. Hadoop uses a homegrown RPC mechanism to communicate with the job tracker and name nodes, and it seems certain Hadoop versions have incompatibilities in this interface.

Now it’s time for some debugging tips.

Debugging Hadoop Local (Standalone) mode

Add debugging options for JVM as follows in conf/hadoop-env.sh.


export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=[DEBUG_PORT]"

Debugging Hive Server

Start Hive with following command line to remote debug Hive.


./hive --service hiveserver --debug[port=[DEBUG_PORT],mainSuspend=y,childSuspend=y]

Recently I worked on getting Apache Hive to work inside an OSGi environment. While not proving to be the proverbial piece of cake (software, right?.. Why am I not surprised? :)), it led me through an assortment of Java and OSGi errors. Here I am listing some of them that bit me quite hard (no pun intended), so I thought of making a blog post out of them, just for my own satisfaction.

java.lang.VerifyError

I got this nastiness during the initialization of one of the OSGi service components. The culprit was not immediately identifiable since the offending bundle was in the ACTIVE state. On the surface everything looked fine, except for the fact that the Hive server which was supposed to start during the initialization of the service component in the bundle was not up and running. A quick 'ls' in the OSGi console revealed that the service component was in the 'unsatisfied' state. Finally a 'comp' revealed the root cause, the VerifyError.

A VerifyError can occur if the runtime dependency of a class is different from the dependency that was used at compile time. For example, if method signatures have changed between the dependency versions then this error would result. This is nicely explained in the accepted answer at [1]. As it turned out, slightly different versions of a package had been exported by two bundles, causing the Hive bundle to pick up a different version than the one in the compilation environment. Proper OSGi versioning turned out to be the solution.

java.lang.IncompatibleClassChangeError

This error also cropped up under a similar circumstance, where two versions of a package were present in the system. As [2] clearly explains, the reason in my case was an interface being changed to an abstract class between the conflicting package versions. Again, versioning helped save the day.

java.lang.LinkageError : loader constraint violation in xxxx – blah …

Now this seems to be a famous error, especially in OSGi environments. The main root cause seems to be two classes loaded by different ClassLoaders coming into contact in a certain way. For example, say a Class A object accepts a Class B object as a method parameter. Class B is loaded by ClassLoader-A, which also loads Class A. But at method invocation time an object of Class B which has been loaded by ClassLoader-B is passed as an argument to an object of Class A which has been loaded by ClassLoader-A. The result would be a big fat LinkageError with a very verbose error message.

The graph-based class loading structure in OSGi makes it especially conducive to these kinds of errors. In my case the culprit was a package which had been duplicated in two different bundles, and a particular class in that package, loaded by the separate ClassLoaders of each bundle, coming into contact via a third bundle during a method call. So this was a case of not following the "import what you export" best practice [3] in OSGi. Doing so helps reduce the exposure of duplicated packages across bundles and helps maintain a consistent class space for a given package. And so this turned out to be the resolution in this case.

Package uses conflict: Import-Package: yyy; version=”x.x.x”

I had my fair share of this inconvenience thrown in my face every so often during the exercise. There are two excellent posts [4],[5] exactly on this issue at SpringSource which helped a lot. However, let me summarize my learning on this issue. Simply put, if a bundle is exposed to two versions of the same package, through a direct import and via a uses constraint, this error comes up. The diagram best illustrates this situation.

Bundle A imports org.foo version 1.0.0 directly. It also imports package org.bar from bundle B. However, as it turns out, package org.bar also uses the org.foo package, albeit a different version (2.0.0) than the one imported by bundle A. Now bundle A is directly wired to version 1.0.0 of org.foo and is also exposed to version 2.0.0 of org.foo through the import of org.bar, which uses version 2.0.0 of org.foo. Since a bundle cannot be wired to different versions of the same package, a uses conflict comes up with the offending import org.bar as the root cause (e.g. Package uses conflict: Import-Package: org.bar; version="0.0.0"). The solution would be to change the org.foo package import versions in either bundle A or bundle B so that both point to the same package version. Another excellent blog by Neil Bartlett on this can be found at [6].
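For illustration, the kind of manifest change that resolves this (bundle names and version numbers are just the ones from the example above, not from a real project) is aligning the org.foo import version ranges so both bundles can be wired to the same exporter.

Bundle A's MANIFEST.MF:

Import-Package: org.foo;version="[2.0.0,3.0.0)",
 org.bar;version="[1.0.0,2.0.0)"

Bundle B's MANIFEST.MF:

Import-Package: org.foo;version="[2.0.0,3.0.0)"
Export-Package: org.bar;version="1.0.0";uses:="org.foo"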

java.lang.UnsatisfiedLinkError

One of my friends at work came across this while trying to incorporate another third party library into our OSGi environment. The JavaDoc says this gets "Thrown if the Java Virtual Machine cannot find an appropriate native-language definition of a method declared native". The offending library was a Linux .so (shared object) file which was not visible to the bundle ClassLoader at runtime. We were able to get it working by directly including the library resource in the bundle ClassLoader. An earlier attempt at setting this resource on the TCCL (Thread Context ClassLoader) failed, and this led us to the realization that the TCCL is typically not the bundle class loader. A good read on the TCCL under the Equinox OSGi environment can be found at [7].
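For context, the usual pattern when a library insists on loading resources through the thread context class loader is to temporarily swap the TCCL around the call, for example to a bundle's class loader. This is a generic sketch, not the exact code we used; SomeBundleClass and thirdPartyLibraryCall are hypothetical placeholders.

// Temporarily set the TCCL to a class loader that can actually see the
// resource (e.g. the bundle's class loader), then restore the original.
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
  Thread.currentThread().setContextClassLoader(SomeBundleClass.class.getClassLoader());
  thirdPartyLibraryCall(); // hypothetical call that loads resources via the TCCL
} finally {
  Thread.currentThread().setContextClassLoader(original);
}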

 

[1] http://stackoverflow.com/questions/100107/reasons-of-getting-a-java-lang-verifyerror
[2] http://stackoverflow.com/questions/1980452/what-causes-java-lang-incompatibleclasschangeerror
[3] http://blog.osgi.org/2007/04/importance-of-exporting-nd-importing.html
[4] http://blog.springsource.org/2008/10/20/understanding-the-osgi-uses-directive/
[5] http://blog.springsource.org/2008/11/22/diagnosing-osgi-uses-conflicts/

[6] http://njbartlett.name/2011/02/09/uses-constraints.html
[7] http://wiki.eclipse.org/ContextClassLoader_Enhancements

Recently I wanted to set up a remote desktop sharing session from my home PC to my laptop. While going through the setup guide I came across SSH tunneling. Even though there are many articles on the subject, it still took me a considerable amount of googling, some experimenting and a couple of Wireshark sessions to grasp what's going on under the hood. Most of the guides were incomplete in terms of explaining the concept, which left me wanting a good article on the subject with some explanatory illustrations. So I decided to write one myself. So here goes…

Introduction

An SSH tunnel consists of an encrypted tunnel created through an SSH protocol connection. An SSH tunnel can be used to transfer unencrypted traffic over a network through an encrypted channel. For example, we can use an SSH tunnel to securely transfer files between an FTP server and a client even though the FTP protocol itself is not encrypted. SSH tunnels also provide a means to bypass firewalls that prohibit or filter certain internet services. For example, an organization may block certain sites using its proxy filter, but users may not wish to have their web traffic monitored or blocked by that proxy filter. If users can connect to an external SSH server, they can create an SSH tunnel to forward a given port on their local machine to port 80 on a remote web server via the external SSH server. I will describe this scenario in detail in a little while.

To set up an SSH tunnel, a given port of one machine needs to be forwarded (which I am going to talk about in a little while) to a port on the other machine, which will be the other end of the tunnel. Once the SSH tunnel has been established, the user can connect to the earlier specified port on the first machine to access the network service.

Port Forwarding

SSH tunnels can be created in several ways using different kinds of port forwarding
mechanisms. Ports can be forwarded in three ways.

  1. Local port forwarding
  2. Remote port forwarding
  3. Dynamic port forwarding

I haven't explained what port forwarding is yet. I found Wikipedia's definition quite explanatory.

Port forwarding or port mapping is a name given to the combined technique of

  1. translating the address and/or port number of a packet to a new destination
  2. possibly accepting such packet(s) in a packet filter (firewall)
  3. forwarding the packet according to the routing table.

Here the first technique will be used in creating an SSH tunnel. When a client application connects to the local port (local endpoint) of the SSH tunnel and transfers data, the data will be forwarded to the remote end by translating the host and port values to those of the remote end of the channel.

So with that, let's see how SSH tunnels can be created using forwarded ports, with some examples.

Tunnelling with Local port forwarding

Let's say that yahoo.com is being blocked using a proxy filter at the university (for the sake of this example :). I cannot think of any valid reason why Yahoo would be blocked). An SSH tunnel can be used to bypass this restriction. Let's name my machine at the university 'work' and my home machine 'home'. 'home' needs to have a public IP for this to work, and I am running an SSH server on my home machine. The following diagram illustrates the scenario.

To create the SSH tunnel execute following from ‘work’ machine.

ssh -L 9001:yahoo.com:80 home

The 'L' switch indicates that a local port forward needs to be created. The switch syntax is as follows.

-L <local-port-to-listen>:<remote-host>:<remote-port>

Now the SSH client at 'work' will connect to the SSH server running at 'home' (usually running on port 22), binding port 9001 of 'work' to listen for local requests, thus creating an SSH tunnel between 'home' and 'work'. At the 'home' end it will create a connection to 'yahoo.com' on port 80. So 'work' doesn't need to know how to connect to yahoo.com; only 'home' needs to worry about that. The channel between 'work' and 'home' will be encrypted, while the connection between 'home' and 'yahoo.com' will be unencrypted.

Now it is possible to browse yahoo.com by visiting http://localhost:9001 in the web browser on the 'work' computer. The 'home' computer will act as a gateway which accepts requests from the 'work' machine, fetches the data and tunnels it back. So the syntax of the full command is as follows.

ssh -L <local-port-to-listen>:<remote-host>:<remote-port> <gateway>

The image below describes the scenario.

Here the 'home' to 'yahoo.com' connection is only made when the browser makes the request, not at tunnel setup time.

It is also possible to specify a port on the 'home' computer itself instead of connecting to an external host. This is useful if I were to set up a VNC session between 'work' and 'home'. Then the command line would be as follows.

ssh -L 5900:localhost:5900 home (Executed from 'work')

So here, what does localhost refer to? Is it 'work', since the command is executed from 'work'? Turns out that it is not. As explained earlier, it is relative to the gateway ('home' in this case), not the machine from where the tunnel is initiated. So this will make a connection to port 5900 of the 'home' computer, where the VNC server would be listening.

The created tunnel can be used to transfer all kinds of data, not limited to web browsing sessions. We can also tunnel SSH sessions through it. Let's assume there is another computer ('banned') to which we need to SSH from within the university, but SSH access to it is blocked. It is possible to tunnel an SSH session to this host using a local port forward. The setup would look like this.

As can be seen, the data transferred between 'work' and 'banned' is now encrypted end to end. For this we need to create a local port forward as follows.

ssh -L 9001:banned:22 home

Now we need to create an SSH session to local port 9001, from where the session will get tunneled to 'banned' via the 'home' computer.

ssh -p 9001 localhost

With that, let's move on to the next type of SSH tunnelling method, reverse tunnelling.

Reverse Tunnelling with remote port forwarding

Let's say it is required to connect to an internal university website from home. The university firewall is blocking all incoming traffic. How can we connect from 'home' to the internal network so that we can browse the internal site? A VPN setup is a good candidate here. However, for this example let's assume we don't have this facility. Enter SSH reverse tunnelling..

As in the earlier case, we will initiate the tunnel from the 'work' computer behind the firewall. This is possible since only incoming traffic is blocked and outgoing traffic is allowed. However, in contrast to the earlier case, the client will now be at the 'home' computer. Instead of the -L option we now use -R, which specifies that a reverse tunnel needs to be created.

ssh -R 9001:intra-site.com:80 home (Executed from 'work')

Once executed, the SSH client at 'work' will connect to the SSH server running at 'home', creating an SSH channel. Then the server will bind port 9001 on the 'home' machine to listen for incoming requests, which would subsequently be routed through the created SSH channel between 'home' and 'work'. Now it's possible to browse the internal site by visiting http://localhost:9001 in the 'home' web browser. 'work' will then create a connection to the intra-site and relay the response back to 'home' via the created SSH channel.

As nice as all of this is, in both cases you still need to create a separate tunnel for each site you want to connect to. Wouldn't it be nice if it were possible to proxy traffic to any site using the SSH channel created? That's what dynamic port forwarding is all about.

Dynamic Port Forwarding

Dynamic port forwarding allows you to configure one local port for tunnelling data to all remote destinations. However, to utilize this, the client application connecting to the local port should send its traffic using the SOCKS protocol. At the client side of the tunnel a SOCKS proxy is created, and the application (e.g. a browser) uses the SOCKS protocol to specify where the traffic should be sent when it leaves the other end of the SSH tunnel.

ssh -D 9001 home (Executed from 'work')

Here SSH will create a SOCKS proxy listening for connections on local port 9001 and, upon receiving a request, will route the traffic via the SSH channel created between 'work' and 'home'. For this it is required to configure the browser to point to the SOCKS proxy at port 9001 on localhost.
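Besides the browser, any SOCKS-aware client can use the same proxy; for example, with curl you could test the tunnel like this (the URL is just an example).

curl --socks5-hostname localhost:9001 http://example.com/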

Nowadays we are constantly reminded of the virtues of being proactive, or more colloquially put, "being one step ahead of the game", when it comes to handling our businesses, whether it be an SME or a multi-national corporation. Quickly detecting, or in some cases even predicting, trends in activities originating within and outside the organization and streamlining business activities accordingly may decide the life or death of the business, it's said. The often touted solution for this problem is implementing a proper monitoring solution which would give the decision makers relevant information at the correct time. However most businesses are at a loss as to where to begin or how to properly implement means of obtaining such insights. This is not surprising given that even the buzzwords surrounding the monitoring concepts tend to be fuzzy.

 

Whoa.. That's some pretty serious language (OK it is, at least to me :). I consider myself linguistically challenged when it comes to English.). Well, I wanted to start on a serious note since we are dealing with a serious subject here, right?? :). Anyway, this tells part of the story when it comes to the business monitoring domain. Sometimes the monitoring solutions forced on businesses are just like this: some serious mumbo jumbo with hundreds of bells and whistles which most of us don't care to understand, and of course sometimes not capturing what really needs to be monitored in the business either. On top of that there is a buzzword soup surrounding the monitoring products, for which each vendor comes up with different interpretations according to their implementations. Anyway, let's get some perspective on some business monitoring keywords, the way I see them.

 

Let’s monitor some activities

 

"Business Activity Monitoring" is a term coined by Gartner Inc., defined as "the aggregation, analysis and presentation of real-time information about activities inside organizations and involving customers and partners". However, the term is used in different contexts, meaning different things to different people, especially when it comes to vendor solutions. The confusion tends to be mostly around what can be considered a business activity. For example, for a business executive a sale of a product is a perfectly valid business activity which needs to be monitored, while a tech ops guy would need monitoring of the load on the server hosting the sales application. I have heard some people say the latter does not really fall under the term "Business Activity", since that level of monitoring is of no importance to the strategic decision-making of the business. But as far as I believe, it is no less important and should be a part of a comprehensive monitoring solution, since any high level decisions made depend on the smooth functioning of daily operations supported by a properly functioning infrastructure (if servers are down, sales numbers are going to get hurt; so will the sales projections; simple as that). It's a matter of providing a suitable view to each intended user group according to the type of monitoring information they are interested in.

 

Anyway, the latter kind of monitoring may fit better under the "Operational Intelligence" category, which I will talk about in a bit. In that sense we can think of "Business Activity Monitoring" as a subset of "Business Monitoring", fulfilling part of a holistic approach to the monitoring problem, where everything that needs to be monitored in the business comes under one comprehensive monitoring solution. This is one major point where vendors differ in their solutions. Some monitoring solutions focus on a mixture of monitoring aspects, and so their definition of BAM varies accordingly.

 

BPM – A side kick??

 

Another difference between various BAM solutions is in the way they are implemented. Some assume the existence of a Business Process Management (BPM) solution, mostly from the same vendor, and so the monitoring solution is tightly coupled to it. While these kinds of solutions may provide better integration between the products, in my opinion they lack the flexibility to monitor most business scenarios, where no business process management solution is in place. If the monitoring framework is generic enough, it's a matter of putting the required data capturing agents at the points of interest and sending the data to the BAM solution, which should be able to correlate the incoming events. However, if there is a BPM solution already present from the same vendor, the monitoring solution should be able to leverage that as well. This way it provides the most flexibility in terms of monitoring requirements.

 

Key to success – KPI

 

Another term mentioned side by side with BAM is key performance indicators (KPI). A BAM solution would monitor a set of predefined KPIs and make sure that necessary actions are taken (it may be firing some alerts to the relevant parties or even automatically triggering some corrective action if possible) when KPIs are not met with respect to their desired values. A good definition that I found on what constitutes a KPI is as follows.

Key Performance Indicators are quantifiable measurements that reflect the critical success factors of an organization. They will differ depending on the organization

So these are highly specific to the organization. Let me give a couple of simple examples of KPIs.

  1. For a retail store a valid KPI would be the percentage of days where daily sales revenue target was not met.
  2. For a delivery service a KPI would monitor the number of deliveries that went more than 10% over their expected delivery times.
  3. A KPI for a call center would monitor the number of calls which took less than 2 minutes to resolve the problem.

Here we can see the importance of being able to customize the KPI definitions according to the nature of the business. While properly identifying the necessary KPIs should be done with the involvement of the business management, the BAM solution should facilitate defining business specific KPI definitions.
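
To make the idea concrete, here is a minimal Java sketch of what a KPI check could look like; the class, the target value and the alerting hook are purely illustrative assumptions and not taken from any particular BAM product.

// A KPI as a named measurement with a desired target value. When the measured
// value misses the target, the monitoring solution should take an action,
// e.g. fire an alert to the relevant parties. (Names and logic are illustrative.)
public class Kpi {

    private final String name;
    private final double target;

    public Kpi(String name, double target) {
        this.name = name;
        this.target = target;
    }

    // True when the KPI is violated, e.g. daily sales revenue below target.
    public boolean isViolated(double measuredValue) {
        return measuredValue < target;
    }

    public static void main(String[] args) {
        Kpi dailySales = new Kpi("Daily sales revenue", 50000.0);
        if (dailySales.isViolated(42000.0)) {
            System.out.println("ALERT: " + dailySales.name + " target missed");
        }
    }
}

In a real solution the measured value would be derived from the monitored event streams and the action would be an alert or an automated corrective step as described above.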

 

Intelligence in Operations – OI

 

Next comes the “Operational Intelligence” aspect of business monitoring. It is more or less similar to “Business Activity Monitoring” except that “Operational Intelligence” is more oriented towards monitoring day-to-day business activities and geared to find issues in the system in real time, in order to take corrective actions. I believe technical operations monitoring fits under this description since it involves the day-to-day aspect and the required response times for any issue found should be more real-time. But business metrics requiring close monitoring may well be included as part of “Operational Intelligence” as well. So here comes another word (“real time”) into the mix which means different things to different people. There are several levels of real-timeness in the products we see in the market. Some position themselves as real-time monitoring solutions while others support near real-time monitoring, and the boundary between these is blurry at best. As with anything else when it comes to monitoring, the required response time of the solution depends on the context. A solution monitoring a business critical application server may require response times within several seconds while a low volume internal application server may not need such real-time monitoring. A good rule of thumb would be that if it’s real-time, expect sub minute response times, while if it’s near real-time a couple of minutes of lag at times may be acceptable. Of course the vendors can stretch these either way according to their implementations. So always try to read between the lines of the marketing terms to really see whether the solution a vendor is proposing matches what is required.

 

CEP to the rescue

 

Often the response times required by “Operational Intelligence” monitoring necessitate the usage of a Complex Event Processing (CEP) solution underneath, which would monitor incoming event streams upon entry and trigger certain actions when anomalies are detected. So the real-timeness of the product will directly depend upon the performance characteristics and scalability of the CEP solution used underneath.

 

Another type of Intelligence – BI

 

The next type of “Intelligence” a business wants is “Business Intelligence”. Yeah I know there are so many types of “Intelligences” floating around and this is one of the important ones. This is geared towards finding trends in business operations and the market environment and coming up with predictions on business conditions. This is basically historical data analysis which may pull data out of a data warehouse, do some ETL operations and run some data mining operations on the data to gain new insights into business operations. So these jobs are not real-time, but rather batch jobs which are scheduled at suitable intervals.

 

Ok. I think that’s enough for a day. Hope I made some sense out of the monitoring buzzword fiesta. Hopefully this post will be a good base for a next post I plan to write some time soon, in which I would outline some practical experiences me and our team had while implementing a business monitoring solution ourselves.

I have been reading on Join implementations available for Hadoop for the past few days. In this post I recap some techniques I learnt during the process. The joins can be done at both the map side and the reduce side according to the nature of the data sets to be joined.

Reduce Side Join

Let’s take the following tables containing employee and department data.

Let’s see how the join query below can be achieved using a reduce side join.


SELECT Employees.Name, Employees.Age, Department.Name  FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id

The map side is responsible for emitting the join predicate value along with the corresponding record from each table, so that records having the same department id in both tables will end up at the same reducer, which would then do the joining of records having the same department id. However it is also required to tag each record to indicate which table the record originated from, so that the joining happens between records of the two tables. The following diagram illustrates the reduce side join process.

Here is the pseudo code for map function for this scenario.


map (K table, V rec) {
   dept_id = rec.Dept_Id
   tagged_rec.tag = table
   tagged_rec.rec = rec
   emit(dept_id, tagged_rec)
}

At the reduce side, the join happens between records having different tags.


reduce (K dept_id, list<tagged_rec> tagged_recs) {
   for (tagged_rec : tagged_recs) {
      for (tagged_rec1 : tagged_recs) {
         if (tagged_rec.tag != tagged_rec1.tag) {
            joined_rec = join(tagged_rec, tagged_rec1)
            emit(tagged_rec.rec.Dept_Id, joined_rec)
         }
      }
   }
}
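
For concreteness, here is a minimal sketch of the same idea using the Hadoop MapReduce Java API. The class names, the assumption that both tables are comma separated text files with Dept_Id as the first field, and the use of the input file name as the tag are illustrative choices of mine, not part of the pseudo code above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReduceSideJoin {

  // Tags each record with the name of the file (i.e. table) it came from and
  // emits it keyed by the join column, assumed here to be the first CSV field.
  public static class TaggingMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    protected void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String tag = ((FileSplit) context.getInputSplit()).getPath().getName();
      String deptId = value.toString().split(",")[0];
      context.write(new Text(deptId), new Text(tag + "\t" + value));
    }
  }

  // For each Dept_Id, joins every pair of records that carry different tags.
  public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text deptId, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      List<String> tagged = new ArrayList<String>();
      for (Text v : values) {
        tagged.add(v.toString()); // copy: Hadoop reuses the Text instance
      }
      for (String a : tagged) {
        for (String b : tagged) {
          String tagA = a.substring(0, a.indexOf('\t'));
          String tagB = b.substring(0, b.indexOf('\t'));
          if (!tagA.equals(tagB)) { // only join across different tables
            context.write(deptId, new Text(a + " | " + b));
          }
        }
      }
    }
  }
}

A driver would simply add both input paths to the same job so that the mapper sees records from both tables.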

Map Side Join (Replicated Join)

Using Distributed Cache on Smaller Table

For this implementation to work one relation has to fit into memory. The smaller table is replicated to each node and loaded into memory. The join happens at the map side without reducer involvement, which significantly speeds up the process since it avoids shuffling all the data across the network, even though most of the non-matching records would later be dropped anyway. The smaller table can be loaded into a hash table so that lookups by Dept_Id can be done. The pseudo code is outlined below.


map (K table, V rec) {
   list recs = lookup(rec.Dept_Id) // Get smaller table records having this Dept_Id
   for (small_table_rec : recs) {
      joined_rec = join(small_table_rec, rec)
      emit(rec.Dept_Id, joined_rec)
   }
}
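
A rough sketch of the same pattern with the Hadoop Java API is given below; the file name departments.txt, the field positions and the record layouts are assumptions made for illustration. The small table is assumed to have been added to the distributed cache (for example with job.addCacheFile(...)) so that it is available to every task.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map side (replicated) join: the small Department table (assumed CSV of
// Dept_Id,Dept_Name shipped via the distributed cache) is loaded into an
// in-memory hash map in setup(), and each Employee record is joined against
// it directly in map() with no reducer involved.
public class ReplicatedJoinMapper extends Mapper<Object, Text, Text, Text> {

  private final Map<String, String> departments = new HashMap<String, String>();

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    BufferedReader reader = new BufferedReader(new FileReader("departments.txt"));
    String line;
    while ((line = reader.readLine()) != null) {
      String[] fields = line.split(",");
      departments.put(fields[0], fields[1]); // Dept_Id -> Dept_Name
    }
    reader.close();
  }

  @Override
  protected void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split(","); // assumed: Name,Age,Dept_Id
    String deptName = departments.get(fields[2]);
    if (deptName != null) { // drop employees with no matching department
      context.write(new Text(fields[2]),
                    new Text(fields[0] + "," + fields[1] + "," + deptName));
    }
  }
}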

Using Distributed Cache on Filtered Table

If the smaller table doesn’t fit in memory it may be possible to prune its contents if a filtering expression has been specified in the query. Consider the following query.


SELECT Employees.Name, Employees.Age, Department.Name  FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id WHERE Department.Name="Eng"

Here a smaller data set can be derived from the Department table by filtering out records having department names other than “Eng”. Now it may be possible to do a replicated map side join with this smaller data set.

Replicated Semi-Join

Reduce Side Join with Map Side Filtering

Even if the filtered data of the small table doesn’t fit into memory, it may be possible to include just the Dept_Ids of the filtered records in the replicated data set. Then at the map side this cache can be used to filter out the records which would be sent over to the reduce side, thus reducing the amount of data moved between the mappers and the reducers.

The map side logic would look as follows.


map (K table, V rec) {
   // Check if this record needs to be sent to reducer
   boolean sendToReducer = check_cache(rec.Dept_Id)
   if (sendToReducer) {
      dept_id = rec.Dept_Id
      tagged_rec.tag = table
      tagged_rec.rec = rec
      emit(dept_id, tagged_rec)
   }
}

The reducer side logic would be the same as in the Reduce Side Join case.

Using a Bloom Filter

A bloom filter is a construct which can be used to test whether a given element is contained in a set. A smaller representation of the filtered Dept_Ids can be derived if the Dept_Id values are added into a bloom filter. This bloom filter can then be replicated to each node. At the map side, for each record being processed, the bloom filter can be used to check whether the Dept_Id in the record is present in the filter, and only if so emit that particular record to the reduce side. Since a bloom filter is guaranteed not to produce false negatives no required record will be missed, while the occasional false positive only means a few extra records get sent to the reduce side, where they are dropped during the join.
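
As a rough illustration, the bloom filter implementation that ships with Hadoop (org.apache.hadoop.util.bloom.BloomFilter) could be used along the following lines; the vector size, the number of hash functions and the Dept_Id values are illustrative guesses rather than tuned values.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class DeptIdBloomFilterExample {

  public static void main(String[] args) {
    // Build the filter from the filtered Department records (e.g. Name = "Eng").
    // Vector size and number of hash functions control the false positive rate.
    BloomFilter filter = new BloomFilter(10000, 5, Hash.MURMUR_HASH);
    filter.add(new Key("D001".getBytes()));
    filter.add(new Key("D007".getBytes()));

    // At the map side, only records whose Dept_Id may be in the filter are
    // emitted to the reducers. False positives just mean a few extra records
    // travel to the reduce side, where the real join drops them.
    String deptIdOfRecord = "D001";
    if (filter.membershipTest(new Key(deptIdOfRecord.getBytes()))) {
      // emit(deptIdOfRecord, taggedRecord)
    }
  }
}

In a real job the filter would be built in a separate pass over the filtered Department records, serialized (it implements Writable) and shipped to the mappers via the distributed cache.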

References

[1] Hadoop In Action

[2] Hadoop : The Definitive Guide

Been there. Done that. And suffered for that…

Programming is fun. But there are some other associated stuff we programmers blissfully skip or procrastinate because they are not so cool.

End result?…

Somebody is going to get hurt at the end of the day and that somebody may very well be ourselves. So here are some stuff I have experienced, some of the stuff I myself have been guilty of doing, and the insights I got from them.

Good ol’ docs

It’s a well documented fact that documentation is.. hmm well.. let me think.. Good to have. Or is it important? Yep I know the feeling :). But as things turn out, it is something that needs to be done at the end of the day. Who said we programmers do not have to toil for our food 🙂 right?. From a user’s perspective a feature without proper documentation is close to a feature which is not there at all. Say you developed a really neat feature and obviously you want people to try it out, right? But what if they are not able to wrap their heads around how to use it, or they have to play a guessing game trying to get it to work and in the process fail miserably? Now not only have you wasted their time but you have also earned some bad karma. And yes, an intuitive user interface can go a long way to ease the user’s pain, but good, to the point documentation sprinkled on top makes up a recipe that users can’t get enough of.

The Extra Mile

Say you developed this new cool feature. But in the hurry of pushing it out you cut some corners and left some manual step in the usage flow which would have been better done behind the curtains, unbeknownst to the user. Now the user has to do this manual step every time he uses your feature, which quickly becomes a pain, especially if it turns out to be a heavily used feature. Optimize the UX. Cut unnecessary stuff from the user flow. Go that extra mile and users will thank you for it.

Mind your cycle

Go easy on yourself. Make your development cycle quicker. Say you have some repetitive process to do in order to get the code you wrote running in the test environment so you can check whether your feature/fix is working correctly. Invest some time in automating this process, maybe by writing a handy script, and it will help you finish your work early and then go play :).

Let’s configure it

What if a user wants to fine tune the size of the foo queue holding tasks for the bar thread pool of your program? Oh ok, let’s make it configurable via the UI then, right? Or should we?? Too much configurability thrown in the user’s face kills the user experience. Do not force your users to fill in stuff which is better left with some sensible defaults every time they use your product. It may be that there is no need to configure every nook and corner of your program to make it work the way you want. Decide what should be in and what should be out. Better yet, a middle ground would be to provide those configurations in an optional advanced configuration section with some sensible defaults, which the user can go and change if they see fit. And also remember to document them clearly as well, so that the user knows better when configuring them.

Nasty API docs

Wrong API docs are worse than having no API docs at all. It really happened to me once with a JMS API not working as published in its API docs. And I thought my thread programming was causing it. It took a considerable amount of hair pulling to figure out that the fault was with the API. Since my assumptions about the API, derived from the docs, were wrong, so was my program. Be especially mindful when you are changing an existing API implementation whether the assumptions and the results returned in certain conditions specified in the API docs still hold. If not, change the docs accordingly.

Carpenters wanted..

Manage your broken windows. You may have to cut some corners and pull out some hacks due to time or release pressures. It’s OK as long as you know what your broken windows are and you intend to repair them the first chance you get. Leave some reminders and attend to them when you get the chance.

Love thy code.

Show that you care so that others will care. If you maintain your code in a good condition the other people taking over or contributing to your code will tend to care about maintaining it the same way. This is specially important in open source settings where you will not be the only one mending a piece of code written by you at the end of the day.

So there goes my list of tidbits on programming for the better. Pretty much regulation and common sense stuff which does not warrant a mention, you might say. But as mentioned in the beginning, I have been there. Done that. And have paid for that :).  And we keep doing that as well. So I hope this post will serve as a reminder, for me at least, when I am on the verge of doing some nasty thing next time around :). Anyway this is just my 2 cents. Please holler if you beg to differ.

Cassandra pagination has been the subject of several blogs elsewhere as well. In particular this excellent blog by Michael Kopp details how this can be generally handled using the Cassandra API. We also had our own use case where we needed paginated access. However for our use case the scheme outlined by Michael presents several shortcomings.

1. What if we want to fetch rows batch wise instead of columns?

2. If there are updates during the paged retrieval there is a chance that some items will be missed out. For example let’s say the last accessed column key is “Florence”. So the next retrieval would fetch a batch starting from “Florence” onwards. What if a column with key “Cologne” has been newly added? It would not get included in any of the future retrievals.

3. Also there may be a use case where it is required to paginate the results obtained by filtering with a range query rather than fetching all the rows page wise in the column family. (Actually this was our use case.)

 

So let’s have a look at how we took a stab at the beast, Cassandra pagination. Before that let me explain our use case fully so that it’s easier to grasp what we did and why we did it. Our main use case was to paginate the access to results returned from a range query, which can be cleanly expressed in SQL lingo as follows.

 


SELECT * FROM <column_family> WHERE <column_name_1> BETWEEN [from_1] AND [to_1] AND <column_name_2> BETWEEN [from_2] AND [to_2] .... AND <column_name_n> BETWEEN [from_n] AND [to_n]

 

Here each column_name is an index. (Actually a sub index of a composite index. For a description of our indexing scheme refer to my earlier blog Cassandra: Lessons Learnt.) So our use case is a bit complicated in that it’s required to paginate the access to the result set obtained from a range query. Also our requirement was to fetch all the rows satisfying this criteria without missing any row, provided that there would be new additions while we are retrieving rows in batches. In fact there may be a considerable time lapse between two fetches since the retrieved data are processed using a scheduled task with a configurable interval in our use case. Additionally we had to leave room for non batched access of the range query result as well. And of course we were not using the OrderedPartitioner. (The evils of OrderedPartitioner are well documented elsewhere: sub optimal load balancing, creating hot spots etc.) Had we used OrderedPartitioner our life would have been a bit easier since we would have been able to do a range query on the rows. But since we were using RandomPartitioner no ordering of rows using row keys can be assumed.

 

Ok, that’s enough about the predicament we were in a couple of months back when faced with the task of ‘Cassandrafication’ of our data layer. Hope you got the idea.. Now let’s see what we did to improve the situation.

 

First we had to deal with our inability to do a range query on rows. Cassandra has this nice property that the columns of a particular row are always sorted by their column keys. So we utilized this nicety to impose an ordering on rows. We always maintain a meta row in which all the row keys are stored as columns. (Here a row key becomes a column key and the column value is left empty.) Let’s say this row is ‘RowIndex’. (See figure 1.) Now when doing a query on the column family we first query this row using a range query to get the rows matching the criteria and then do the real row fetching one by one using the row keys fetched. You might be wondering how the range query is constructed to match the where clauses in the SQL given above. In our scheme the row key is constituted by concatenating the value of each index. (An index is in fact a column in a particular row and we use the column value as the index value. This will become clearer by having a look at the first step of the illustration given in figure 2.) So this is the scheme we used for non batched retrieval of rows satisfying a particular query.

 

Figure 1 : Column family with meta row ‘RowIndex’
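
As a minimal sketch of this non batched access path, assuming the Hector Java client and illustrative names for the column family (“Events”) and the key separator (“---”), the ‘RowIndex’ lookup could look roughly as follows.

import java.util.List;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;

// Slice the 'RowIndex' meta row between the range first and range last keys,
// then fetch each matching row by the returned column names (the row keys).
public class RowIndexLookup {

  private static final StringSerializer SS = StringSerializer.get();

  public static List<HColumn<String, String>> matchingRowKeys(Keyspace keyspace,
                                                              String rangeFirst,
                                                              String rangeLast) {
    SliceQuery<String, String, String> query =
        HFactory.createSliceQuery(keyspace, SS, SS, SS);
    query.setColumnFamily("Events");
    query.setKey("RowIndex");
    // Columns are sorted by column key, so a column slice doubles as a range
    // query over row keys, e.g. from "esb---08:00" to "esb---09:00".
    // (Count set high for illustration; in practice this would be limited too.)
    query.setRange(rangeFirst, rangeLast, false, Integer.MAX_VALUE);
    return query.execute().get().getColumns();
  }
}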

 

But for the paginated use case this proved to be insufficient due to the second shortcoming outlined earlier. We realized that there needs to be an ordering by timestamp to catch a newly added row even if its row key puts it, in sorted order, before the last accessed row key. So we introduced another meta row storing the timestamp of insertion of each row. Let’s say the row key of this meta row is ‘TimeStampIndex’. Each column of this row will hold the insertion timestamp as the column key and the corresponding row key of the row inserted at that particular timestamp as the column value. So now we need to do four things when we add a row to the column family (a rough code sketch follows the steps below).

 

Figure 2 : Row insertion algorithm

1. Create the row key using the defined indexes. Here we use ‘server’ and ‘time’ as the indexes.

2. Insert row key in to the ‘RowIndex’ as a column.

3. Insert the row insertion timestamp along with row key as a column to the ‘TimeStampIndex’

4. Add the row itself to the column family.
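
Here is a rough sketch of those four steps, again assuming the Hector client; the column family name, the column names and the key separator are illustrative.

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

// The four insertion steps described above, batched into one mutation.
public class RowInsertion {

  private static final StringSerializer SS = StringSerializer.get();

  public static void insertEvent(Keyspace keyspace, String server, String time,
                                 String payload) {
    // 1. Create the row key by concatenating the index values.
    String rowKey = server + "---" + time;

    Mutator<String> mutator = HFactory.createMutator(keyspace, SS);

    // 2. Insert the row key as a column (with an empty value) in the 'RowIndex' meta row.
    mutator.addInsertion("RowIndex", "Events", HFactory.createStringColumn(rowKey, ""));

    // 3. Insert insertion timestamp -> row key as a column in the 'TimeStampIndex' meta row.
    String insertionTimestamp = String.valueOf(System.currentTimeMillis());
    mutator.addInsertion("TimeStampIndex", "Events",
        HFactory.createStringColumn(insertionTimestamp, rowKey));

    // 4. Add the row itself, with the index values stored as ordinary columns.
    mutator.addInsertion(rowKey, "Events", HFactory.createStringColumn("server", server));
    mutator.addInsertion(rowKey, "Events", HFactory.createStringColumn("time", time));
    mutator.addInsertion(rowKey, "Events", HFactory.createStringColumn("payload", payload));

    mutator.execute();
  }
}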

 

‘RowIndex’ is to be used for non batched access of the range query result while ‘TimeStampIndex’ is to be used for batched access of the range query result.

 

Now when we want to fetch the rows in batches satisfying the range query criteria, first we get a batch size chunk of timestamps from ‘TimeStampIndex’. Then for each row associated with those timestamps we check whether the row matches the filter criteria. This is a simple string comparison to check whether the row key falls between the range first and range last values.

 

Say for example the filter criteria for the above illustration is the following where clause.


WHERE 'server' BETWEEN 'esb' and 'esb' and 'hour' BETWEEN '08:00' and '09:00'

Now the range first value of the query would be ‘esb—08:00’ and the range last value would be ‘esb—09:00’. This will select events for server ‘esb’ during the hours from ’08:00′ to ’09:00′. So if a row key is ‘esb—08:23’ it will get picked and if it is ‘esb—09:23’ it won’t.
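
A rough sketch of one batched fetch against ‘TimeStampIndex’, under the same Hector and naming assumptions as before, could look like this; the row key range check is just the string comparison described above.

import java.util.List;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;

// One batched fetch: take a chunk of columns from 'TimeStampIndex' starting
// after the last seen timestamp, and keep only the row keys falling inside
// the query's key range.
public class BatchedFetch {

  private static final StringSerializer SS = StringSerializer.get();

  public static void fetchBatch(Keyspace keyspace, String lastSeenTimestamp,
                                int batchSize, String rangeFirst, String rangeLast) {
    SliceQuery<String, String, String> query =
        HFactory.createSliceQuery(keyspace, SS, SS, SS);
    query.setColumnFamily("Events");
    query.setKey("TimeStampIndex");
    // Columns of the meta row are ordered by timestamp, so paging is a matter of
    // starting the slice at the last timestamp seen in the previous batch.
    query.setRange(lastSeenTimestamp, "", false, batchSize);

    ColumnSlice<String, String> slice = query.execute().get();
    List<HColumn<String, String>> columns = slice.getColumns();
    for (HColumn<String, String> column : columns) {
      String rowKey = column.getValue(); // e.g. "esb---08:23"
      // Simple string comparison against the range, e.g. "esb---08:00" .. "esb---09:00".
      if (rowKey.compareTo(rangeFirst) >= 0 && rowKey.compareTo(rangeLast) <= 0) {
        // fetch the actual row by rowKey and hand it over for processing
      }
    }
  }
}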

 

So as can be seen, for this scenario we didn’t use the ‘RowIndex’ meta row; it’s for non batched use only. And in this way, using ‘TimeStampIndex’, we can catch newly added rows without missing out on any row.

 

However it’s not without its own drawbacks.

1. The batch size is not consistent. Even though a batch size chunk of timestamps is picked by the query, some of these rows will be discarded since they do not match the filtering criteria. The solution would be to fetch multiple batches until the batch size number of rows fulfilling the filter criteria is found. But for now we are ok with inconsistent batch sizes.

2. What if an existing row is updated? It will get fetched a second time since the algorithm will not miss any newly added or updated row. This may or may not be desirable according to the use case. For us this is in fact the needed behavior since we need any new updates to an already fetched row. So we are ok with that too.

 

So that concludes our escapade with Cassandra pagination. The (love) story continues.. (Hope you saw the sarcasm sign unlike Sheldon.. :))