ClusterMessagingProtocolClient: changed thread pool

Change-Id: Ibb37bd2c7c94067336152f19412523dc4cda9722
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index f8c1be8..070ae8c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -3,13 +3,13 @@
 import static com.google.common.base.Verify.verifyNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
+import static org.onlab.util.Tools.namedThreads;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -33,8 +33,6 @@
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.slf4j.Logger;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * ONOS Cluster messaging based Copycat protocol client.
  */
@@ -42,9 +40,6 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final ThreadFactory THREAD_FACTORY =
-            new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
-
     public static final long RETRY_INTERVAL_MILLIS = 2000;
 
     private final ClusterService clusterService;
@@ -53,9 +48,9 @@
     private final TcpMember remoteMember;
     private ControllerNode remoteNode;
 
-    // FIXME: Thread pool sizing.
-    private static final ScheduledExecutorService THREAD_POOL =
-            new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+    // TODO: make this non-static and stop on close
+    private static final ExecutorService THREAD_POOL
+        = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
 
     private volatile CompletableFuture<Void> appeared;
 
@@ -173,7 +168,7 @@
 
     private <I, O> CompletableFuture<O> requestReply(I request) {
         CompletableFuture<O> future = new CompletableFuture<>();
-        THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+        THREAD_POOL.submit(new RPCTask<I, O>(request, future));
         return future;
     }
 
@@ -198,7 +193,6 @@
         public void event(ClusterEvent event) {
             checkIfMemberAppeared();
         }
-
     }
 
     private class RPCTask<I, O> implements Runnable {
@@ -225,9 +219,13 @@
                     .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                 future.complete(verifyNotNull(SERIALIZER.decode(response)));
 
-            } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+            } catch (IOException | ExecutionException | TimeoutException e) {
                 log.warn("RPCTask for {} failed.", request, e);
                 future.completeExceptionally(e);
+            } catch (InterruptedException e) {
+                log.warn("RPCTask for {} was interrupted.", request, e);
+                future.completeExceptionally(e);
+                Thread.currentThread().interrupt();
             } catch (Exception e) {
                 log.warn("RPCTask for {} terribly failed.", request, e);
                 future.completeExceptionally(e);