Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 8b966ed..6fc150c 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -37,6 +37,15 @@
     boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException;
 
     /**
+     * Sends a message to the specified node and returns a handle to its reply.
+     * @param message message to send
+     * @param toNodeId recipient node identifier
+     * @return ClusterMessageResponse through which the reply can be retrieved
+     * @throws IOException when the interaction with the recipient node fails
+     */
+    ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
+
+    /**
      * Adds a new subscriber for the specified message subject.
      *
      * @param subject    message subject
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
new file mode 100644
index 0000000..ae2089d
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Handle to the reply of a message sent with
+ * {@link ClusterCommunicationService#sendAndReceive(ClusterMessage, NodeId)}.
+ */
+public interface ClusterMessageResponse {
+
+    /**
+     * Returns the identifier of the node that sent this response.
+     *
+     * @return responding node identifier
+     */
+    NodeId sender();
+
+    /**
+     * Returns the reply payload, waiting at most the specified time for it.
+     *
+     * @param timeout maximum time to wait
+     * @param timeunit unit of {@code timeout}
+     * @return raw bytes of the reply
+     * @throws TimeoutException if no reply is available within the specified time
+     */
+    byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException;
+
+    /**
+     * Returns the reply payload.
+     * <p>
+     * NOTE(review): the unit of {@code timeout} is not defined by this signature,
+     * and no {@link TimeoutException} is declared; confirm the intended contract
+     * or prefer {@link #get(long, TimeUnit)}.
+     *
+     * @param timeout wait limit, unit unspecified
+     * @return raw bytes of the reply
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    byte[] get(long timeout) throws InterruptedException;
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 30465ac..710d750 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,6 +4,9 @@
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -17,6 +20,7 @@
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.serializers.ClusterMessageSerializer;
 import org.onlab.onos.store.serializers.KryoPoolUtil;
@@ -28,6 +32,7 @@
 import org.onlab.netty.MessageHandler;
 import org.onlab.netty.MessagingService;
 import org.onlab.netty.NettyMessagingService;
+import org.onlab.netty.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +125,22 @@
     }
 
     @Override
+    public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+        // Look up the recipient; checkArgument guards against ids the cluster service cannot resolve.
+        final ControllerNode recipient = clusterService.getNode(toNodeId);
+        checkArgument(recipient != null, "Unknown nodeId: %s", toNodeId);
+        final Endpoint target = new Endpoint(recipient.ip().toString(), recipient.tcpPort());
+        try {
+            final Response pending = messagingService.sendAndReceive(
+                    target, message.subject().value(), SERIALIZER.encode(message));
+            return new InternalClusterMessageResponse(toNodeId, pending);
+        } catch (IOException e) {
+            log.error("Failed interaction with remote nodeId: " + toNodeId, e);
+            throw e;
+        }
+    }
+
+    @Override
     public void addSubscriber(MessageSubject subject,
             ClusterMessageHandler subscriber) {
         messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
@@ -144,4 +165,30 @@
             }
         }
     }
+
+    /** Reply handle pairing a node id with the pending response obtained from the messaging service. */
+    private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
+        private final NodeId replier;
+        private final Response pending;
+
+        public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
+            replier = sender;
+            pending = responseFuture;
+        }
+        @Override
+        public NodeId sender() {
+            return replier;
+        }
+
+        @Override
+        public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException {
+            return pending.get(timeout, timeunit);
+        }
+
+        @Override
+        public byte[] get(long timeout) throws InterruptedException {
+            // NOTE(review): timeout is not used here; the untimed Response.get() is called instead.
+            return pending.get();
+        }
+    }
 }