ClusterMessagingProtocol: stop processing in netty handler thread

- Fix for io.netty.util.concurrent.BlockingOperationException

Change-Id: Ie0f4dee2c3a49aa4b03674f6f7678f32fcf07a44
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 475db82..d96ecbf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -4,12 +4,12 @@
 import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import static java.util.concurrent.Executors.newCachedThreadPool;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +49,7 @@
     private ControllerNode remoteNode;
     private final AtomicBoolean connectionOK = new AtomicBoolean(true);
 
-    // TODO: make this non-static and stop on close
-    private static final ExecutorService THREAD_POOL
-        = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
+    private ExecutorService pool;
 
     public ClusterMessagingProtocolClient(
             ClusterService clusterService,
@@ -87,11 +85,19 @@
 
     @Override
     public synchronized CompletableFuture<Void> connect() {
+        if (pool == null || pool.isShutdown()) {
+            // TODO include remote name?
+            pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
+        }
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public synchronized CompletableFuture<Void> close() {
+        if (pool != null) {
+            pool.shutdownNow();
+            pool = null;
+        }
         return CompletableFuture.completedFuture(null);
     }
 
@@ -112,7 +118,11 @@
 
     private <I, O> CompletableFuture<O> requestReply(I request) {
         CompletableFuture<O> future = new CompletableFuture<>();
-        THREAD_POOL.submit(new RPCTask<I, O>(request, future));
+        if (pool == null) {
+            log.info("Attempted to use closed client, connecting now. {}", request);
+            connect();
+        }
+        pool.submit(new RPCTask<I, O>(request, future));
         return future;
     }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 373bc97..d5532d5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -1,19 +1,20 @@
 package org.onlab.onos.store.service.impl;
 
+import static java.util.concurrent.Executors.newCachedThreadPool;
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.*;
+import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 
 import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
 import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
 import net.kuujo.copycat.protocol.RequestHandler;
 import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
 import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolServer;
 
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -27,12 +28,15 @@
 public class ClusterMessagingProtocolServer implements ProtocolServer {
 
     private final Logger log = getLogger(getClass());
+
+    private final ClusterCommunicationService clusterCommunicator;
+
     private volatile RequestHandler handler;
-    private ClusterCommunicationService clusterCommunicator;
+
+    private ExecutorService pool;
 
     public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
         this.clusterCommunicator = clusterCommunicator;
-
     }
 
     @Override
@@ -42,67 +46,128 @@
 
     @Override
     public CompletableFuture<Void> listen() {
-        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
-                                          new CopycatMessageHandler<PingRequest>());
-        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
-                                          new CopycatMessageHandler<SyncRequest>());
-        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
-                                          new CopycatMessageHandler<PollRequest>());
-        clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
-                                          new CopycatMessageHandler<SubmitRequest>());
+        if (pool == null || pool.isShutdown()) {
+            pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-server-%d"));
+        }
+
+        clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
+        clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
+        clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
+        clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> close() {
-        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
-        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
-        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
-        clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
+        clusterCommunicator.removeSubscriber(COPYCAT_PING);
+        clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
+        clusterCommunicator.removeSubscriber(COPYCAT_POLL);
+        clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
+        if (pool != null) {
+            pool.shutdownNow();
+            pool = null;
+        }
         return CompletableFuture.completedFuture(null);
     }
 
-    private class CopycatMessageHandler<T> implements ClusterMessageHandler {
+    private final class PingHandler extends CopycatMessageHandler<PingRequest> {
+
+        @Override
+        public void raftHandle(PingRequest request, ClusterMessage message) {
+            pool.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    currentHandler().ping(request)
+                        .whenComplete(new PostExecutionTask<>(message));
+                }
+            });
+        }
+    }
+
+    private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
+
+        @Override
+        public void raftHandle(SyncRequest request, ClusterMessage message) {
+            pool.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    currentHandler().sync(request)
+                        .whenComplete(new PostExecutionTask<>(message));
+                }
+            });
+        }
+    }
+
+    private final class PollHandler extends CopycatMessageHandler<PollRequest> {
+
+        @Override
+        public void raftHandle(PollRequest request, ClusterMessage message) {
+            pool.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    currentHandler().poll(request)
+                    .whenComplete(new PostExecutionTask<>(message));
+                }
+            });
+        }
+    }
+
+    private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
+
+        @Override
+        public void raftHandle(SubmitRequest request, ClusterMessage message) {
+            pool.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    currentHandler().submit(request)
+                    .whenComplete(new PostExecutionTask<>(message));
+                }
+            });
+        }
+    }
+
+    private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
+
+        public abstract void raftHandle(T request, ClusterMessage message);
 
         @Override
         public void handle(ClusterMessage message) {
-            T request = ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
-            if (handler == null) {
+            T request = DB_SERIALIZER.decode(message.payload());
+            raftHandle(request, message);
+        }
+
+        RequestHandler currentHandler() {
+            RequestHandler currentHandler = handler;
+            if (currentHandler == null) {
                 // there is a slight window of time during state transition,
                 // where handler becomes null
+                long sleepMs = 1;
                 for (int i = 0; i < 10; ++i) {
-                    if (handler != null) {
+                    currentHandler = handler;
+                    if (currentHandler != null) {
                         break;
                     }
                     try {
-                        Thread.sleep(1);
+                        sleepMs <<= 1;
+                        Thread.sleep(sleepMs);
                     } catch (InterruptedException e) {
-                        log.trace("Exception", e);
+                        log.error("Interrupted", e);
+                        return handler;
                     }
                 }
-                if (handler == null) {
+                if (currentHandler == null) {
                     log.error("There was no handler registered!");
-                    return;
+                    return handler;
                 }
             }
-            if (request.getClass().equals(PingRequest.class)) {
-                handler.ping((PingRequest) request)
-                    .whenComplete(new PostExecutionTask<PingResponse>(message));
-            } else if (request.getClass().equals(PollRequest.class)) {
-                handler.poll((PollRequest) request)
-                    .whenComplete(new PostExecutionTask<PollResponse>(message));
-            } else if (request.getClass().equals(SyncRequest.class)) {
-                handler.sync((SyncRequest) request)
-                    .whenComplete(new PostExecutionTask<SyncResponse>(message));
-            } else if (request.getClass().equals(SubmitRequest.class)) {
-                handler.submit((SubmitRequest) request)
-                    .whenComplete(new PostExecutionTask<SubmitResponse>(message));
-            } else {
-                throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
-            }
+            return currentHandler;
         }
 
-        private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
+        final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
 
             private final ClusterMessage message;
 
@@ -111,15 +176,15 @@
             }
 
             @Override
-            public void accept(R response, Throwable t) {
-                if (t != null) {
-                    log.error("Processing for " + message.subject() + " failed.", t);
+            public void accept(R response, Throwable error) {
+                if (error != null) {
+                    log.error("Processing {} failed.", message.subject(),  error);
                 } else {
                     try {
                         log.trace("responding to {}", message.subject());
-                        message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
+                        message.respond(DB_SERIALIZER.encode(response));
                     } catch (Exception e) {
-                        log.error("Failed to respond to " + response.getClass().getName(), e);
+                        log.error("Failed responding with {}", response.getClass().getName(), e);
                     }
                 }
             }