Updates to ClusterMessagingProtocolClient's handling of remote node connectivity issues.
Change-Id: If3cd220bef339cc57b2a5d034c6e86bad2202a9f
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 040abed..9bba4b5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -12,6 +12,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
@@ -24,8 +25,6 @@
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
-import org.onlab.onos.cluster.ClusterEvent;
-import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -47,17 +46,13 @@
private final ControllerNode localNode;
private final TcpMember remoteMember;
- // (remoteNode == null) => disconnected state
- private volatile ControllerNode remoteNode;
+ private ControllerNode remoteNode;
+ private final AtomicBoolean connectionOK = new AtomicBoolean(true);
// TODO: make this non-static and stop on close
private static final ExecutorService THREAD_POOL
= Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
- private volatile CompletableFuture<Void> appeared;
-
- private volatile InternalClusterEventListener listener;
-
public ClusterMessagingProtocolClient(
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
@@ -72,83 +67,34 @@
@Override
public CompletableFuture<PingResponse> ping(PingRequest request) {
- return connect().thenCompose((connected) -> { return requestReply(request); });
+ return requestReply(request);
}
@Override
public CompletableFuture<SyncResponse> sync(SyncRequest request) {
- return connect().thenCompose((connected) -> { return requestReply(request); });
+ return requestReply(request);
}
@Override
public CompletableFuture<PollResponse> poll(PollRequest request) {
- return connect().thenCompose((connected) -> { return requestReply(request); });
+ return requestReply(request);
}
@Override
public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
- return connect().thenCompose((connected) -> { return requestReply(request); });
+ return requestReply(request);
}
@Override
public synchronized CompletableFuture<Void> connect() {
- if (remoteNode != null) {
- // done
- return CompletableFuture.completedFuture(null);
- }
-
- if (appeared != null) {
- // already waiting for member to appear
- return appeared;
- }
-
- appeared = new CompletableFuture<>();
- listener = new InternalClusterEventListener();
- clusterService.addListener(listener);
-
- remoteNode = getControllerNode(remoteMember);
-
- if (remoteNode != null) {
- // done
- return CompletableFuture.completedFuture(null);
- }
-
- // wait for specified controller node to come up
- return appeared;
+ return CompletableFuture.completedFuture(null);
}
@Override
public synchronized CompletableFuture<Void> close() {
- if (listener != null) {
- clusterService.removeListener(listener);
- listener = null;
- }
- if (appeared != null) {
- appeared.cancel(true);
- appeared = null;
- }
return CompletableFuture.completedFuture(null);
}
- private synchronized void checkIfMemberAppeared() {
- final ControllerNode controllerNode = getControllerNode(remoteMember);
- if (controllerNode == null) {
- // still not there: no-op
- return;
- }
-
- // found
- remoteNode = controllerNode;
- if (appeared != null) {
- appeared.complete(null);
- }
-
- if (listener != null) {
- clusterService.removeListener(listener);
- listener = null;
- }
- }
-
private <I> MessageSubject messageType(I input) {
Class<?> clazz = input.getClass();
if (clazz.equals(PollRequest.class)) {
@@ -162,7 +108,6 @@
} else {
throw new IllegalArgumentException("Unknown class " + clazz.getName());
}
-
}
private <I, O> CompletableFuture<O> requestReply(I request) {
@@ -182,18 +127,6 @@
return null;
}
- private final class InternalClusterEventListener
- implements ClusterEventListener {
-
- public InternalClusterEventListener() {
- }
-
- @Override
- public void event(ClusterEvent event) {
- checkIfMemberAppeared();
- }
- }
-
private class RPCTask<I, O> implements Runnable {
private final I request;
@@ -213,22 +146,25 @@
@Override
public void run() {
try {
- ControllerNode node = remoteNode;
- if (node == null) {
- throw new IOException("Remote node disappeared");
+ if (remoteNode == null) {
+ remoteNode = getControllerNode(remoteMember);
+ if (remoteNode == null) {
+ throw new IOException("Remote node is offline!");
+ }
}
byte[] response = clusterCommunicator
- .sendAndReceive(message, node.id())
+ .sendAndReceive(message, remoteNode.id())
.get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ if (!connectionOK.getAndSet(true)) {
+ log.info("Connectivity to {} restored", remoteNode);
+ }
future.complete(verifyNotNull(SERIALIZER.decode(response)));
-
} catch (IOException | TimeoutException e) {
- log.warn("RPCTask for {} failed: {}", request, e.getMessage());
+ if (connectionOK.getAndSet(false)) {
+ log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
+ }
log.debug("RPCTask for {} failed.", request, e);
future.completeExceptionally(e);
- // Treating this client as disconnected
- remoteNode = null;
- appeared = null;
} catch (ExecutionException e) {
log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
log.debug("RPCTask execution for {} failed.", request, e);