Added a messaging service implementation on top of IOLoop. Added ability to easily switch between netty and io loop (default is netty)

Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
index 9f4d867..46560e4 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.cluster.messaging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
@@ -35,6 +34,7 @@
     private final NodeId sender;
     private final MessageSubject subject;
     private final byte[] payload;
+    private transient byte[] response;
 
     /**
      * Creates a cluster message.
@@ -77,13 +77,21 @@
     }
 
     /**
-     * Sends a response to the sender.
+     * Records the response to be sent to the sender.
      *
-     * @param data payload response.
-     * @throws IOException when I/O exception of some sort has occurred
+     * @param data response payload
      */
-    public void respond(byte[] data) throws IOException {
-        throw new IllegalStateException("One can only respond to message received from others.");
+    public void respond(byte[] data) {
+        response = data;
+    }
+
+    /**
+     * Returns the response to be sent to the sender.
+     *
+     * @return response bytes
+     */
+    public byte[] response() {
+        return response;
     }
 
     @Override
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/Endpoint.java
similarity index 93%
rename from utils/netty/src/main/java/org/onlab/netty/Endpoint.java
rename to core/api/src/main/java/org/onosproject/store/cluster/messaging/Endpoint.java
index af97763..2ac50df 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/Endpoint.java
@@ -13,9 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging;
 
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Objects;
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
similarity index 76%
rename from utils/netty/src/main/java/org/onlab/netty/MessagingService.java
rename to core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 1b42b04..3fe335b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -13,16 +13,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onlab.netty;
+package org.onosproject.store.cluster.messaging;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Interface for low level messaging primitives.
  */
 public interface MessagingService {
+
     /**
      * Sends a message asynchronously to the specified communication end point.
      * The message is specified using the type and payload.
@@ -31,7 +34,7 @@
      * @param payload message payload bytes.
      * @throws IOException when I/O exception of some sort has occurred
      */
-    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
+    void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
 
     /**
      * Sends a message synchronously and waits for a response.
@@ -40,7 +43,7 @@
      * @param payload message payload.
      * @return a response future
      */
-    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
+    CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
 
     /**
      * Registers a new message handler for message type.
@@ -48,19 +51,19 @@
      * @param handler message handler
      * @param executor executor to use for running message handler logic.
      */
-    public void registerHandler(String type, MessageHandler handler, Executor executor);
+    void registerHandler(String type, Consumer<byte[]> handler, Executor executor);
 
     /**
      * Registers a new message handler for message type.
      * @param type message type.
      * @param handler message handler
+     * @param executor executor to use for running message handler logic.
      */
-    @Deprecated
-    public void registerHandler(String type, MessageHandler handler);
+    void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
 
     /**
      * Unregister current handler, if one exists for message type.
      * @param type message type
      */
-    public void unregisterHandler(String type);
+    void unregisterHandler(String type);
 }
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 7384988..ba456ca 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -53,6 +53,12 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onlab-nio</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onlab-misc</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index f458bfe..c81079d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -19,14 +19,12 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.hazelcast.util.AddressUtil;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.Message;
-import org.onlab.netty.MessageHandler;
 import org.onlab.netty.NettyMessagingService;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
@@ -38,6 +36,7 @@
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.consistent.impl.DatabaseDefinition;
 import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -56,6 +55,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -108,7 +108,7 @@
     private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
     private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
     private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
-    private NettyMessagingService messagingService = new NettyMessagingService();
+    private NettyMessagingService messagingService;
     private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
             groupedThreads("onos/cluster/membership", "heartbeat-sender"));
     private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
@@ -149,7 +149,6 @@
         establishSelfIdentity();
 
         messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
-
         try {
             messagingService.activate();
         } catch (InterruptedException e) {
@@ -376,10 +375,10 @@
         throw new IllegalStateException("Unable to determine local ip");
     }
 
