Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
new file mode 100644
index 0000000..b2b490e
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -0,0 +1,68 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An asynchronous response.
+ * This class provides a base implementation of Response, with methods to retrieve the
+ * result and query to see if the result is ready. The result can only be retrieved when
+ * it is ready and the get methods will block if the result is not ready yet.
+ * @param <T> type of response.
+ */
+public class AsyncResponse<T> implements Response<T> {
+
+    private T value;
+    private boolean done = false;
+    private final long start = System.nanoTime();
+
+    @Override
+    public T get(long timeout, TimeUnit tu) throws TimeoutException {
+        timeout = tu.toNanos(timeout);
+        boolean interrupted = false;
+        try {
+            synchronized (this) {
+                while (!done) {
+                    try {
+                        long timeRemaining = timeout - (System.nanoTime() - start);
+                        if (timeRemaining <= 0) {
+                            throw new TimeoutException("Operation timed out.");
+                        }
+                        TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public T get() throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isReady() {
+        return done;
+    }
+
+    /**
+     * Sets response value and unblocks any thread blocking on the response to become
+     * available.
+     * @param data response data.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void setResponse(Object data) {
+        if (!done) {
+            done = true;
+            value = (T) data;
+            this.notifyAll();
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
new file mode 100644
index 0000000..313a448
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -0,0 +1,15 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Message handler that echos the message back to the sender.
+ */
+public class EchoHandler implements MessageHandler {
+
+    @Override
+    public void handle(Message message) throws IOException {
+        System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
+        message.respond(message.payload());
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
new file mode 100644
index 0000000..8681093
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -0,0 +1,62 @@
+package org.onlab.netty;
+
+/**
+ * Representation of a TCP/UDP communication end point.
+ */
+public class Endpoint {
+
+    private final int port;
+    private final String host;
+
+    public Endpoint(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public String toString() {
+        return "Endpoint [port=" + port + ", host=" + host + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((host == null) ? 0 : host.hashCode());
+        result = prime * result + port;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        Endpoint other = (Endpoint) obj;
+        if (host == null) {
+            if (other.host != null) {
+                return false;
+            }
+        } else if (!host.equals(other.host)) {
+            return false;
+        }
+        if (port != other.port) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
new file mode 100644
index 0000000..bcf6f52
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -0,0 +1,85 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Internal message representation with additional attributes
+ * for supporting, synchronous request/reply behavior.
+ */
+public final class InternalMessage implements Message {
+
+    private long id;
+    private Endpoint sender;
+    private String type;
+    private Object payload;
+    private transient NettyMessagingService messagingService;
+    public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
+
+    // Must be created using the Builder.
+    private InternalMessage() {}
+
+    public long id() {
+        return id;
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public Endpoint sender() {
+        return sender;
+    }
+
+    @Override
+    public Object payload() {
+        return payload;
+    }
+
+    @Override
+    public void respond(Object data) throws IOException {
+        Builder builder = new Builder(messagingService);
+        InternalMessage message = builder.withId(this.id)
+             // FIXME: Sender should be messagingService.localEp.
+            .withSender(this.sender)
+            .withPayload(data)
+            .withType(REPLY_MESSAGE_TYPE)
+            .build();
+        messagingService.sendAsync(sender, message);
+    }
+
+
+    /**
+     * Builder for InternalMessages.
+     */
+    public static class Builder {
+        private InternalMessage message;
+
+        public Builder(NettyMessagingService messagingService) {
+            message = new InternalMessage();
+            message.messagingService = messagingService;
+        }
+
+        public Builder withId(long id) {
+            message.id = id;
+            return this;
+        }
+
+        public Builder withType(String type) {
+            message.type = type;
+            return this;
+        }
+
+        public Builder withSender(Endpoint sender) {
+            message.sender = sender;
+            return this;
+        }
+        public Builder withPayload(Object payload) {
+            message.payload = payload;
+            return this;
+        }
+
+        public InternalMessage build() {
+            return message;
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
new file mode 100644
index 0000000..73c01a0
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -0,0 +1,47 @@
+package org.onlab.netty;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Kryo Serializer.
+ */
+public class KryoSerializer implements Serializer {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private KryoPool serializerPool;
+
+    public KryoSerializer() {
+        setupKryoPool();
+    }
+
+    /**
+     * Sets up the common serialzers pool.
+     */
+    protected void setupKryoPool() {
+        // FIXME Slice out types used in common to separate pool/namespace.
+        serializerPool = KryoPool.newBuilder()
+                .register(ArrayList.class,
+                          HashMap.class,
+                          ArrayList.class
+                )
+                .build()
+                .populate(1);
+    }
+
+
+    @Override
+    public Object decode(byte[] data) {
+        return serializerPool.deserialize(data);
+    }
+
+    @Override
+    public byte[] encode(Object payload) {
+        return serializerPool.serialize(payload);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
new file mode 100644
index 0000000..ed6cdb4
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -0,0 +1,12 @@
+package org.onlab.netty;
+
+/**
+ * A MessageHandler that simply logs the information.
+ */
+public class LoggingHandler implements MessageHandler {
+
+    @Override
+    public void handle(Message message) {
+        System.out.println("Received: " + message.payload());
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
new file mode 100644
index 0000000..54b9526
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Message.java
@@ -0,0 +1,23 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * A unit of communication.
+ * Has a payload. Also supports a feature to respond back to the sender.
+ */
+public interface Message {
+
+    /**
+     * Returns the payload of this message.
+     * @return message payload.
+     */
+    public Object payload();
+
+    /**
+     * Sends a reply back to the sender of this messge.
+     * @param data payload of the response.
+     * @throws IOException if there is a communication error.
+     */
+    public void respond(Object data) throws IOException;
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
new file mode 100644
index 0000000..ecf2d62
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -0,0 +1,58 @@
+package org.onlab.netty;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+/**
+ * Decode bytes into a InternalMessage.
+ */
+public class MessageDecoder extends ByteToMessageDecoder {
+
+    private final NettyMessagingService messagingService;
+    private final Serializer serializer;
+
+    public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+        this.messagingService = messagingService;
+        this.serializer = serializer;
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext context, ByteBuf in,
+            List<Object> messages) throws Exception {
+
+        byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
+        checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+
+        // read message Id.
+        long id = in.readLong();
+
+        // read message type; first read size and then bytes.
+        String type = new String(in.readBytes(in.readInt()).array());
+
+        // read sender host name; first read size and then bytes.
+        String host = new String(in.readBytes(in.readInt()).array());
+
+        // read sender port.
+        int port = in.readInt();
+
+        Endpoint sender = new Endpoint(host, port);
+
+        // read message payload; first read size and then bytes.
+        Object payload = serializer.decode(in.readBytes(in.readInt()).array());
+
+        InternalMessage message = new InternalMessage.Builder(messagingService)
+                .withId(id)
+                .withSender(sender)
+                .withType(type)
+                .withPayload(payload)
+                .build();
+
+        messages.add(message);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
new file mode 100644
index 0000000..1b52a0f
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -0,0 +1,60 @@
+package org.onlab.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ * Encode InternalMessage out into a byte buffer.
+ */
+public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+
+    // onosiscool in ascii
+    public static final byte[] PREAMBLE = "onosiscool".getBytes();
+
+    private final Serializer serializer;
+
+    public MessageEncoder(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext context, InternalMessage message,
+            ByteBuf out) throws Exception {
+
+        // write preamble
+        out.writeBytes(PREAMBLE);
+
+        // write id
+        out.writeLong(message.id());
+
+        // write type length
+        out.writeInt(message.type().length());
+
+        // write type
+        out.writeBytes(message.type().getBytes());
+
+        // write sender host name size
+        out.writeInt(message.sender().host().length());
+
+        // write sender host name.
+        out.writeBytes(message.sender().host().getBytes());
+
+        // write port
+        out.writeInt(message.sender().port());
+
+        try {
+            serializer.encode(message.payload());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        byte[] payload = serializer.encode(message.payload());
+
+        // write payload length.
+        out.writeInt(payload.length);
+
+        // write payload bytes
+        out.writeBytes(payload);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
new file mode 100644
index 0000000..7bd5a7f
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
@@ -0,0 +1,16 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Handler for a message.
+ */
+public interface MessageHandler {
+
+    /**
+     * Handles the message.
+     * @param message message.
+     * @throws IOException.
+     */
+    public void handle(Message message) throws IOException;
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
new file mode 100644
index 0000000..ebad442
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -0,0 +1,41 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Interface for low level messaging primitives.
+ */
+public interface MessagingService {
+    /**
+     * Sends a message asynchronously to the specified communication end point.
+     * The message is specified using the type and payload.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @throws IOException
+     */
+    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
+
+    /**
+     * Sends a message synchronously and waits for a response.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @return a response future
+     * @throws IOException
+     */
+    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
+
+    /**
+     * Registers a new message handler for message type.
+     * @param type message type.
+     * @param handler message handler
+     */
+    public void registerHandler(String type, MessageHandler handler);
+
+    /**
+     * Unregister current handler, if one exists for message type.
+     * @param type message type
+     */
+    public void unregisterHandler(String type);
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
new file mode 100644
index 0000000..54da8cc
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -0,0 +1,244 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.pool.KeyedObjectPool;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * A Netty based implementation of MessagingService.
+ */
+public class NettyMessagingService implements MessagingService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private KeyedObjectPool<Endpoint, Channel> channels =
+            new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+    private final int port;
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private Cache<Long, AsyncResponse<?>> responseFutures;
+    private final Endpoint localEp;
+
+    protected Serializer serializer;
+
+    public NettyMessagingService() {
+        // TODO: Default port should be configurable.
+        this(8080);
+    }
+
+    // FIXME: Constructor should not throw exceptions.
+    public NettyMessagingService(int port) {
+        this.port = port;
+        try {
+            localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+        } catch (UnknownHostException e) {
+            // bailing out.
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void activate() throws Exception {
+        responseFutures = CacheBuilder.newBuilder()
+                .maximumSize(100000)
+                .weakValues()
+                // TODO: Once the entry expires, notify blocking threads (if any).
+                .expireAfterWrite(10, TimeUnit.MINUTES)
+                .build();
+        startAcceptingConnections();
+    }
+
+    public void deactivate() throws Exception {
+        channels.close();
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+
+    @Override
+    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
+        InternalMessage message = new InternalMessage.Builder(this)
+            .withId(RandomUtils.nextLong())
+            .withSender(localEp)
+            .withType(type)
+            .withPayload(payload)
+            .build();
+        sendAsync(ep, message);
+    }
+
+    protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
+        Channel channel = null;
+        try {
+            channel = channels.borrowObject(ep);
+            channel.eventLoop().execute(new WriteTask(channel, message));
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            try {
+                channels.returnObject(ep, channel);
+            } catch (Exception e) {
+                log.warn("Error returning object back to the pool", e);
+                // ignored.
+            }
+        }
+    }
+
+    @Override
+    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
+            throws IOException {
+        AsyncResponse<T> futureResponse = new AsyncResponse<T>();
+        Long messageId = RandomUtils.nextLong();
+        responseFutures.put(messageId, futureResponse);
+        InternalMessage message = new InternalMessage.Builder(this)
+            .withId(messageId)
+            .withSender(localEp)
+            .withType(type)
+            .withPayload(payload)
+            .build();
+        sendAsync(ep, message);
+        return futureResponse;
+    }
+
+    @Override
+    public void registerHandler(String type, MessageHandler handler) {
+        // TODO: Is this the right semantics for handler registration?
+        handlers.putIfAbsent(type, handler);
+    }
+
+    public void unregisterHandler(String type) {
+        handlers.remove(type);
+    }
+
+    private MessageHandler getMessageHandler(String type) {
+        return handlers.get(type);
+    }
+
+    private void startAcceptingConnections() throws InterruptedException {
+        ServerBootstrap b = new ServerBootstrap();
+        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        b.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new OnosCommunicationChannelInitializer())
+            .option(ChannelOption.SO_BACKLOG, 128)
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        // Bind and start to accept incoming connections.
+        b.bind(port).sync();
+    }
+
+    private class OnosCommunicationChannelFactory
+        implements KeyedPoolableObjectFactory<Endpoint, Channel> {
+
+        @Override
+        public void activateObject(Endpoint endpoint, Channel channel)
+                throws Exception {
+        }
+
+        @Override
+        public void destroyObject(Endpoint ep, Channel channel) throws Exception {
+            channel.close();
+        }
+
+        @Override
+        public Channel makeObject(Endpoint ep) throws Exception {
+            Bootstrap b = new Bootstrap();
+            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            b.group(workerGroup);
+            // TODO: Make this faster:
+            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new OnosCommunicationChannelInitializer());
+
+            // Start the client.
+            ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
+            return f.channel();
+        }
+
+        @Override
+        public void passivateObject(Endpoint ep, Channel channel)
+                throws Exception {
+        }
+
+        @Override
+        public boolean validateObject(Endpoint ep, Channel channel) {
+            return channel.isOpen();
+        }
+    }
+
+    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        @Override
+        protected void initChannel(SocketChannel channel) throws Exception {
+            channel.pipeline()
+                .addLast(new MessageEncoder(serializer))
+                .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
+                .addLast(new NettyMessagingService.InboundMessageDispatcher());
+        }
+    }
+
+    private class WriteTask implements Runnable {
+
+        private final Object message;
+        private final Channel channel;
+
+        public WriteTask(Channel channel, Object message) {
+            this.message = message;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            channel.writeAndFlush(message);
+        }
+    }
+
+    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
+            String type = message.type();
+            if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+                try {
+                    AsyncResponse<?> futureResponse =
+                        NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                    if (futureResponse != null) {
+                        futureResponse.setResponse(message.payload());
+                    }
+                    log.warn("Received a reply. But was unable to locate the request handle");
+                } finally {
+                    NettyMessagingService.this.responseFutures.invalidate(message.id());
+                }
+                return;
+            }
+            MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+            handler.handle(message);
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
new file mode 100644
index 0000000..04675ce
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Response.java
@@ -0,0 +1,36 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Response object returned when making synchronous requests.
+ * Can you used to check is a response is ready and/or wait for a response
+ * to become available.
+ *
+ * @param <T> type of response.
+ */
+public interface Response<T> {
+
+    /**
+     * Gets the response waiting for a designated timeout period.
+     * @param timeout timeout period (since request was sent out)
+     * @param tu unit of time.
+     * @return response
+     * @throws TimeoutException if the timeout expires before the response arrives.
+     */
+    public T get(long timeout, TimeUnit tu) throws TimeoutException;
+
+    /**
+     * Gets the response waiting for indefinite timeout period.
+     * @return response
+     * @throws InterruptedException if the thread is interrupted before the response arrives.
+     */
+    public T get() throws InterruptedException;
+
+    /**
+     * Checks if the response is ready without blocking.
+     * @return true if response is ready, false otherwise.
+     */
+    public boolean isReady();
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
new file mode 100644
index 0000000..ac55f5a
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
@@ -0,0 +1,24 @@
+package org.onlab.netty;
+
+/**
+ * Interface for encoding/decoding message payloads.
+ */
+public interface Serializer {
+
+    /**
+     * Decodes the specified byte array to a POJO.
+     *
+     * @param data byte array.
+     * @return POJO
+     */
+    Object decode(byte[] data);
+
+    /**
+     * Encodes the specified POJO into a byte array.
+     *
+     * @param data POJO to be encoded
+     * @return byte array.
+     */
+    byte[] encode(Object message);
+
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
new file mode 100644
index 0000000..1573780
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -0,0 +1,24 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+
+public final class SimpleClient {
+    private SimpleClient() {}
+
+    public static void main(String... args) throws Exception {
+        NettyMessagingService messaging = new TestNettyMessagingService(9081);
+        messaging.activate();
+
+        messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
+        Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World");
+        System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
+    }
+
+    public static class TestNettyMessagingService extends NettyMessagingService {
+        public TestNettyMessagingService(int port) throws Exception {
+            super(port);
+            Serializer serializer = new KryoSerializer();
+            this.serializer = serializer;
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
new file mode 100644
index 0000000..12fa025
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -0,0 +1,19 @@
+package org.onlab.netty;
+
+public final class SimpleServer {
+    private SimpleServer() {}
+
+    public static void main(String... args) throws Exception {
+        NettyMessagingService server = new TestNettyMessagingService();
+        server.activate();
+        server.registerHandler("simple", new LoggingHandler());
+        server.registerHandler("echo", new EchoHandler());
+    }
+
+    public static class TestNettyMessagingService extends NettyMessagingService {
+        protected TestNettyMessagingService() {
+            Serializer serializer = new KryoSerializer();
+            this.serializer = serializer;
+        }
+    }
+}