Added a messaging service implementation on top of IOLoop. Added the ability to easily switch between Netty and IOLoop (the default is Netty).

Change-Id: Id9af0756bf0a542f832f3611b486b2ac680b91e4
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java
new file mode 100644
index 0000000..ce917f7
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java
@@ -0,0 +1,399 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.nio.AcceptorLoop;
+import org.onlab.packet.IpAddress;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+
+/**
+ * MessagingService implementation based on IOLoop.
+ * <p>
+ * Inbound connections are accepted by a single acceptor loop and assigned to
+ * a fixed set of IO loop workers in round-robin order. Outbound connections
+ * are pooled per remote endpoint. Messages addressed to the local endpoint
+ * are dispatched directly to the registered handlers.
+ */
+public class IOLoopMessagingService implements MessagingService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Reserved message type that marks a message as the reply to a request.
+    private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
+
+    static final int PORT = 9876;
+    // Passed to awaitStart() of the acceptor and IO loops during activation.
+    static final long TIMEOUT = 1000;
+
+    // Socket options applied to every accepted connection.
+    static final boolean SO_NO_DELAY = false;
+    static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
+    static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
+
+    private static final int NUM_WORKERS = 8;
+
+    // Time after which an unanswered sendAndReceive request is expired.
+    private static final long REPLY_TIMEOUT_SECONDS = 10;
+
+    private AcceptorLoop acceptorLoop;
+    private final ExecutorService acceptorThreadPool =
+            Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
+    private final ExecutorService ioThreadPool =
+            Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
+
+    private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
+
+    // Index of the IO loop most recently handed out by nextWorker().
+    private int lastWorker = -1;
+
+    private final Endpoint localEp;
+
+    // Pool of outbound message streams keyed by remote endpoint.
+    private final GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
+            new GenericKeyedObjectPool<Endpoint, DefaultMessageStream>(new DefaultMessageStreamFactory());
+
+    private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
+    private final AtomicLong messageIdGenerator = new AtomicLong(0);
+
+    // Futures of pending sendAndReceive requests keyed by message id. The
+    // expiry policy is what allows the removal listener to observe evictions;
+    // without it an unanswered request would stay in the cache forever and its
+    // future would never complete. Guava expires entries lazily, as part of
+    // other cache operations.
+    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+            .expireAfterWrite(REPLY_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
+                @Override
+                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
+                    if (entry.wasEvicted()) {
+                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
+                    }
+                }
+            })
+            .build();
+
+    /**
+     * Creates a messaging service whose local endpoint is 127.0.0.1 and the
+     * given port.
+     *
+     * @param port local port
+     */
+    public IOLoopMessagingService(int port) {
+        this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
+    }
+
+    /**
+     * Creates a messaging service for the given local address and port.
+     *
+     * @param ip local IP address
+     * @param port local port
+     */
+    public IOLoopMessagingService(IpAddress ip, int port) {
+        this(new Endpoint(ip, port));
+    }
+
+    /**
+     * Creates a messaging service for the given local endpoint.
+     *
+     * @param localEp local endpoint
+     */
+    public IOLoopMessagingService(Endpoint localEp) {
+        this.localEp = localEp;
+    }
+
+    /**
+     * Returns the local endpoint.
+     *
+     * @return local endpoint
+     */
+    public Endpoint localEp() {
+        return localEp;
+    }
+
+    /**
+     * Activates IO Loops.
+     * <p>
+     * Creates the acceptor loop for the local endpoint and the IO loop
+     * workers, submits them to their thread pools and waits for them to start.
+     *
+     * @throws IOException if activation fails
+     */
+    public void activate() throws IOException {
+        streams.setLifo(false);
+        this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
+
+        for (int i = 0; i < NUM_WORKERS; i++) {
+            ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
+        }
+
+        ioLoops.forEach(ioThreadPool::execute);
+        acceptorThreadPool.execute(acceptorLoop);
+        ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
+        acceptorLoop.awaitStart(TIMEOUT);
+    }
+
+    /**
+     * Shuts down IO loops.
+     * <p>
+     * Safe to call even if {@link #activate()} never ran or failed before the
+     * acceptor loop was created.
+     */
+    public void deactivate() {
+        ioLoops.forEach(loop -> loop.shutdown());
+        if (acceptorLoop != null) {
+            acceptorLoop.shutdown();
+        }
+        ioThreadPool.shutdown();
+        acceptorThreadPool.shutdown();
+    }
+
+    @Override
+    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+        DefaultMessage message = new DefaultMessage(
+                messageIdGenerator.incrementAndGet(),
+                localEp,
+                type,
+                payload);
+        sendAsync(ep, message);
+    }
+
+    /**
+     * Sends the message to the given endpoint. A message addressed to the
+     * local endpoint is dispatched locally; any other message is written to a
+     * stream borrowed from the connection pool.
+     *
+     * @param ep destination endpoint
+     * @param message message to send
+     * @throws IOException if a stream to the endpoint could not be obtained
+     */
+    protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
+        if (ep.equals(localEp)) {
+            dispatchLocally(message);
+            return;
+        }
+
+        DefaultMessageStream stream;
+        try {
+            stream = streams.borrowObject(ep);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        try {
+            stream.write(message);
+        } finally {
+            try {
+                streams.returnObject(ep, stream);
+            } catch (Exception e) {
+                // Keep the cause so that pool problems can be diagnosed.
+                log.warn("Failed to return stream to pool", e);
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<byte[]> sendAndReceive(
+            Endpoint ep,
+            String type,
+            byte[] payload) {
+        CompletableFuture<byte[]> response = new CompletableFuture<>();
+        Long messageId = messageIdGenerator.incrementAndGet();
+        // Register the future before sending so that a fast reply finds it.
+        responseFutures.put(messageId, response);
+        DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
+        try {
+            sendAsync(ep, message);
+        } catch (Exception e) {
+            responseFutures.invalidate(messageId);
+            response.completeExceptionally(e);
+        }
+        return response;
+    }
+
+    @Override
+    public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
+    }
+
+    @Override
+    public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+        handlers.put(type, message -> executor.execute(() -> {
+            byte[] responsePayload = handler.apply(message.payload());
+            // A null result means there is nothing to send back.
+            if (responsePayload != null) {
+                DefaultMessage response = new DefaultMessage(message.id(),
+                        localEp,
+                        REPLY_MESSAGE_TYPE,
+                        responsePayload);
+                try {
+                    sendAsync(message.sender(), response);
+                } catch (IOException e) {
+                    log.debug("Failed to respond", e);
+                }
+            }
+        }));
+    }
+
+    @Override
+    public void unregisterHandler(String type) {
+        handlers.remove(type);
+    }
+
+    /**
+     * Delivers a message to this node. Replies complete the future of the
+     * request they answer; all other messages go to the handler registered
+     * for their type, if any.
+     *
+     * @param message message to dispatch
+     */
+    protected void dispatchLocally(DefaultMessage message) {
+        String type = message.type();
+        if (REPLY_MESSAGE_TYPE.equals(type)) {
+            try {
+                CompletableFuture<byte[]> futureResponse =
+                        responseFutures.getIfPresent(message.id());
+                if (futureResponse != null) {
+                    futureResponse.complete(message.payload());
+                } else {
+                    log.warn("Received a reply for message id:[{}]. "
+                            + " from {}. But was unable to locate the"
+                            + " request handle", message.id(), message.sender());
+                }
+            } finally {
+                responseFutures.invalidate(message.id());
+            }
+            return;
+        }
+        Consumer<DefaultMessage> handler = handlers.get(type);
+        if (handler != null) {
+            handler.accept(message);
+        } else {
+            log.debug("No handler registered for {}", type);
+        }
+    }
+
+    // Get the next worker to which a client should be assigned
+    private synchronized DefaultIOLoop nextWorker() {
+        lastWorker = (lastWorker + 1) % NUM_WORKERS;
+        return ioLoops.get(lastWorker);
+    }
+
+    /**
+     * Initiates open connection request and registers the pending socket
+     * channel with the given IO loop.
+     *
+     * @param ep remote endpoint to connect to
+     * @param loop loop with which the channel should be registered
+     * @return message stream backed by the new channel
+     * @throws java.io.IOException if the socket could not be open or connected
+     */
+    private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
+        SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
+        SocketChannel ch = SocketChannel.open();
+        ch.configureBlocking(false);
+        DefaultMessageStream stream = loop.connectStream(ch);
+        ch.connect(sa);
+        return stream;
+    }
+
+    // Loop for accepting client connections
+    private class DefaultAcceptorLoop extends AcceptorLoop {
+
+        public DefaultAcceptorLoop(SocketAddress address) throws IOException {
+            super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
+        }
+
+        @Override
+        protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+            SocketChannel sc = channel.accept();
+            if (sc == null) {
+                // Non-blocking accept found no pending connection.
+                return;
+            }
+            sc.configureBlocking(false);
+
+            Socket so = sc.socket();
+            so.setTcpNoDelay(SO_NO_DELAY);
+            so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+            so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+
+            nextWorker().acceptStream(sc);
+        }
+    }
+
+    // Factory used by the connection pool to create and destroy message streams
+    private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
+
+        @Override
+        public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
+        }
+
+        @Override
+        public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
+            stream.close();
+        }
+
+        @Override
+        public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
+            // Wait on the stream's connected future before handing it out.
+            DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
+            log.info("Established a new connection to {}", ep);
+            return stream;
+        }
+
+        @Override
+        public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
+        }
+
+        @Override
+        public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
+            // The pool keeps objects for which this returns true, so only a
+            // stream that is still open may be reported as valid.
+            return !stream.isClosed();
+        }
+    }
+}
\ No newline at end of file