-    private class HeartbeatMessageHandler implements MessageHandler {
+    private class HeartbeatMessageHandler implements Consumer<byte[]> {
         @Override
-        public void handle(Message message) throws IOException {
-            HeartbeatMessage hb = SERIALIZER.decode(message.payload());
+        public void accept(byte[] message) {
+            HeartbeatMessage hb = SERIALIZER.decode(message);
             failureDetector.report(hb.source().id());
             hb.knownPeers().forEach(node -> {
                 allNodes.put(node.id(), node);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 042bda2..6f47b48 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,18 +21,17 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.Message;
-import org.onlab.netty.MessageHandler;
-import org.onlab.netty.MessagingService;
 import org.onlab.netty.NettyMessagingService;
+import org.onlab.nio.service.IOLoopMessagingService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,17 +63,28 @@
     // TODO: This probably should not be a OSGi service.
     private MessagingService messagingService;
 
+    private final boolean useNetty = true;
+
     @Activate
     public void activate() {
         ControllerNode localNode = clusterService.getLocalNode();
-        NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
-        // FIXME: workaround until it becomes a service.
-        try {
-            netty.activate();
-        } catch (Exception e) {
-            log.error("NettyMessagingService#activate", e);
+        if (useNetty) {
+            NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
+            try {
+                netty.activate();
+                messagingService = netty;
+            } catch (Exception e) {
+                log.error("NettyMessagingService#activate", e);
+            }
+        } else {
+            IOLoopMessagingService ioLoop = new IOLoopMessagingService(localNode.ip(), localNode.tcpPort());
+            try {
+                ioLoop.activate();
+                messagingService = ioLoop;
+            } catch (Exception e) {
+                log.error("IOLoopMessagingService#activate", e);
+            }
         }
-        messagingService = netty;
         log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
     }
 
@@ -83,9 +93,13 @@
         // TODO: cleanup messageingService if needed.
         // FIXME: workaround until it becomes a service.
         try {
-            ((NettyMessagingService) messagingService).deactivate();
+            if (useNetty) {
+                ((NettyMessagingService) messagingService).deactivate();
+            } else {
+                ((IOLoopMessagingService) messagingService).deactivate();
+            }
         } catch (Exception e) {
-            log.error("NettyMessagingService#deactivate", e);
+            log.error("MessagingService#deactivate", e);
         }
         log.info("Stopped");
     }
@@ -232,7 +246,9 @@
     public void addSubscriber(MessageSubject subject,
                               ClusterMessageHandler subscriber,
                               ExecutorService executor) {
-        messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
+        messagingService.registerHandler(subject.value(),
+                new InternalClusterMessageHandler(subscriber),
+                executor);
     }
 
     @Override
@@ -240,31 +256,6 @@
         messagingService.unregisterHandler(subject.value());
     }
 
-    private final class InternalClusterMessageHandler implements MessageHandler {
-
-        private final ClusterMessageHandler handler;
-
-        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
-            this.handler = handler;
-        }
-
-        @Override
-        public void handle(Message message) {
-            final ClusterMessage clusterMessage;
-            try {
-                clusterMessage = ClusterMessage.fromBytes(message.payload());
-            } catch (Exception e) {
-                log.error("Failed decoding {}", message, e);
-                throw e;
-            }
-            try {
-                handler.handle(new InternalClusterMessage(clusterMessage, message));
-            } catch (Exception e) {
-                log.trace("Failed handling {}", clusterMessage, e);
-                throw e;
-            }
-        }
-    }
 
     @Override
     public <M, R> void addSubscriber(MessageSubject subject,
@@ -287,7 +278,22 @@
                 executor);
     }
 
-    private class InternalMessageResponder<M, R> implements MessageHandler {
+    private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
+        private ClusterMessageHandler handler;
+
+        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public byte[] apply(byte[] bytes) {
+            ClusterMessage message = ClusterMessage.fromBytes(bytes);
+            handler.handle(message);
+            return message.response();
+        }
+    }
+
+    private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
         private final Function<byte[], M> decoder;
         private final Function<R, byte[]> encoder;
         private final Function<M, R> handler;
@@ -299,14 +305,15 @@
             this.encoder = encoder;
             this.handler = handler;
         }
+
         @Override
-        public void handle(Message message) throws IOException {
-            R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
-            message.respond(encoder.apply(response));
+        public byte[] apply(byte[] bytes) {
+            R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
+            return encoder.apply(reply);
         }
     }
 
-    private class InternalMessageConsumer<M> implements MessageHandler {
+    private class InternalMessageConsumer<M> implements Consumer<byte[]> {
         private final Function<byte[], M> decoder;
         private final Consumer<M> consumer;
 
@@ -314,24 +321,10 @@
             this.decoder = decoder;
             this.consumer = consumer;
         }
-        @Override
-        public void handle(Message message) throws IOException {
-            consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
-        }
-    }
-
-    public static final class InternalClusterMessage extends ClusterMessage {
-
-        private final Message rawMessage;
-
-        public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
-            super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
-            this.rawMessage = rawMessage;
-        }
 
         @Override
-        public void respond(byte[] response) throws IOException {
-            rawMessage.respond(response);
+        public void accept(byte[] bytes) {
+            consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 8ae7d2a..7f8769b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -77,7 +77,6 @@
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -278,11 +277,7 @@
                 FlowRule rule = SERIALIZER.decode(message.payload());
                 log.trace("received get flow entry request for {}", rule);
                 FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
-                try {
-                    message.respond(SERIALIZER.encode(flowEntry));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
+                message.respond(SERIALIZER.encode(flowEntry));
             }
         }, executor);
 
@@ -293,11 +288,7 @@
                 DeviceId deviceId = SERIALIZER.decode(message.payload());
                 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
                 Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
-                try {
-                    message.respond(SERIALIZER.encode(flowEntries));
-                } catch (IOException e) {
-                    log.error("Failed to respond to peer's getFlowEntries request", e);
-                }
+                message.respond(SERIALIZER.encode(flowEntries));
             }
         }, executor);
 
