ClusterMessagingProtocolClient: transition to disconnected state on IO error or timeout
Change-Id: Iac0af5b5a55868d2677aecf18e63e00018d5113f
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 54d8e2b..1f5a20d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -12,7 +12,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
@@ -46,7 +45,9 @@
private final ClusterCommunicationService clusterCommunicator;
private final ControllerNode localNode;
private final TcpMember remoteMember;
- private ControllerNode remoteNode;
+
+ // (remoteNode == null) => disconnected state
+ private volatile ControllerNode remoteNode;
// TODO: make this non-static and stop on close
private static final ExecutorService THREAD_POOL
@@ -70,22 +71,22 @@
@Override
public CompletableFuture<PingResponse> ping(PingRequest request) {
- return requestReply(request);
+ return connect().thenCompose((connected) -> { return requestReply(request); });
}
@Override
public CompletableFuture<SyncResponse> sync(SyncRequest request) {
- return requestReply(request);
+ return connect().thenCompose((connected) -> { return requestReply(request); });
}
@Override
public CompletableFuture<PollResponse> poll(PollRequest request) {
- return requestReply(request);
+ return connect().thenCompose((connected) -> { return requestReply(request); });
}
@Override
public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
- return requestReply(request);
+ return connect().thenCompose((connected) -> { return requestReply(request); });
}
@Override
@@ -95,13 +96,6 @@
return CompletableFuture.completedFuture(null);
}
- remoteNode = getControllerNode(remoteMember);
-
- if (remoteNode != null) {
- // done
- return CompletableFuture.completedFuture(null);
- }
-
if (appeared != null) {
// already waiting for member to appear
return appeared;
@@ -111,6 +105,13 @@
listener = new InternalClusterEventListener();
clusterService.addListener(listener);
+ remoteNode = getControllerNode(remoteMember);
+
+ if (remoteNode != null) {
+ // done
+ return CompletableFuture.completedFuture(null);
+ }
+
// wait for specified controller node to come up
return appeared;
}
@@ -211,15 +212,25 @@
@Override
public void run() {
try {
+ ControllerNode node = remoteNode;
+ if (node == null) {
+ throw new IOException("Remote node disappeared");
+ }
byte[] response = clusterCommunicator
- .sendAndReceive(message, remoteNode.id())
+ .sendAndReceive(message, node.id())
.get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
future.complete(verifyNotNull(SERIALIZER.decode(response)));
- } catch (IOException | ExecutionException | TimeoutException e) {
+ } catch (IOException | TimeoutException e) {
log.warn("RPCTask for {} failed: {}", request, e.getMessage());
log.debug("RPCTask for {} failed.", request, e);
future.completeExceptionally(e);
+ // Treating this client as disconnected
+ remoteNode = null;
+ } catch (ExecutionException e) {
+ log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
+ log.debug("RPCTask execution for {} failed.", request, e);
+ future.completeExceptionally(e);
} catch (InterruptedException e) {
log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
log.debug("RPCTask for {} was interrupted.", request, e);