Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 5c1fc33..7e7ec06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,7 +17,6 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -54,13 +53,14 @@
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+
 import static com.google.common.io.ByteStreams.toByteArray;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onlab.util.Tools.groupedThreads;
@@ -351,22 +351,34 @@
      */
     private void fetchBits(Application app) {
         ControllerNode localNode = clusterService.getLocalNode();
-        ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
-                                                    app.id().name().getBytes(Charsets.UTF_8));
-        //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
         CountDownLatch latch = new CountDownLatch(1);
 
         // FIXME: send message with name & version to make sure we don't get served old bits
 
         log.info("Downloading bits for application {}", app.id().name());
         for (ControllerNode node : clusterService.getNodes()) {
-            try {
-                ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
-                future.addListener(new InternalBitListener(app, node, future, latch), executor);
-            } catch (IOException e) {
-                log.debug("Unable to request bits for application {} from node {}",
-                          app.id().name(), node.id());
+            if (latch.getCount() == 0) {
+                break;
             }
+            if (node.equals(localNode)) {
+                continue;
+            }
+            clusterCommunicator.sendAndReceive(app.id().name(),
+                                    APP_BITS_REQUEST,
+                                    s -> s.getBytes(Charsets.UTF_8),
+                                    Function.identity(),
+                                    node.id())
+                               .whenCompleteAsync((bits, error) -> {
+                                   if (error == null && latch.getCount() > 0) {
+                                       saveApplication(new ByteArrayInputStream(bits));
+                                       log.info("Downloaded bits for application {} from node {}",
+                                               app.id().name(), node.id());
+                                       latch.countDown();
+                                   } else if (error != null) {
+                                       log.warn("Unable to fetch bits for application {} from node {}",
+                                               app.id().name(), node.id(), error);
+                                   }
+                               }, executor);
         }
 
         try {
@@ -392,41 +404,6 @@
             }
         }
     }
-
-    /**
-     * Processes completed fetch requests.
-     */
-    private class InternalBitListener implements Runnable {
-        private final Application app;
-        private final ControllerNode node;
-        private final ListenableFuture<byte[]> future;
-        private final CountDownLatch latch;
-
-        public InternalBitListener(Application app, ControllerNode node,
-                                   ListenableFuture<byte[]> future, CountDownLatch latch) {
-            this.app = app;
-            this.node = node;
-            this.future = future;
-            this.latch = latch;
-        }
-
-        @Override
-        public void run() {
-            if (latch.getCount() > 0 && !future.isCancelled()) {
-                try {
-                    byte[] bits = future.get(1, MILLISECONDS);
-                    saveApplication(new ByteArrayInputStream(bits));
-                    log.info("Downloaded bits for application {} from node {}",
-                             app.id().name(), node.id());
-                    latch.countDown();
-                } catch (Exception e) {
-                    log.warn("Unable to fetch bits for application {} from node {}",
-                             app.id().name(), node.id());
-                }
-            }
-        }
-    }
-
     /**
      * Prunes applications which are not in the map, but are on disk.
      */
@@ -449,6 +426,4 @@
                                       appDesc.origin(), appDesc.permissions(),
                                       appDesc.featuresRepo(), appDesc.features());
     }
-
 }
-
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 2f6a149..10bf6a4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -419,10 +419,9 @@
                             // Dispatch to all instances
 
                             clusterCommunicator.broadcastIncludeSelf(
-                                    new ClusterMessage(
-                                            clusterService.getLocalNode().id(),
-                                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                            SERIALIZER.encode(leadershipEvent)));
+                                    leadershipEvent,
+                                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                    SERIALIZER::encode);
                         } else {
                             //
                             // Test if time to expire a stale leader
@@ -491,11 +490,11 @@
                         leadershipEvent = new LeadershipEvent(
                                              LeadershipEvent.Type.LEADER_ELECTED,
                                              new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
                         clusterCommunicator.broadcastIncludeSelf(
-                                new ClusterMessage(
-                                        clusterService.getLocalNode().id(),
-                                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                        SERIALIZER.encode(leadershipEvent)));
+                                leadershipEvent,
+                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                SERIALIZER::encode);
                     }
 
                     // Sleep forever until interrupted