@@ -308,11 +299,7 @@
                 FlowEntry rule = SERIALIZER.decode(message.payload());
                 log.trace("received get flow entry request for {}", rule);
                 FlowRuleEvent event = removeFlowRuleInternal(rule);
-                try {
-                    message.respond(SERIALIZER.encode(event));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
+                message.respond(SERIALIZER.encode(event));
             }
         }, executor);
     }
@@ -691,11 +678,7 @@
                 // TODO: we might want to wrap response in envelope
                 // to distinguish sw programming failure and hand over
                 // it make sense in the latter case to retry immediately.
-                try {
-                    message.respond(SERIALIZER.encode(allFailed));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
+                message.respond(SERIALIZER.encode(allFailed));
                 return;
             }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index b609a3c..b7d69b2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -51,7 +51,6 @@
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -143,11 +142,7 @@
                       @Override
                       public void run() {
                           FlowExtCompletedOperation result = Futures.getUnchecked(f);
-                          try {
-                              message.respond(SERIALIZER.encode(result));
-                          } catch (IOException e) {
-                              log.error("Failed to respond back", e);
-                          }
+                          message.respond(SERIALIZER.encode(result));
                       }
                   }, futureListeners);
               }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 27a33b3..bc31e08 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -22,7 +22,6 @@
 import static org.slf4j.LoggerFactory.getLogger;
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -300,11 +299,7 @@
         @Override
         public void handle(ClusterMessage message) {
             DeviceId deviceId = SERIALIZER.decode(message.payload());
-            try {
-                message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
-            } catch (IOException e) {
-                log.error("Failed to responsd to role query", e);
-            }
+            message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
         }
     }
 
@@ -318,11 +313,7 @@
         @Override
         public void handle(ClusterMessage message) {
             DeviceId deviceId = SERIALIZER.decode(message.payload());
-            try {
-                message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
-            } catch (IOException e) {
-                log.error("Failed to relinquish role.", e);
-            }
+            message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
         }
     }
 
@@ -371,4 +362,4 @@
         return m.matches();
     }
 
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index 907631d..cfbcec8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -43,7 +43,6 @@
 import org.onosproject.store.serializers.KryoSerializer;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -118,11 +117,7 @@
             @Override
             public void handle(ClusterMessage message) {
                 ConnectPoint cp = SERIALIZER.decode(message.payload());
-                try {
-                    message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
+                message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
             }
         }, messageHandlingExecutor);
 
