ClusterMessagingProtocolClient: transition to disconnected state on IO error or timeout

Change-Id: Iac0af5b5a55868d2677aecf18e63e00018d5113f
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 54d8e2b..1f5a20d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -12,7 +12,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.protocol.PingRequest;
 import net.kuujo.copycat.protocol.PingResponse;
@@ -46,7 +45,9 @@
     private final ClusterCommunicationService clusterCommunicator;
     private final ControllerNode localNode;
     private final TcpMember remoteMember;
-    private ControllerNode remoteNode;
+
+    // (remoteNode == null) => disconnected state
+    private volatile ControllerNode remoteNode;
 
     // TODO: make this non-static and stop on close
     private static final ExecutorService THREAD_POOL
@@ -70,22 +71,22 @@
 
     @Override
     public CompletableFuture<PingResponse> ping(PingRequest request) {
-        return requestReply(request);
+        return connect().thenCompose((connected) -> { return requestReply(request); });
     }
 
     @Override
     public CompletableFuture<SyncResponse> sync(SyncRequest request) {
-        return requestReply(request);
+        return connect().thenCompose((connected) -> { return requestReply(request); });
     }
 
     @Override
     public CompletableFuture<PollResponse> poll(PollRequest request) {
-        return requestReply(request);
+        return connect().thenCompose((connected) -> { return requestReply(request); });
     }
 
     @Override
     public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
-        return requestReply(request);
+        return connect().thenCompose((connected) -> { return requestReply(request); });
     }
 
     @Override
@@ -95,13 +96,6 @@
             return CompletableFuture.completedFuture(null);
         }
 
-        remoteNode = getControllerNode(remoteMember);
-
-        if (remoteNode != null) {
-            // done
-            return CompletableFuture.completedFuture(null);
-        }
-
         if (appeared != null) {
             // already waiting for member to appear
             return appeared;
@@ -111,6 +105,13 @@
         listener = new InternalClusterEventListener();
         clusterService.addListener(listener);
 
+        remoteNode = getControllerNode(remoteMember);
+
+        if (remoteNode != null) {
+            // done
+            return CompletableFuture.completedFuture(null);
+        }
+
         // wait for specified controller node to come up
         return appeared;
     }
@@ -211,15 +212,25 @@
         @Override
         public void run() {
             try {
+                ControllerNode node = remoteNode;
+                if (node == null) {
+                    throw new IOException("Remote node disappeared");
+                }
                 byte[] response = clusterCommunicator
-                    .sendAndReceive(message, remoteNode.id())
+                    .sendAndReceive(message, node.id())
                     .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                 future.complete(verifyNotNull(SERIALIZER.decode(response)));
 
-            } catch (IOException | ExecutionException | TimeoutException e) {
+            } catch (IOException | TimeoutException e) {
                 log.warn("RPCTask for {} failed: {}", request, e.getMessage());
                 log.debug("RPCTask for {} failed.", request, e);
                 future.completeExceptionally(e);
+                // Treating this client as disconnected
+                remoteNode = null;
+            } catch (ExecutionException e) {
+                log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
+                log.debug("RPCTask execution for {} failed.", request, e);
+                future.completeExceptionally(e);
             } catch (InterruptedException e) {
                 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
                 log.debug("RPCTask for {} was interrupted.", request, e);