Added support for responding directly via a cluster message received from a peer
diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index b74f887..dd29f24 100644
--- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -1,5 +1,7 @@
 package org.onlab.onos.store.cluster.messaging;
 
+import java.io.IOException;
+
 import org.onlab.onos.cluster.NodeId;
 
 // TODO: Should payload type be ByteBuffer?
@@ -49,4 +51,14 @@
     public byte[] payload() {
         return payload;
     }
+
+    /**
+     * Sends a response to the sender.
+     *
+     * @param data payload response.
+     * @throws IOException
+     */
+    public void respond(byte[] data) throws IOException {
+        throw new IllegalStateException("One can only repond to message recived from others.");
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 710d750..44159a7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -158,7 +158,7 @@
         public void handle(Message message) {
             try {
                 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
-                handler.handle(clusterMessage);
+                handler.handle(new InternalClusterMessage(clusterMessage, message));
             } catch (Exception e) {
                 log.error("Exception caught during ClusterMessageHandler", e);
                 throw e;
@@ -166,6 +166,21 @@
         }
     }
 
+    public static final class InternalClusterMessage extends ClusterMessage {
+
+        private final Message rawMessage;
+
+        public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
+            super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
+            this.rawMessage = rawMessage;
+        }
+
+        @Override
+        public void respond(byte[] response) throws IOException {
+            rawMessage.respond(response);
+        }
+    }
+
     private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
 
         private final NodeId sender;