@@ -131,11 +126,7 @@
             @Override
             public void handle(ClusterMessage message) {
                 ConnectPoint cp = SERIALIZER.decode(message.payload());
-                try {
-                    message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
+                message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
             }
         }, messageHandlingExecutor);
         log.info("Started");
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index c95412b..9e5677c 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -39,6 +39,10 @@
         </dependency>
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onlab-misc</artifactId>
         </dependency>
         <dependency>
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
deleted file mode 100644
index db38c19..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-//FIXME: Should be move out to test or app
-/**
- * Message handler that echos the message back to the sender.
- */
-public class EchoHandler implements MessageHandler {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public void handle(Message message) throws IOException {
-        log.info("Received message. Echoing it back to the sender.");
-        message.respond(message.payload());
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 9d42b6d..102e2a2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -15,9 +15,8 @@
  */
 package org.onlab.netty;
 
-import java.io.IOException;
-
 import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
 
 import com.google.common.base.MoreObjects;
 
@@ -25,20 +24,14 @@
  * Internal message representation with additional attributes
  * for supporting, synchronous request/reply behavior.
  */
-public final class InternalMessage implements Message {
+public final class InternalMessage {
 
-    public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+    private final long id;
+    private final Endpoint sender;
+    private final String type;
+    private final byte[] payload;
 
-    private long id;
-    private Endpoint sender;
-    private String type;
-    private byte[] payload;
-    private transient NettyMessagingService messagingService;
-
-    // Must be created using the Builder.
-    private InternalMessage() {}
-
-    InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+    public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
         this.id = id;
         this.sender = sender;
         this.type = type;
@@ -57,26 +50,10 @@
         return sender;
     }
 
-    @Override
     public byte[] payload() {
         return payload;
     }
 
-    protected void setMessagingService(NettyMessagingService messagingService) {
-        this.messagingService = messagingService;
-    }
-
-    @Override
-    public void respond(byte[] data) throws IOException {
-        Builder builder = new Builder(messagingService);
-        InternalMessage message = builder.withId(this.id)
-            .withSender(messagingService.localEp())
-            .withPayload(data)
-            .withType(REPLY_MESSAGE_TYPE)
-            .build();
-        messagingService.sendAsync(sender, message);
-    }
-
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
@@ -86,39 +63,4 @@
                 .add("payload", ByteArraySizeHashPrinter.of(payload))
                 .toString();
     }
-
-    /**
-     * Builder for InternalMessages.
-     */
-    public static final class Builder {
-        private InternalMessage message;
-
-        public Builder(NettyMessagingService messagingService) {
-            message = new InternalMessage();
-            message.messagingService = messagingService;
-        }
-
-        public Builder withId(long id) {
-            message.id = id;
-            return this;
-        }
-
-        public Builder withType(String type) {
-            message.type = type;
-            return this;
-        }
-
-        public Builder withSender(Endpoint sender) {
-            message.sender = sender;
-            return this;
-        }
-        public Builder withPayload(byte[] payload) {
-            message.payload = payload;
-            return this;
-        }
-
-        public InternalMessage build() {
-            return message;
-        }
-    }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
deleted file mode 100644
index d91e8ae..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MessageHandler that simply logs the information.
- */
-public class LoggingHandler implements MessageHandler {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Override
-    public void handle(Message message) {
-        log.info("Received message. Payload has {} bytes", message.payload().length);
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
deleted file mode 100644
index 2baea96..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Message.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-/**
- * A unit of communication.
- * Has a payload. Also supports a feature to respond back to the sender.
- */
-public interface Message {
-
-    /**
-     * Returns the payload of this message.
-     * @return message payload.
-     */
-    public byte[] payload();
-
-    /**
-     * Sends a reply back to the sender of this message.
-     * @param data payload of the response.
-     * @throws IOException if there is a communication error.
-     */
-    public void respond(byte[] data) throws IOException;
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 440fc52..c34d3cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -24,6 +24,7 @@
 
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,8 +37,6 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final NettyMessagingService messagingService;
-
     private long messageId;
     private Version ipVersion;
     private IpAddress senderIp;
@@ -46,9 +45,8 @@
     private String messageType;
     private int contentLength;
 
-    public MessageDecoder(NettyMessagingService messagingService) {
+    public MessageDecoder() {
         super(DecoderState.READ_MESSAGE_ID);
-        this.messagingService = messagingService;
     }
 
     @Override
@@ -91,7 +89,6 @@
                     new Endpoint(senderIp, senderPort),
                     messageType,
                     payload);
-            message.setMessagingService(messagingService);
             out.add(message);
             checkpoint(DecoderState.READ_MESSAGE_ID);
             break;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index f5a8d2e..2b7784f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -24,6 +24,7 @@
 
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
deleted file mode 100644
index a87d8fc..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import java.io.IOException;
-
-/**
- * Handler for a message.
- */
-public interface MessageHandler {
-
-    /**
-     * Handles the message.
-     *
-     * @param message message.
-     * @throws IOException if an error is encountered handling the message
-     */
-    public void handle(Message message) throws IOException;
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index da72886..eeba05e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -46,10 +46,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,17 +63,18 @@
 import com.google.common.cache.RemovalNotification;
 
 /**
- * A Netty based implementation of MessagingService.
+ * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
  */
 public class NettyMessagingService implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
     private final Endpoint localEp;
-    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
-            .maximumSize(100000)
             .expireAfterWrite(10, TimeUnit.SECONDS)
             .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                 @Override
@@ -124,6 +129,7 @@
     }
 
     public void activate() throws InterruptedException {
+        channels.setLifo(false);
         channels.setTestOnBorrow(true);
         channels.setTestOnReturn(true);
         initEventLoopGroup();
@@ -146,12 +152,10 @@
 
     @Override
     public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
-        InternalMessage message = new InternalMessage.Builder(this)
-            .withId(messageIdGenerator.incrementAndGet())
-            .withSender(localEp)
-            .withType(type)
-            .withPayload(payload)
-            .build();
+        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+                                                      localEp,
+                                                      type,
+                                                      payload);
         sendAsync(ep, message);
     }
 
@@ -164,7 +168,7 @@
         try {
             try {
                 channel = channels.borrowObject(ep);
-                channel.eventLoop().execute(new WriteTask(channel, message));
+                channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
             } finally {
                 channels.returnObject(ep, channel);
             }
@@ -173,7 +177,6 @@
         } catch (Exception e) {
             throw new IOException(e);
         }
-
     }
 
     @Override
@@ -181,12 +184,7 @@
         CompletableFuture<byte[]> response = new CompletableFuture<>();
         Long messageId = messageIdGenerator.incrementAndGet();
         responseFutures.put(messageId, response);
-        InternalMessage message = new InternalMessage.Builder(this)
-            .withId(messageId)
-            .withSender(localEp)
-            .withType(type)
-            .withPayload(payload)
-            .build();
+        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
         try {
             sendAsync(ep, message);
         } catch (Exception e) {
@@ -197,24 +195,26 @@
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler) {
-        handlers.put(type, handler);
+    public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
     }
 
     @Override
-    public void registerHandler(String type, MessageHandler handler, Executor executor) {
-        handlers.put(type, new MessageHandler() {
-            @Override
-            public void handle(Message message) throws IOException {
-                executor.execute(() -> {
-                    try {
-                        handler.handle(message);
-                    } catch (Exception e) {
-                        log.debug("Failed to process message of type {}", type, e);
-                    }
-                });
+    public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> {
+            byte[] responsePayload = handler.apply(message.payload());
+            if (responsePayload != null) {
+                InternalMessage response = new InternalMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        responsePayload);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
             }
-        });
+        }));
     }
 
     @Override
@@ -222,14 +222,12 @@
         handlers.remove(type);
     }
 
