ClusterMessagingProtocol: stop processing in netty handler thread

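- Fix for io.netty.util.concurrent.BlockingOperationException, which Netty
  raises when a blocking operation is performed on an event loop thread
- ClusterMessagingProtocolClient: replace the static thread pool with a
  per-client pool that is created on connect() and shut down on close()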

Change-Id: Ie0f4dee2c3a49aa4b03674f6f7678f32fcf07a44
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 475db82..d96ecbf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -4,12 +4,12 @@
 import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import static java.util.concurrent.Executors.newCachedThreadPool;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +49,7 @@
     private ControllerNode remoteNode;
     private final AtomicBoolean connectionOK = new AtomicBoolean(true);
 
-    // TODO: make this non-static and stop on close
-    private static final ExecutorService THREAD_POOL
-        = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
+    private ExecutorService pool;
 
     public ClusterMessagingProtocolClient(
             ClusterService clusterService,
@@ -87,11 +85,19 @@
 
     @Override
     public synchronized CompletableFuture<Void> connect() {
+        if (pool == null || pool.isShutdown()) {
+            // TODO include remote name?
+            pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
+        }
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public synchronized CompletableFuture<Void> close() {
+        if (pool != null) {
+            pool.shutdownNow();
+            pool = null;
+        }
         return CompletableFuture.completedFuture(null);
     }
 
@@ -112,7 +118,11 @@
 
     private <I, O> CompletableFuture<O> requestReply(I request) {
         CompletableFuture<O> future = new CompletableFuture<>();
-        THREAD_POOL.submit(new RPCTask<I, O>(request, future));
+        if (pool == null) {
+            log.info("Attempted to use closed client, connecting now. {}", request);
+            connect();
+        }
+        pool.submit(new RPCTask<I, O>(request, future));
         return future;
     }