@@ -519,11 +518,12 @@
                         leadershipEvent = new LeadershipEvent(
                                                  LeadershipEvent.Type.LEADER_BOOTED,
                                                  new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
                         clusterCommunicator.broadcastIncludeSelf(
-                                new ClusterMessage(
-                                        clusterService.getLocalNode().id(),
-                                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                        SERIALIZER.encode(leadershipEvent)));
+                                leadershipEvent,
+                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                SERIALIZER::encode);
+
                         if (leaderLock.isLockedByCurrentThread()) {
                             leaderLock.unlock();
                         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42a79b3..204e0fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -37,8 +36,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -122,7 +130,84 @@
         return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
     }
 
-    private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
+    @Override
+    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
+        SettableFuture<byte[]> response = SettableFuture.create();
+        sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
+            if (e == null) {
+                response.set(r);
+            } else {
+                response.setException(e);
+            }
+        });
+        return response;
+    }
+
+    @Override
+    public <M> void broadcast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder) {
+        // Look up the local node once rather than once per candidate inside the filter.
+        ControllerNode localNode = clusterService.getLocalNode();
+        multicast(message, subject, encoder,
+                  clusterService.getNodes()
+                      .stream()
+                      .filter(node -> !Objects.equal(node, localNode))
+                      .map(ControllerNode::id)
+                      .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public <M> void broadcastIncludeSelf(M message,
+                                         MessageSubject subject,
+                                         Function<M, byte[]> encoder) {
+        multicast(message,
+                  subject,
+                  encoder,
+                  clusterService.getNodes()
+                      .stream()
+                      .map(ControllerNode::id)
+                      .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public <M> boolean unicast(M message,
+                               MessageSubject subject,
+                               Function<M, byte[]> encoder,
+                               NodeId toNodeId) {
+        byte[] payload = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                encoder.apply(message)).getBytes();
+        return unicastUnchecked(subject, payload, toNodeId);
+    }
+
+    @Override
+    public <M> void multicast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder,
+                              Set<NodeId> nodes) {
+        byte[] payload = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                encoder.apply(message)).getBytes();
+        nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+    }
+
+    @Override
+    public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                                                      MessageSubject subject,
+                                                      Function<M, byte[]> encoder,
+                                                      Function<byte[], R> decoder,
+                                                      NodeId toNodeId) {
+        ClusterMessage envelope = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                encoder.apply(message));
+        return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+    }
+
+    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -131,37 +216,15 @@
             return true;
         } catch (IOException e) {
             log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
-            throw e;
-        }
-    }
-
-    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
-        try {
-            return unicast(subject, payload, toNodeId);
-        } catch (IOException e) {
             return false;
         }
     }
 
-    @Override
-    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        try {
-            return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
-
-        } catch (IOException e) {
-            log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
-            throw e;
-        }
-    }
-
-    @Override
-    @Deprecated
-    public void addSubscriber(MessageSubject subject,
-                              ClusterMessageHandler subscriber) {
-        messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+        return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
     }
 
     @Override
@@ -202,6 +265,60 @@
         }
     }
 
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, R> handler,
+            Function<R, byte[]> encoder,
+            ExecutorService executor) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageResponder<>(decoder, encoder, handler),
+                executor);
+    }
+
+    @Override
+    public <M> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Consumer<M> handler,
+            ExecutorService executor) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageConsumer<>(decoder, handler),
+                executor);
+    }
+
+    private class InternalMessageResponder<M, R> implements MessageHandler {
+        private final Function<byte[], M> decoder;
+        private final Function<R, byte[]> encoder;
+        private final Function<M, R> handler;
+
+        public InternalMessageResponder(Function<byte[], M> decoder,
+                                        Function<R, byte[]> encoder,
+                                        Function<M, R> handler) {
+            this.decoder = decoder;
+            this.encoder = encoder;
+            this.handler = handler;
+        }
+        @Override
+        public void handle(Message message) throws IOException {
+            R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+            message.respond(encoder.apply(response));
+        }
+    }
+
+    private class InternalMessageConsumer<M> implements MessageHandler {
+        private final Function<byte[], M> decoder;
+        private final Consumer<M> consumer;
+
+        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+            this.decoder = decoder;
+            this.consumer = consumer;
+        }
+        @Override
+        public void handle(Message message) throws IOException {
+            consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+        }
+    }
+
     public static final class InternalClusterMessage extends ClusterMessage {
 
         private final Message rawMessage;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 8bebff7..cf3700b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -343,11 +343,9 @@
 
     private void notifyPeers(LeadershipEvent event) {
         eventDispatcher.post(event);
-        clusterCommunicator.broadcast(
-                new ClusterMessage(
-                        clusterService.getLocalNode().id(),
-                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                        SERIALIZER.encode(event)));
+        clusterCommunicator.broadcast(event,
+                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                SERIALIZER::encode);
     }
 
     private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
@@ -366,11 +364,9 @@
         if (updatedLeader) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
             eventDispatcher.post(event);
-            clusterCommunicator.broadcast(
-                    new ClusterMessage(
-                            clusterService.getLocalNode().id(),
-                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                            SERIALIZER.encode(event)));
+            clusterCommunicator.broadcast(event,
+                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                    SERIALIZER::encode);
         }
     }
 