-    private MessageHandler getMessageHandler(String type) {
-        return handlers.get(type);
-    }
-
     private void startAcceptingConnections() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
         b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+        b.option(ChannelOption.SO_RCVBUF, 1048576);
+        b.option(ChannelOption.TCP_NODELAY, true);
         b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         b.group(serverGroup, clientGroup)
             .channel(serverChannelClass)
@@ -258,8 +256,9 @@
         public Channel makeObject(Endpoint ep) throws Exception {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
-            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
             bootstrap.group(clientGroup);
             // TODO: Make this faster:
             // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -268,6 +267,7 @@
             bootstrap.handler(new OnosCommunicationChannelInitializer());
             // Start the client.
             ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+            log.info("Established a new connection to {}", ep);
             return f.channel();
         }
 
@@ -291,27 +291,11 @@
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                 .addLast("encoder", encoder)
-                .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
+                .addLast("decoder", new MessageDecoder())
                 .addLast("handler", dispatcher);
         }
     }
 
-    private static class WriteTask implements Runnable {
-
-        private final InternalMessage message;
-        private final Channel channel;
-
-        public WriteTask(Channel channel, InternalMessage message) {
-            this.channel = channel;
-            this.message = message;
-        }
-
-        @Override
-        public void run() {
-            channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-        }
-    }
-
     @ChannelHandler.Sharable
     private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
 
@@ -329,10 +313,10 @@
 
     private void dispatchLocally(InternalMessage message) throws IOException {
         String type = message.type();
-        if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
+        if (REPLY_MESSAGE_TYPE.equals(type)) {
             try {
                 CompletableFuture<byte[]> futureResponse =
-                    NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                    responseFutures.getIfPresent(message.id());
                 if (futureResponse != null) {
                     futureResponse.complete(message.payload());
                 } else {
@@ -341,13 +325,13 @@
                             + " request handle", message.id(), message.sender());
                 }
             } finally {
-                NettyMessagingService.this.responseFutures.invalidate(message.id());
+                responseFutures.invalidate(message.id());
             }
             return;
         }
-        MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+        Consumer<InternalMessage> handler = handlers.get(type);
         if (handler != null) {
-            handler.handle(message);
+            handler.accept(message);
         } else {
             log.debug("No handler registered for {}", type);
         }
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 3eb0d39..61d8541 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -18,13 +18,17 @@
 import static org.junit.Assert.assertArrayEquals;
 
 import java.net.InetAddress;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Simple ping-pong test that exercises NettyMessagingService.
