ClusterMessageResponse to provide Future interface
Change-Id: I6d43382a1b572f34c5d7d1d41ca1e41dd472f6f2
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
index ae2089d..d2a0039 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
@@ -1,12 +1,18 @@
package org.onlab.onos.store.cluster.messaging;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.onos.cluster.NodeId;
-public interface ClusterMessageResponse {
+public interface ClusterMessageResponse extends Future<byte[]> {
+
public NodeId sender();
- public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException;
- public byte[] get(long timeout) throws InterruptedException;
+
+ // TODO InterruptedException, ExecutionException removed from original
+ // Future declaration. Revisit if we ever need those.
+ @Override
+ public byte[] get(long timeout, TimeUnit unit) throws TimeoutException;
+
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42d89de..4db23ce 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,9 +4,9 @@
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -181,10 +181,13 @@
}
}
- private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
+ private static final class InternalClusterMessageResponse
+ implements ClusterMessageResponse {
private final NodeId sender;
private final Response responseFuture;
+ private volatile boolean isCancelled = false;
+ private volatile boolean isDone = false;
public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
this.sender = sender;
@@ -198,12 +201,39 @@
@Override
public byte[] get(long timeout, TimeUnit timeunit)
throws TimeoutException {
- return responseFuture.get(timeout, timeunit);
+ final byte[] result = responseFuture.get(timeout, timeunit);
+ isDone = true;
+ return result;
}
@Override
- public byte[] get(long timeout) throws InterruptedException {
- return responseFuture.get();
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (isDone()) {
+ return false;
+ }
+ // doing nothing for now
+ // when onlab.netty Response support cancel, call them.
+ isCancelled = true;
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return isCancelled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return this.isDone || isCancelled();
+ }
+
+ @Override
+ public byte[] get() throws InterruptedException, ExecutionException {
+ // TODO: consider forbidding this call and force the use of timed get
+ // to enforce handling of remote peer failure scenario
+ final byte[] result = responseFuture.get();
+ isDone = true;
+ return result;
}
}
}