@@ -469,11 +465,9 @@
             leaderBoard.forEach((path, leadership) -> {
                 if (leadership.leader().equals(localNodeId)) {
                     LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
-                    clusterCommunicator.broadcast(
-                            new ClusterMessage(
-                                    clusterService.getLocalNode().id(),
-                                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                    SERIALIZER.encode(event)));
+                    clusterCommunicator.broadcast(event,
+                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                            SERIALIZER::encode);
                 }
             });
         } catch (Exception e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 131000b..b47376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -304,11 +304,9 @@
 
             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
                     providerId, deviceId, deviceDescription);
-            ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
-                    SERIALIZER.encode(deviceInjectedEvent));
 
             // TODO check unicast return value
-            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
             /* error log:
             log.warn("Failed to process injected device id: {} desc: {} " +
                             "(cluster messaging failed: {})",
@@ -555,11 +553,9 @@
             }
 
             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
-            ClusterMessage clusterMessage = new ClusterMessage(
-                    localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
 
             //TODO check unicast return value
-            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
             /* error log:
             log.warn("Failed to process injected ports of device id: {} " +
                             "(cluster messaging failed: {})",
@@ -867,13 +863,8 @@
             log.debug("{} has control of {}, forwarding remove request",
                      master, deviceId);
 
-             ClusterMessage message = new ClusterMessage(
-                     myId,
-                     DEVICE_REMOVE_REQ,
-                     SERIALIZER.encode(deviceId));
-
             // TODO check unicast return value
-            clusterCommunicator.unicast(message, master);
+            clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
              /* error log:
              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
              */
@@ -1057,19 +1048,11 @@
     }
 
     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, recipient);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void notifyPeers(InternalDeviceEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 33e4251..c46131e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -510,11 +510,7 @@
     }
 
     private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                serializer.encode(event));
-        return clusterCommunicator.unicast(message, peer);
+        return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
         // Note: we had this flipped before...
 //        communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 781e24b..f5807e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 import com.hazelcast.core.IMap;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -35,6 +36,7 @@
 import org.onlab.util.BoundedThreadPool;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.NewConcurrentHashMap;
+import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
@@ -93,7 +95,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -360,22 +361,16 @@
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), rule.deviceId());
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.GET_FLOW_ENTRY,
-                SERIALIZER.encode(rule));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
-        }
-        return null;
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
+                                    FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                               TimeUnit.MILLISECONDS,
+                               null);
     }
 
-
-
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
 
@@ -393,22 +388,16 @@
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GET_DEVICE_FLOW_ENTRIES,
-                SERIALIZER.encode(deviceId));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
-        }
-        return Collections.emptyList();
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
+                                    FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                               TimeUnit.MILLISECONDS,
+                               Collections.emptyList());
     }
 
