Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 8b966ed..6fc150c 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -37,6 +37,15 @@
boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException;
/**
+ * Sends a request message to a single node and returns a handle for its reply.
+ * @param message message to send
+ * @param toNodeId recipient node identifier
+ * @return ClusterMessageResponse from which the reply payload can be retrieved
+ * @throws IOException if the message could not be sent to the recipient
+ */
+ ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
+
+ /**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
new file mode 100644
index 0000000..ae2089d
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
@@ -0,0 +1,12 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.onlab.onos.cluster.NodeId;
+
+public interface ClusterMessageResponse { // reply handle returned by ClusterCommunicationService#sendAndReceive
+ public NodeId sender(); // node this response is attributed to
+ public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException; // reply payload as bytes; presumably throws TimeoutException when no reply arrives within the given timeout
+ public byte[] get(long timeout) throws InterruptedException; // reply payload as bytes; NOTE(review): the unit of timeout is not specified by this signature - confirm with implementers
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 30465ac..710d750 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,6 +4,9 @@
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -17,6 +20,7 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoPoolUtil;
@@ -28,6 +32,7 @@
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +125,22 @@
}
@Override
+ public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException { // sends message to toNodeId and returns a handle for the reply
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId); // argument check fails when the cluster service returns no node for toNodeId
+ Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); // messaging endpoint built from the node's IP string and TCP port
+ try {
+ Response responseFuture =
+ messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message)); // the encoded message is sent under its subject value
+ return new InternalClusterMessageResponse(toNodeId, responseFuture); // the recipient id is recorded as the response's sender
+
+ } catch (IOException e) {
+ log.error("Failed interaction with remote nodeId: " + toNodeId, e); // logged here, then rethrown unchanged to the caller
+ throw e;
+ }
+ }
+
+ @Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber) {
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
@@ -144,4 +165,30 @@
}
}
}
+
+ private static final class InternalClusterMessageResponse implements ClusterMessageResponse { // ClusterMessageResponse that delegates to an org.onlab.netty.Response
+
+ private final NodeId sender; // node id reported by sender()
+ private final Response responseFuture; // underlying response that every get(...) call delegates to
+
+ public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
+ this.sender = sender;
+ this.responseFuture = responseFuture;
+ }
+ @Override
+ public NodeId sender() {
+ return sender;
+ }
+
+ @Override
+ public byte[] get(long timeout, TimeUnit timeunit)
+ throws TimeoutException {
+ return responseFuture.get(timeout, timeunit); // timeout and unit are passed through unchanged
+ }
+
+ @Override
+ public byte[] get(long timeout) throws InterruptedException {
+ return responseFuture.get(); // NOTE(review): the timeout argument is not used; the no-argument Response.get() is called - confirm whether this is intended
+ }
+ }
}