@@ -39,9 +43,9 @@
         try {
             pinger.activate();
             ponger.activate();
-            ponger.registerHandler("echo", new EchoHandler());
+            ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
             byte[] payload = RandomUtils.nextBytes(100);
-            Future<byte[]> responseFuture =
+            CompletableFuture<byte[]> responseFuture =
                     pinger.sendAndReceive(
                             new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
             assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
diff --git a/utils/nio/pom.xml b/utils/nio/pom.xml
index 2e66b7d..ce38b35 100644
--- a/utils/nio/pom.xml
+++ b/utils/nio/pom.xml
@@ -37,6 +37,14 @@
             <artifactId>guava-testlib</artifactId>
             <scope>test</scope>
         </dependency>
+                <dependency>
+            <groupId>commons-pool</groupId>
+            <artifactId>commons-pool</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.onosproject</groupId>
             <artifactId>onlab-misc</artifactId>
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java
new file mode 100644
index 0000000..0d97792
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+
+/**
+ * IOLoop for transporting DefaultMessages.
+ */
+public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> {
+
+    public static final int SELECT_TIMEOUT_MILLIS = 500;
+    private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000;
+    private static final int BUFFER_SIZE = 1024 * 1024;
+    private final Consumer<DefaultMessage> consumer;
+
+    public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException {
+        this(SELECT_TIMEOUT_MILLIS, consumer);
+    }
+
+    public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException {
+        super(timeout);
+        this.consumer = consumer;
+    }
+
+    @Override
+    protected DefaultMessageStream createStream(ByteChannel byteChannel) {
+        return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS);
+    }
+
+    @Override
+    protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) {
+        messages.forEach(consumer);
+    }
+
+    @Override
+    protected void connect(SelectionKey key) throws IOException {
+        DefaultMessageStream stream = (DefaultMessageStream) key.attachment();
+        try {
+            super.connect(key);
+            stream.connected();
+        } catch (Exception e) {
+            stream.connectFailed(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java
new file mode 100644
index 0000000..591d49b
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.nio.AbstractMessage;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
+
+/**
+ * Default message.
+ */
+public class DefaultMessage extends AbstractMessage {
+
+    private long id;
+    private Endpoint sender;
+    private String type;
+    private byte[] payload;
+
+    /**
+     * Creates a new message with the specified data.
+     *
+     * @param id message id
+     * @param type message type
+     * @param sender sender endpoint
+     * @param payload message payload
+     */
+    DefaultMessage(long id, Endpoint sender, String type, byte[] payload) {
+        this.id = id;
+        this.type = checkNotNull(type, "Type cannot be null");
+        this.sender = checkNotNull(sender, "Sender cannot be null");
+        this.payload = checkNotNull(payload, "Payload cannot be null");
+
+        byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8);
+        IpAddress senderIp = sender.host();
+        byte[] ipOctets = senderIp.toOctets();
+
+        length = 25 + ipOctets.length + messageTypeBytes.length + payload.length;
+    }
+
+    /**
+     * Returns message id.
+     *
+     * @return message id
+     */
+    public long id() {
+        return id;
+    }
+
+    /**
+     * Returns message sender.
+     *
+     * @return message sender
+     */
+    public Endpoint sender() {
+        return sender;
+    }
+
+    /**
+     * Returns message type.
+     *
+     * @return message type
+     */
+    public String type() {
+        return type;
+    }
+
+    /**
+     * Returns message payload.
+     *
+     * @return payload
+     */
+    public byte[] payload() {
+        return payload;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("id", id)
+                .add("type", type)
+                .add("sender", sender)
+                .add("payload", ByteArraySizeHashPrinter.of(payload))
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java
new file mode 100644
index 0000000..a7dd3c0
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Default bi-directional message stream for transferring messages to &amp; from the
+ * network via two byte buffers.
+ */
+public class DefaultMessageStream extends MessageStream<DefaultMessage> {
+
+    private final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+
+    public DefaultMessageStream(
+            IOLoop<DefaultMessage, ?> loop,
+            ByteChannel byteChannel,
+            int bufferSize,
+            int maxIdleMillis) {
+        super(loop, byteChannel, bufferSize, maxIdleMillis);
+    }
+
+    public CompletableFuture<DefaultMessageStream> connectedFuture() {
+        return connectFuture.thenApply(v -> this);
+    }
+
+    private final AtomicInteger messageLength = new AtomicInteger(-1);
+
+    @Override
+    protected DefaultMessage read(ByteBuffer buffer) {
+        if (messageLength.get() == -1) {
+            // check if we can read the message length.
+            if (buffer.remaining() < Integer.BYTES) {
+                return null;
+            } else {
+                messageLength.set(buffer.getInt());
+            }
+        }
+
+        if (buffer.remaining() < messageLength.get()) {
+            return null;
+        }
+
+        long id = buffer.getLong();
+        Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6;
+        byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+        buffer.get(octects);
+        IpAddress senderIp = IpAddress.valueOf(ipVersion, octects);
+        int senderPort = buffer.getInt();
+        int messageTypeByteLength = buffer.getInt();
+        byte[] messageTypeBytes = new byte[messageTypeByteLength];
+        buffer.get(messageTypeBytes);
+        String messageType = new String(messageTypeBytes, Charsets.UTF_8);
+        int payloadLength = buffer.getInt();
+        byte[] payloadBytes = new byte[payloadLength];
+        buffer.get(payloadBytes);
+
+        // reset for next message
+        messageLength.set(-1);
+
+        return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes);
+    }
+
+    @Override
+    protected void write(DefaultMessage message, ByteBuffer buffer) {
+        Endpoint sender = message.sender();
+        byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
+        IpAddress senderIp = sender.host();
+        byte[] ipOctets = senderIp.toOctets();
+        byte[] payload = message.payload();
+
+        int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length;
+
+        buffer.putInt(messageLength);
+
+        buffer.putLong(message.id());
+
+        if (senderIp.version() == Version.INET) {
+            buffer.put((byte) 0x0);
+        } else {
+            buffer.put((byte) 0x1);
+        }
+        buffer.put(ipOctets);
+
+        // write sender port
+        buffer.putInt(sender.port());
+
+        // write length of message type
+        buffer.putInt(messageTypeBytes.length);
+
+        // write message type bytes
+        buffer.put(messageTypeBytes);
+
+        // write payload length
+        buffer.putInt(payload.length);
+
+        // write payload.
+        buffer.put(payload);
+    }
+
+    /**
+     * Callback invoked when the stream is successfully connected.
+     */
+    public void connected() {
+        connectFuture.complete(null);
+    }
+
+    /**
+     * Callback invoked when the stream fails to connect.
+     * @param cause failure cause
+     */
+    public void connectFailed(Throwable cause) {
+        connectFuture.completeExceptionally(cause);
+    }
+}
\ No newline at end of file
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java
new file mode 100644
index 0000000..ce917f7
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java
@@ -0,0 +1,324 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.nio.AcceptorLoop;
+import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+
+/**
+ * MessagingService implementation based on IOLoop.
+ */
+public class IOLoopMessagingService implements MessagingService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
+
+    static final int PORT = 9876;
+    static final long TIMEOUT = 1000;
+
+    static final boolean SO_NO_DELAY = false;
+    static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
+    static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
+
+    private static final int NUM_WORKERS = 8;
+
+    private AcceptorLoop acceptorLoop;
+    private final ExecutorService acceptorThreadPool =
+            Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
+    private final ExecutorService ioThreadPool =
+            Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
+
+    private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
+
+    private int lastWorker = -1;
+
+    private final Endpoint localEp;
+
+    private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
+            new GenericKeyedObjectPool<Endpoint, DefaultMessageStream>(new DefaultMessageStreamFactory());
+
+    private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
+    private final AtomicLong messageIdGenerator = new AtomicLong(0);
+    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
+                @Override
+                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
+                    if (entry.wasEvicted()) {
+                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
+                    }
+                }
+            })
+            .build();
+
+
+    public IOLoopMessagingService(int port) {
+        this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
+    }
+
+    public IOLoopMessagingService(IpAddress ip, int port) {
+        this(new Endpoint(ip, port));
+    }
+
+    public IOLoopMessagingService(Endpoint localEp) {
+        this.localEp = localEp;
+    }
+
+    /**
+     * Returns the local endpoint.
+     * @return local endpoint
+     */
+    public Endpoint localEp() {
+        return localEp;
+    }
+
+    /**
+     * Activates IO Loops.
+     * @throws IOException is activation fails
+     */
+    public void activate() throws IOException {
+        streams.setLifo(false);
+        this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
+
+        for (int i = 0; i < NUM_WORKERS; i++) {
+            ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
+        }
+
+        ioLoops.forEach(loop -> ioThreadPool.execute(loop));
+        acceptorThreadPool.execute(acceptorLoop);
+        ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
+        acceptorLoop.awaitStart(TIMEOUT);
+    }
+
+    /**
+     * Shuts down IO loops.
+     */
+    public void deactivate() {
+        ioLoops.forEach(loop -> loop.shutdown());
+        acceptorLoop.shutdown();
+        ioThreadPool.shutdown();
+        acceptorThreadPool.shutdown();
+    }
+
+
+    @Override
+    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+        DefaultMessage message = new DefaultMessage(
+                messageIdGenerator.incrementAndGet(),
+                localEp,
+                type,
+                payload);
+        sendAsync(ep, message);
+    }
+
+    protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
+        if (ep.equals(localEp)) {
+            dispatchLocally(message);
+            return;
+        }
+
+        DefaultMessageStream stream = null;
+        try {
+            stream = streams.borrowObject(ep);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        try {
+            stream.write(message);
+        } finally {
+            try {
+                streams.returnObject(ep, stream);
+            } catch (Exception e) {
+                log.warn("Failed to return stream to pool");
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<byte[]> sendAndReceive(
+            Endpoint ep,
+            String type,
+            byte[] payload) {
+        CompletableFuture<byte[]> response = new CompletableFuture<>();
+        Long messageId = messageIdGenerator.incrementAndGet();
+        responseFutures.put(messageId, response);
+        DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
+        try {
+            sendAsync(ep, message);
+        } catch (Exception e) {
+            responseFutures.invalidate(messageId);
+            response.completeExceptionally(e);
+        }
+        return response;
+    }
+
+    @Override
+    public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
+    }
+
+    @Override
+    public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> {
+            byte[] responsePayload = handler.apply(message.payload());
+            if (responsePayload != null) {
+                DefaultMessage response = new DefaultMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        responsePayload);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
+            }
+        }));
+    }
+
+    @Override
+    public void unregisterHandler(String type) {
+        handlers.remove(type);
+    }
+
+    protected void dispatchLocally(DefaultMessage message) {
+        String type = message.type();
+        if (REPLY_MESSAGE_TYPE.equals(type)) {
+            try {
+                CompletableFuture<byte[]> futureResponse =
+                        responseFutures.getIfPresent(message.id());
+                if (futureResponse != null) {
+                    futureResponse.complete(message.payload());
+                } else {
+                    log.warn("Received a reply for message id:[{}]. "
+                            + " from {}. But was unable to locate the"
+                            + " request handle", message.id(), message.sender());
+                }
+            } finally {
+                responseFutures.invalidate(message.id());
+            }
+            return;
+        }
+        Consumer<DefaultMessage> handler = handlers.get(type);
+        if (handler != null) {
+            handler.accept(message);
+        } else {
+            log.debug("No handler registered for {}", type);
+        }
+    }
+
+    // Get the next worker to which a client should be assigned
+    private synchronized DefaultIOLoop nextWorker() {
+        lastWorker = (lastWorker + 1) % NUM_WORKERS;
+        return ioLoops.get(lastWorker);
+    }
+
+    /**
+     * Initiates open connection request and registers the pending socket
+     * channel with the given IO loop.
+     *
+     * @param loop loop with which the channel should be registered
+     * @throws java.io.IOException if the socket could not be open or connected
+     */
+    private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
+        SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
+        SocketChannel ch = SocketChannel.open();
+        ch.configureBlocking(false);
+        DefaultMessageStream stream = loop.connectStream(ch);
+        ch.connect(sa);
+        return stream;
+    }
+
+    // Loop for accepting client connections
+    private class DefaultAcceptorLoop extends AcceptorLoop {
+
+        public DefaultAcceptorLoop(SocketAddress address) throws IOException {
+            super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
+        }
+
+        @Override
+        protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+            SocketChannel sc = channel.accept();
+            sc.configureBlocking(false);
+
+            Socket so = sc.socket();
+            so.setTcpNoDelay(SO_NO_DELAY);
+            so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+            so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+
+            nextWorker().acceptStream(sc);
+        }
+    }
+
+    private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
+
+        @Override
+        public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
+        }
+
+        @Override
+        public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
+            stream.close();
+        }
+
+        @Override
+        public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
+            DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
+            log.info("Established a new connection to {}", ep);
+            return stream;
+        }
+
+        @Override
+        public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
+        }
+
+        @Override
+        public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
+            return stream.isClosed();
+        }
+    }
+}
\ No newline at end of file