Updates to ClusterMessagingProtocolClient's handling of remote node connectivity issues.

Change-Id: If3cd220bef339cc57b2a5d034c6e86bad2202a9f
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 040abed..9bba4b5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -12,6 +12,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.protocol.PingRequest;
@@ -24,8 +25,6 @@
 import net.kuujo.copycat.protocol.SyncResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
-import org.onlab.onos.cluster.ClusterEvent;
-import org.onlab.onos.cluster.ClusterEventListener;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -47,17 +46,13 @@
     private final ControllerNode localNode;
     private final TcpMember remoteMember;
 
-    // (remoteNode == null) => disconnected state
-    private volatile ControllerNode remoteNode;
+    private ControllerNode remoteNode;
+    private final AtomicBoolean connectionOK = new AtomicBoolean(true);
 
     // TODO: make this non-static and stop on close
     private static final ExecutorService THREAD_POOL
         = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
 
-    private volatile CompletableFuture<Void> appeared;
-
-    private volatile InternalClusterEventListener listener;
-
     public ClusterMessagingProtocolClient(
             ClusterService clusterService,
             ClusterCommunicationService clusterCommunicator,
@@ -72,83 +67,34 @@
 
     @Override
     public CompletableFuture<PingResponse> ping(PingRequest request) {
-        return connect().thenCompose((connected) -> { return requestReply(request); });
+        return requestReply(request);
     }
 
     @Override
     public CompletableFuture<SyncResponse> sync(SyncRequest request) {
-        return connect().thenCompose((connected) -> { return requestReply(request); });
+        return requestReply(request);
     }
 
     @Override
     public CompletableFuture<PollResponse> poll(PollRequest request) {
-        return connect().thenCompose((connected) -> { return requestReply(request); });
+        return requestReply(request);
     }
 
     @Override
     public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
-        return connect().thenCompose((connected) -> { return requestReply(request); });
+        return requestReply(request);
     }
 
     @Override
     public synchronized CompletableFuture<Void> connect() {
-        if (remoteNode != null) {
-            // done
-            return CompletableFuture.completedFuture(null);
-        }
-
-        if (appeared != null) {
-            // already waiting for member to appear
-            return appeared;
-        }
-
-        appeared = new CompletableFuture<>();
-        listener = new InternalClusterEventListener();
-        clusterService.addListener(listener);
-
-        remoteNode = getControllerNode(remoteMember);
-
-        if (remoteNode != null) {
-            // done
-            return CompletableFuture.completedFuture(null);
-        }
-
-        // wait for specified controller node to come up
-        return appeared;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public synchronized CompletableFuture<Void> close() {
-        if (listener != null) {
-            clusterService.removeListener(listener);
-            listener = null;
-        }
-        if (appeared != null) {
-            appeared.cancel(true);
-            appeared = null;
-        }
         return CompletableFuture.completedFuture(null);
     }
 
-    private synchronized void checkIfMemberAppeared() {
-        final ControllerNode controllerNode = getControllerNode(remoteMember);
-        if (controllerNode == null) {
-            // still not there: no-op
-            return;
-        }
-
-        // found
-        remoteNode = controllerNode;
-        if (appeared != null) {
-            appeared.complete(null);
-        }
-
-        if (listener != null) {
-            clusterService.removeListener(listener);
-            listener = null;
-        }
-    }
-
     private <I> MessageSubject messageType(I input) {
         Class<?> clazz = input.getClass();
         if (clazz.equals(PollRequest.class)) {
@@ -162,7 +108,6 @@
         } else {
             throw new IllegalArgumentException("Unknown class " + clazz.getName());
         }
-
     }
 
     private <I, O> CompletableFuture<O> requestReply(I request) {
@@ -182,18 +127,6 @@
         return null;
     }
 
-    private final class InternalClusterEventListener
-            implements ClusterEventListener {
-
-        public InternalClusterEventListener() {
-        }
-
-        @Override
-        public void event(ClusterEvent event) {
-            checkIfMemberAppeared();
-        }
-    }
-
     private class RPCTask<I, O> implements Runnable {
 
         private final I request;
@@ -213,22 +146,25 @@
         @Override
         public void run() {
             try {
-                ControllerNode node = remoteNode;
-                if (node == null) {
-                    throw new IOException("Remote node disappeared");
+                if (remoteNode == null) {
+                    remoteNode = getControllerNode(remoteMember);
+                    if (remoteNode == null) {
+                        throw new IOException("Remote node is offline!");
+                    }
                 }
                 byte[] response = clusterCommunicator
-                    .sendAndReceive(message, node.id())
+                    .sendAndReceive(message, remoteNode.id())
                     .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+                if (!connectionOK.getAndSet(true)) {
+                    log.info("Connectivity to {} restored", remoteNode);
+                }
                 future.complete(verifyNotNull(SERIALIZER.decode(response)));
-
             } catch (IOException | TimeoutException e) {
-                log.warn("RPCTask for {} failed: {}", request, e.getMessage());
+                if (connectionOK.getAndSet(false)) {
+                    log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
+                }
                 log.debug("RPCTask for {} failed.", request, e);
                 future.completeExceptionally(e);
-                // Treating this client as disconnected
-                remoteNode = null;
-                appeared = null;
             } catch (ExecutionException e) {
                 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
                 log.debug("RPCTask execution for {} failed.", request, e);