-
-
     @Override
     public void storeFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(
@@ -453,14 +442,10 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                local,
-                APPLY_BATCH_FLOWS,
-                SERIALIZER.encode(operation));
-
-
-        if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
-            log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
+        if (!clusterCommunicator.unicast(operation,
+                APPLY_BATCH_FLOWS, SERIALIZER::encode,
+                replicaInfo.master().get())) {
+            log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
 
             Set<FlowRule> allFailures = operation.getOperations().stream()
                     .map(op -> op.target())
@@ -612,18 +597,15 @@
         log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                REMOVE_FLOW_ENTRY,
-                SERIALIZER.encode(rule));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            // TODO: Retry against latest master or throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        // Futures.get() throws IllegalArgumentException when handed a RuntimeException type, so
+        // request a checked ExecutionException instead and rethrow it unchecked for callers.
+        try {
+            return Futures.get(clusterCommunicator.sendAndReceive(rule, REMOVE_FLOW_ENTRY,
+                                   SERIALIZER::encode, SERIALIZER::decode, replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, ExecutionException.class);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -649,12 +631,8 @@
         if (nodeId == null) {
             notifyDelegate(event);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    REMOTE_APPLY_COMPLETED,
-                    SERIALIZER.encode(event));
             // TODO check unicast return value
-            clusterCommunicator.unicast(message, nodeId);
+            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
             //error log: log.warn("Failed to respond to peer for batch operation result");
         }
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 708faf6..b609a3c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -20,6 +20,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -45,7 +46,6 @@
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.serializers.DecodeTo;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -199,18 +199,12 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(clusterService
-                    .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
-
-        try {
-            ListenableFuture<byte[]> responseFuture = clusterCommunicator
-                    .sendAndReceive(message, replicaInfo.master().get());
-            // here should add another decode process
-            return Futures.transform(responseFuture,
-                                     new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
-        } catch (IOException e) {
-            return Futures.immediateFailedFuture(e);
-        }
+        return clusterCommunicator.sendAndReceive(
+                batchOperation,
+                APPLY_EXTEND_FLOWS,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                replicaInfo.master().get());
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 51b4111..ae1669b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -382,17 +382,13 @@
             GroupStoreMessage groupOp = GroupStoreMessage.
                     createGroupAddRequestMsg(groupDesc.deviceId(),
                                              groupDesc);
-            ClusterMessage message = new ClusterMessage(
-                                    clusterService.getLocalNode().id(),
-                                    GroupStoreMessageSubjects.
-                                    REMOTE_GROUP_OP_REQUEST,
-                                    kryoBuilder.build().serialize(groupOp));
-            if (!clusterCommunicator.unicast(message,
-                                             mastershipService.
-                                             getMasterFor(
-                                                groupDesc.deviceId()))) {
+
+            if (!clusterCommunicator.unicast(groupOp,
+                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                    m -> kryoBuilder.build().serialize(m),
+                    mastershipService.getMasterFor(groupDesc.deviceId()))) {
                 log.warn("Failed to send request to master: {} to {}",
-                         message,
+                         groupOp,
                          mastershipService.getMasterFor(groupDesc.deviceId()));
                 //TODO: Send Group operation failure event
             }
@@ -472,16 +468,13 @@
                                                 type,
                                                 newBuckets,
                                                 newAppCookie);
-            ClusterMessage message =
-                    new ClusterMessage(clusterService.getLocalNode().id(),
-                                       GroupStoreMessageSubjects.
-                                              REMOTE_GROUP_OP_REQUEST,
-                                       kryoBuilder.build().serialize(groupOp));
-            if (!clusterCommunicator.unicast(message,
-                                             mastershipService.
-                                             getMasterFor(deviceId))) {
+
+            if (!clusterCommunicator.unicast(groupOp,
+                        GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                        m -> kryoBuilder.build().serialize(m),
+                        mastershipService.getMasterFor(deviceId))) {
                 log.warn("Failed to send request to master: {} to {}",
-                         message,
+                         groupOp,
                          mastershipService.getMasterFor(deviceId));
                 //TODO: Send Group operation failure event
             }
@@ -584,16 +577,13 @@
             GroupStoreMessage groupOp = GroupStoreMessage.
                     createGroupDeleteRequestMsg(deviceId,
                                                 appCookie);
-            ClusterMessage message =
-                    new ClusterMessage(clusterService.getLocalNode().id(),
-                                       GroupStoreMessageSubjects.
-                                              REMOTE_GROUP_OP_REQUEST,
-                                       kryoBuilder.build().serialize(groupOp));
-            if (!clusterCommunicator.unicast(message,
-                                             mastershipService.
-                                             getMasterFor(deviceId))) {
+
+            if (!clusterCommunicator.unicast(groupOp,
+                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                    m -> kryoBuilder.build().serialize(m),
+                    mastershipService.getMasterFor(deviceId))) {
                 log.warn("Failed to send request to master: {} to {}",
-                         message,
+                         groupOp,
                          mastershipService.getMasterFor(deviceId));
                 //TODO: Send Group operation failure event
             }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index 74236c3..1e6f8aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -477,21 +477,13 @@
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void unicastMessage(NodeId peer,
                                 MessageSubject subject,
                                 Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, peer);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
     }
 
     private void notifyDelegateIfNotNull(HostEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 48c430b..5e09d05 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -334,17 +334,12 @@
 
 
             LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
-            ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
-                    GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
 
             // TODO check unicast return value
-            clusterCommunicator.unicast(linkInjectedMessage, dstNode);
-            /* error log:
-            log.warn("Failed to process link update between src: {} and dst: {} " +
-                            "(cluster messaging failed: {})",
-                    linkDescription.src(), linkDescription.dst(), e);
-            */
-
+            clusterCommunicator.unicast(linkInjectedEvent,
+                    GossipLinkStoreMessageSubjects.LINK_INJECTED,
+                    SERIALIZER::encode,
+                    dstNode);
         }
 
         return linkEvent;
@@ -653,19 +648,11 @@
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, recipient);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
     }
 
     private void notifyPeers(InternalLinkEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 25870b7..8a54502 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -181,20 +181,14 @@
             } else {
                 return MastershipRole.NONE;
             }
-        } else {
-            try {
-                MastershipRole role = complete(clusterCommunicator.sendAndReceive(
-                                                        new ClusterMessage(
-                                                                    localNodeId,
-                                                                    ROLE_QUERY_SUBJECT,
-                                                                    SERIALIZER.encode(deviceId)),
-                                                        nodeId));
-                return role == null ? MastershipRole.NONE : role;
-            } catch (IOException e) {
-                log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
-                return MastershipRole.NONE;
-            }
         }
+        MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+                deviceId,
+                ROLE_QUERY_SUBJECT,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                nodeId));
+        return role == null ? MastershipRole.NONE : role;
     }
 
     @Override
@@ -276,17 +270,12 @@
         if (!nodeId.equals(localNodeId)) {
             log.debug("Forwarding request to relinquish "
                     + "role for device {} to {}", deviceId, nodeId);
-            try {
-                return complete(clusterCommunicator.sendAndReceive(
-                                                        new ClusterMessage(
-                                                                    localNodeId,
-                                                                    ROLE_RELINQUISH_SUBJECT,
-                                                                    SERIALIZER.encode(deviceId)),
-                                                        nodeId));
-            } catch (IOException e) {
-                log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
-                return null;
-            }
+            return complete(clusterCommunicator.sendAndReceive(
+                    deviceId,
+                    ROLE_RELINQUISH_SUBJECT,
+                    SERIALIZER::encode,
+                    SERIALIZER::decode,
+                    nodeId));
         }
 
         // Check if this node is can be managed by this node.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 7ee8712..8bf8ae6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -131,9 +131,7 @@
         }
 
         // TODO check unicast return value
-        communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT,
-                                                        SERIALIZER.encode(packet)),
-                                     master);
+        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
         // error log: log.warn("Failed to send packet-out to {}", master);
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index f07c0e8..e314789 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.statistic.impl;
 
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
@@ -47,12 +49,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.onlab.util.Tools.groupedThreads;
@@ -218,20 +217,15 @@
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getCurrentStatisticInternal(connectPoint);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    GET_CURRENT,
-                    SERIALIZER.encode(connectPoint));
-
-            try {
-                Future<byte[]> response =
-                        clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-                return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
-                                                      TimeUnit.MILLISECONDS));
-            } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-                log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
-                return Collections.emptySet();
-            }
+            return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+                                        connectPoint,
+                                        GET_CURRENT,
+                                        SERIALIZER::encode,
+                                        SERIALIZER::decode,
+                                        replicaInfo.master().get()),
+                                   STATISTIC_STORE_TIMEOUT_MILLIS,
+                                   TimeUnit.MILLISECONDS,
+                                   Collections.emptySet());
         }
 
     }
@@ -251,22 +245,16 @@
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getPreviousStatisticInternal(connectPoint);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    GET_PREVIOUS,
-                    SERIALIZER.encode(connectPoint));
-
-            try {
-                Future<byte[]> response =
-                        clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-                return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
-                                                      TimeUnit.MILLISECONDS));
-            } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-                log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
-                return Collections.emptySet();
-            }
+            return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+                                        connectPoint,
+                                        GET_PREVIOUS,
+                                        SERIALIZER::encode,
+                                        SERIALIZER::decode,
+                                        replicaInfo.master().get()),
+                                   STATISTIC_STORE_TIMEOUT_MILLIS,
+                                   TimeUnit.MILLISECONDS,
+                                   Collections.emptySet());
         }
-
     }
 
     private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {