Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
index a979b65..8c160e8 100644
--- a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
@@ -219,7 +219,7 @@
 
     private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
         String data = String.format("%d|%f|%f", time, overallRate, currentRate);
-        communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes()));
+        communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
     }
 
     private class InternalSampleCollector implements ClusterMessageHandler {
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 909f3a5..de9e9f2 100644
--- a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -249,14 +249,14 @@
     public void start() {
         if (stopped) {
             stopped = false;
-            communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
+            communicationService.broadcast(START, CONTROL, str -> str.getBytes());
             startTestRun();
         }
     }
 
     public void stop() {
         if (!stopped) {
-            communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
+            communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
             stopTestRun();
         }
     }
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index cbf2398..59970f3 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,13 +15,16 @@
  */
 package org.onosproject.store.cluster.messaging;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
 import org.onosproject.cluster.NodeId;
 
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
 
-// TODO: remove IOExceptions?
 /**
  * Service for assisting communications between controller cluster nodes.
  */
@@ -33,6 +36,7 @@
      * @param message  message to send
      * @return true if the message was sent successfully to all nodes; false otherwise.
      */
+    @Deprecated
     boolean broadcast(ClusterMessage message);
 
     /**
@@ -41,6 +45,7 @@
      * @param message  message to send
      * @return true if the message was sent successfully to all nodes; false otherwise.
      */
+    @Deprecated
     boolean broadcastIncludeSelf(ClusterMessage message);
 
     /**
@@ -50,6 +55,7 @@
      * @param toNodeId node identifier
      * @return true if the message was sent successfully; false otherwise.
      */
+    @Deprecated
     boolean unicast(ClusterMessage message, NodeId toNodeId);
 
     /**
@@ -59,6 +65,7 @@
      * @param nodeIds  recipient node identifiers
      * @return true if the message was sent successfully to all nodes in the group; false otherwise.
      */
+    @Deprecated
     boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
 
     /**
@@ -66,18 +73,9 @@
      * @param message message to send
      * @param toNodeId recipient node identifier
      * @return reply future.
-     * @throws IOException when I/O exception of some sort has occurred
-     */
-    ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
-
-    /**
-     * Adds a new subscriber for the specified message subject.
-     *
-     * @param subject    message subject
-     * @param subscriber message subscriber
      */
     @Deprecated
-    void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
+    ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId);
 
     /**
      * Adds a new subscriber for the specified message subject.
@@ -86,13 +84,115 @@
      * @param subscriber message subscriber
      * @param executor executor to use for running handler.
      */
+    @Deprecated
     void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
 
     /**
+     * Broadcasts a message to all controller nodes.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param <M> message type
+     */
+    <M> void broadcast(M message,
+                       MessageSubject subject,
+                       Function<M, byte[]> encoder);
+
+    /**
+     * Broadcasts a message to all controller nodes including self.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param <M> message type
+     */
+    <M> void broadcastIncludeSelf(M message,
+                                  MessageSubject subject,
+                                  Function<M, byte[]> encoder);
+
+    /**
+     * Sends a message to the specified controller node.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param toNodeId destination node identifier
+     * @param <M> message type
+     * @return true if the message was sent successfully; false otherwise
+     */
+    <M> boolean unicast(M message,
+                        MessageSubject subject,
+                        Function<M, byte[]> encoder,
+                        NodeId toNodeId);
+
+    /**
+     * Multicasts a message to a set of controller nodes.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param nodeIds  recipient node identifiers
+     * @param <M> message type
+     */
+    <M> void multicast(M message,
+                       MessageSubject subject,
+                       Function<M, byte[]> encoder,
+                       Set<NodeId> nodeIds);
+
+    /**
+     * Sends a message and expects a reply.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding request to byte[]
+     * @param decoder function for decoding response from byte[]
+     * @param toNodeId recipient node identifier
+     * @param <M> request type
+     * @param <R> reply type
+     * @return reply future
+     */
+    <M, R> CompletableFuture<R> sendAndReceive(M message,
+                                               MessageSubject subject,
+                                               Function<M, byte[]> encoder,
+                                               Function<byte[], R> decoder,
+                                               NodeId toNodeId);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject message subject
+     * @param decoder decoder for resurrecting incoming message
+     * @param handler handler function that process the incoming message and produces a reply
+     * @param encoder encoder for serializing reply
+     * @param executor executor to run this handler on
+     * @param <M> incoming message type
+     * @param <R> reply message type
+     */
+    <M, R> void addSubscriber(MessageSubject subject,
+                              Function<byte[], M> decoder,
+                              Function<M, R> handler,
+                              Function<R, byte[]> encoder,
+                              ExecutorService executor);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject message subject
+     * @param decoder decoder to resurrecting incoming message
+     * @param handler handler for handling message
+     * @param executor executor to run this handler on
+     * @param <M> incoming message type
+     */
+    <M> void addSubscriber(MessageSubject subject,
+                           Function<byte[], M> decoder,
+                           Consumer<M> handler,
+                           ExecutorService executor);
+
+    /**
      * Removes a subscriber for the specified message subject.
      *
-     * @param subject    message subject
+     * @param subject message subject
      */
     void removeSubscriber(MessageSubject subject);
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 5c1fc33..7e7ec06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,7 +17,6 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -54,13 +53,14 @@
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+
 import static com.google.common.io.ByteStreams.toByteArray;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onlab.util.Tools.groupedThreads;
@@ -351,22 +351,34 @@
      */
     private void fetchBits(Application app) {
         ControllerNode localNode = clusterService.getLocalNode();
-        ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
-                                                    app.id().name().getBytes(Charsets.UTF_8));
-        //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
         CountDownLatch latch = new CountDownLatch(1);
 
         // FIXME: send message with name & version to make sure we don't get served old bits
 
         log.info("Downloading bits for application {}", app.id().name());
         for (ControllerNode node : clusterService.getNodes()) {
-            try {
-                ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
-                future.addListener(new InternalBitListener(app, node, future, latch), executor);
-            } catch (IOException e) {
-                log.debug("Unable to request bits for application {} from node {}",
-                          app.id().name(), node.id());
+            if (latch.getCount() == 0) {
+                break;
             }
+            if (node.equals(localNode)) {
+                continue;
+            }
+            clusterCommunicator.sendAndReceive(app.id().name(),
+                                    APP_BITS_REQUEST,
+                                    s -> s.getBytes(Charsets.UTF_8),
+                                    Function.identity(),
+                                    node.id())
+                               .whenCompleteAsync((bits, error) -> {
+                                   if (error == null && latch.getCount() > 0) {
+                                       saveApplication(new ByteArrayInputStream(bits));
+                                       log.info("Downloaded bits for application {} from node {}",
+                                               app.id().name(), node.id());
+                                       latch.countDown();
+                                   } else if (error != null) {
+                                       log.warn("Unable to fetch bits for application {} from node {}",
+                                               app.id().name(), node.id(), error);
+                                   }
+                               }, executor);
         }
 
         try {
@@ -392,41 +404,6 @@
             }
         }
     }
-
-    /**
-     * Processes completed fetch requests.
-     */
-    private class InternalBitListener implements Runnable {
-        private final Application app;
-        private final ControllerNode node;
-        private final ListenableFuture<byte[]> future;
-        private final CountDownLatch latch;
-
-        public InternalBitListener(Application app, ControllerNode node,
-                                   ListenableFuture<byte[]> future, CountDownLatch latch) {
-            this.app = app;
-            this.node = node;
-            this.future = future;
-            this.latch = latch;
-        }
-
-        @Override
-        public void run() {
-            if (latch.getCount() > 0 && !future.isCancelled()) {
-                try {
-                    byte[] bits = future.get(1, MILLISECONDS);
-                    saveApplication(new ByteArrayInputStream(bits));
-                    log.info("Downloaded bits for application {} from node {}",
-                             app.id().name(), node.id());
-                    latch.countDown();
-                } catch (Exception e) {
-                    log.warn("Unable to fetch bits for application {} from node {}",
-                             app.id().name(), node.id());
-                }
-            }
-        }
-    }
-
     /**
      * Prunes applications which are not in the map, but are on disk.
      */
@@ -449,6 +426,4 @@
                                       appDesc.origin(), appDesc.permissions(),
                                       appDesc.featuresRepo(), appDesc.features());
     }
-
 }
-
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 2f6a149..10bf6a4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -419,10 +419,9 @@
                             // Dispatch to all instances
 
                             clusterCommunicator.broadcastIncludeSelf(
-                                    new ClusterMessage(
-                                            clusterService.getLocalNode().id(),
-                                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                            SERIALIZER.encode(leadershipEvent)));
+                                    leadershipEvent,
+                                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                    SERIALIZER::encode);
                         } else {
                             //
                             // Test if time to expire a stale leader
@@ -491,11 +490,11 @@
                         leadershipEvent = new LeadershipEvent(
                                              LeadershipEvent.Type.LEADER_ELECTED,
                                              new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
                         clusterCommunicator.broadcastIncludeSelf(
-                                new ClusterMessage(
-                                        clusterService.getLocalNode().id(),
-                                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                        SERIALIZER.encode(leadershipEvent)));
+                                leadershipEvent,
+                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                SERIALIZER::encode);
                     }
 
                     // Sleep forever until interrupted
@@ -519,11 +518,12 @@
                         leadershipEvent = new LeadershipEvent(
                                                  LeadershipEvent.Type.LEADER_BOOTED,
                                                  new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
                         clusterCommunicator.broadcastIncludeSelf(
-                                new ClusterMessage(
-                                        clusterService.getLocalNode().id(),
-                                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                        SERIALIZER.encode(leadershipEvent)));
+                                leadershipEvent,
+                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                SERIALIZER::encode);
+
                         if (leaderLock.isLockedByCurrentThread()) {
                             leaderLock.unlock();
                         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42a79b3..204e0fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -37,8 +36,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -122,7 +130,84 @@
         return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
     }
 
-    private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
+    @Override
+    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
+        SettableFuture<byte[]> response = SettableFuture.create();
+        sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
+            if (e == null) {
+                response.set(r);
+            } else {
+                response.setException(e);
+            }
+        });
+        return response;
+    }
+
+    @Override
+    public <M> void broadcast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder) {
+        multicast(message,
+                  subject,
+                  encoder,
+                  clusterService.getNodes()
+                      .stream()
+                      .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+                      .map(ControllerNode::id)
+                      .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public <M> void broadcastIncludeSelf(M message,
+                                         MessageSubject subject,
+                                         Function<M, byte[]> encoder) {
+        multicast(message,
+                  subject,
+                  encoder,
+                  clusterService.getNodes()
+                      .stream()
+                      .map(ControllerNode::id)
+                      .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public <M> boolean unicast(M message,
+                               MessageSubject subject,
+                               Function<M, byte[]> encoder,
+                               NodeId toNodeId) {
+        byte[] payload = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                encoder.apply(message)).getBytes();
+        return unicastUnchecked(subject, payload, toNodeId);
+    }
+
+    @Override
+    public <M> void multicast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder,
+                              Set<NodeId> nodes) {
+        byte[] payload = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                encoder.apply(message)).getBytes();
+        nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+    }
+
+    @Override
+    public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                                                      MessageSubject subject,
+                                                      Function<M, byte[]> encoder,
+                                                      Function<byte[], R> decoder,
+                                                      NodeId toNodeId) {
+        ClusterMessage envelope = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                encoder.apply(message));
+        return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+    }
+
+    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -131,37 +216,15 @@
             return true;
         } catch (IOException e) {
             log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
-            throw e;
-        }
-    }
-
-    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
-        try {
-            return unicast(subject, payload, toNodeId);
-        } catch (IOException e) {
             return false;
         }
     }
 
-    @Override
-    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        try {
-            return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
-
-        } catch (IOException e) {
-            log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
-            throw e;
-        }
-    }
-
-    @Override
-    @Deprecated
-    public void addSubscriber(MessageSubject subject,
-                              ClusterMessageHandler subscriber) {
-        messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+        return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
     }
 
     @Override
@@ -202,6 +265,60 @@
         }
     }
 
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, R> handler,
+            Function<R, byte[]> encoder,
+            ExecutorService executor) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageResponder<>(decoder, encoder, handler),
+                executor);
+    }
+
+    @Override
+    public <M> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Consumer<M> handler,
+            ExecutorService executor) {
+        messagingService.registerHandler(subject.value(),
+                new InternalMessageConsumer<>(decoder, handler),
+                executor);
+    }
+
+    private class InternalMessageResponder<M, R> implements MessageHandler {
+        private final Function<byte[], M> decoder;
+        private final Function<R, byte[]> encoder;
+        private final Function<M, R> handler;
+
+        public InternalMessageResponder(Function<byte[], M> decoder,
+                                        Function<R, byte[]> encoder,
+                                        Function<M, R> handler) {
+            this.decoder = decoder;
+            this.encoder = encoder;
+            this.handler = handler;
+        }
+        @Override
+        public void handle(Message message) throws IOException {
+            R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+            message.respond(encoder.apply(response));
+        }
+    }
+
+    private class InternalMessageConsumer<M> implements MessageHandler {
+        private final Function<byte[], M> decoder;
+        private final Consumer<M> consumer;
+
+        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+            this.decoder = decoder;
+            this.consumer = consumer;
+        }
+        @Override
+        public void handle(Message message) throws IOException {
+            consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+        }
+    }
+
     public static final class InternalClusterMessage extends ClusterMessage {
 
         private final Message rawMessage;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 8bebff7..cf3700b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -343,11 +343,9 @@
 
     private void notifyPeers(LeadershipEvent event) {
         eventDispatcher.post(event);
-        clusterCommunicator.broadcast(
-                new ClusterMessage(
-                        clusterService.getLocalNode().id(),
-                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                        SERIALIZER.encode(event)));
+        clusterCommunicator.broadcast(event,
+                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                SERIALIZER::encode);
     }
 
     private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
@@ -366,11 +364,9 @@
         if (updatedLeader) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
             eventDispatcher.post(event);
-            clusterCommunicator.broadcast(
-                    new ClusterMessage(
-                            clusterService.getLocalNode().id(),
-                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                            SERIALIZER.encode(event)));
+            clusterCommunicator.broadcast(event,
+                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                    SERIALIZER::encode);
         }
     }
 
@@ -469,11 +465,9 @@
             leaderBoard.forEach((path, leadership) -> {
                 if (leadership.leader().equals(localNodeId)) {
                     LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
-                    clusterCommunicator.broadcast(
-                            new ClusterMessage(
-                                    clusterService.getLocalNode().id(),
-                                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                                    SERIALIZER.encode(event)));
+                    clusterCommunicator.broadcast(event,
+                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                            SERIALIZER::encode);
                 }
             });
         } catch (Exception e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 131000b..b47376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -304,11 +304,9 @@
 
             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
                     providerId, deviceId, deviceDescription);
-            ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
-                    SERIALIZER.encode(deviceInjectedEvent));
 
             // TODO check unicast return value
-            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
             /* error log:
             log.warn("Failed to process injected device id: {} desc: {} " +
                             "(cluster messaging failed: {})",
@@ -555,11 +553,9 @@
             }
 
             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
-            ClusterMessage clusterMessage = new ClusterMessage(
-                    localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
 
             //TODO check unicast return value
-            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
             /* error log:
             log.warn("Failed to process injected ports of device id: {} " +
                             "(cluster messaging failed: {})",
@@ -867,13 +863,8 @@
             log.debug("{} has control of {}, forwarding remove request",
                      master, deviceId);
 
-             ClusterMessage message = new ClusterMessage(
-                     myId,
-                     DEVICE_REMOVE_REQ,
-                     SERIALIZER.encode(deviceId));
-
             // TODO check unicast return value
-            clusterCommunicator.unicast(message, master);
+            clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
              /* error log:
              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
              */
@@ -1057,19 +1048,11 @@
     }
 
     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, recipient);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void notifyPeers(InternalDeviceEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 33e4251..c46131e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -510,11 +510,7 @@
     }
 
     private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                serializer.encode(event));
-        return clusterCommunicator.unicast(message, peer);
+        return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
         // Note: we had this flipped before...
 //        communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 781e24b..f5807e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 import com.hazelcast.core.IMap;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -35,6 +36,7 @@
 import org.onlab.util.BoundedThreadPool;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.NewConcurrentHashMap;
+import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
@@ -93,7 +95,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -360,22 +361,16 @@
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), rule.deviceId());
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.GET_FLOW_ENTRY,
-                SERIALIZER.encode(rule));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
-        }
-        return null;
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
+                                    FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                               TimeUnit.MILLISECONDS,
+                               null);
     }
 
-
-
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
 
@@ -393,22 +388,16 @@
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GET_DEVICE_FLOW_ENTRIES,
-                SERIALIZER.encode(deviceId));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
-        }
-        return Collections.emptyList();
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
+                                    FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                               TimeUnit.MILLISECONDS,
+                               Collections.emptyList());
     }
 
-
-
     @Override
     public void storeFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(
@@ -453,14 +442,10 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                local,
-                APPLY_BATCH_FLOWS,
-                SERIALIZER.encode(operation));
-
-
-        if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
-            log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
+        if (!clusterCommunicator.unicast(operation,
+                APPLY_BATCH_FLOWS, SERIALIZER::encode,
+                replicaInfo.master().get())) {
+            log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
 
             Set<FlowRule> allFailures = operation.getOperations().stream()
                     .map(op -> op.target())
@@ -612,18 +597,15 @@
         log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                REMOVE_FLOW_ENTRY,
-                SERIALIZER.encode(rule));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            // TODO: Retry against latest master or throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        return Futures.get(clusterCommunicator.sendAndReceive(
+                               rule,
+                               REMOVE_FLOW_ENTRY,
+                               SERIALIZER::encode,
+                               SERIALIZER::decode,
+                               replicaInfo.master().get()),
+                           FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                           TimeUnit.MILLISECONDS,
+                           RuntimeException.class);
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -649,12 +631,8 @@
         if (nodeId == null) {
             notifyDelegate(event);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    REMOTE_APPLY_COMPLETED,
-                    SERIALIZER.encode(event));
             // TODO check unicast return value
-            clusterCommunicator.unicast(message, nodeId);
+            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
             //error log: log.warn("Failed to respond to peer for batch operation result");
         }
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 708faf6..b609a3c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -20,6 +20,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -45,7 +46,6 @@
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.serializers.DecodeTo;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -199,18 +199,12 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(clusterService
-                    .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
-
-        try {
-            ListenableFuture<byte[]> responseFuture = clusterCommunicator
-                    .sendAndReceive(message, replicaInfo.master().get());
-            // here should add another decode process
-            return Futures.transform(responseFuture,
-                                     new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
-        } catch (IOException e) {
-            return Futures.immediateFailedFuture(e);
-        }
+        return clusterCommunicator.sendAndReceive(
+                batchOperation,
+                APPLY_EXTEND_FLOWS,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                replicaInfo.master().get());
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 51b4111..ae1669b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -382,17 +382,13 @@
             GroupStoreMessage groupOp = GroupStoreMessage.
                     createGroupAddRequestMsg(groupDesc.deviceId(),
                                              groupDesc);
-            ClusterMessage message = new ClusterMessage(
-                                    clusterService.getLocalNode().id(),
-                                    GroupStoreMessageSubjects.
-                                    REMOTE_GROUP_OP_REQUEST,
-                                    kryoBuilder.build().serialize(groupOp));
-            if (!clusterCommunicator.unicast(message,
-                                             mastershipService.
-                                             getMasterFor(
-                                                groupDesc.deviceId()))) {
+
+            if (!clusterCommunicator.unicast(groupOp,
+                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                    m -> kryoBuilder.build().serialize(m),
+                    mastershipService.getMasterFor(groupDesc.deviceId()))) {
                 log.warn("Failed to send request to master: {} to {}",
-                         message,
+                         groupOp,
                          mastershipService.getMasterFor(groupDesc.deviceId()));
                 //TODO: Send Group operation failure event
             }
@@ -472,16 +468,13 @@
                                                 type,
                                                 newBuckets,
                                                 newAppCookie);
-            ClusterMessage message =
-                    new ClusterMessage(clusterService.getLocalNode().id(),
-                                       GroupStoreMessageSubjects.
-                                              REMOTE_GROUP_OP_REQUEST,
-                                       kryoBuilder.build().serialize(groupOp));
-            if (!clusterCommunicator.unicast(message,
-                                             mastershipService.
-                                             getMasterFor(deviceId))) {
+
+            if (!clusterCommunicator.unicast(groupOp,
+                        GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                        m -> kryoBuilder.build().serialize(m),
+                        mastershipService.getMasterFor(deviceId))) {
                 log.warn("Failed to send request to master: {} to {}",
-                         message,
+                         groupOp,
                          mastershipService.getMasterFor(deviceId));
                 //TODO: Send Group operation failure event
             }
@@ -584,16 +577,13 @@
             GroupStoreMessage groupOp = GroupStoreMessage.
                     createGroupDeleteRequestMsg(deviceId,
                                                 appCookie);
-            ClusterMessage message =
-                    new ClusterMessage(clusterService.getLocalNode().id(),
-                                       GroupStoreMessageSubjects.
-                                              REMOTE_GROUP_OP_REQUEST,
-                                       kryoBuilder.build().serialize(groupOp));
-            if (!clusterCommunicator.unicast(message,
-                                             mastershipService.
-                                             getMasterFor(deviceId))) {
+
+            if (!clusterCommunicator.unicast(groupOp,
+                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                    m -> kryoBuilder.build().serialize(m),
+                    mastershipService.getMasterFor(deviceId))) {
                 log.warn("Failed to send request to master: {} to {}",
-                         message,
+                         groupOp,
                          mastershipService.getMasterFor(deviceId));
                 //TODO: Send Group operation failure event
             }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index 74236c3..1e6f8aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -477,21 +477,13 @@
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void unicastMessage(NodeId peer,
                                 MessageSubject subject,
                                 Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, peer);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
     }
 
     private void notifyDelegateIfNotNull(HostEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 48c430b..5e09d05 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -334,17 +334,12 @@
 
 
             LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
-            ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
-                    GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
 
             // TODO check unicast return value
-            clusterCommunicator.unicast(linkInjectedMessage, dstNode);
-            /* error log:
-            log.warn("Failed to process link update between src: {} and dst: {} " +
-                            "(cluster messaging failed: {})",
-                    linkDescription.src(), linkDescription.dst(), e);
-            */
-
+            clusterCommunicator.unicast(linkInjectedEvent,
+                    GossipLinkStoreMessageSubjects.LINK_INJECTED,
+                    SERIALIZER::encode,
+                    dstNode);
         }
 
         return linkEvent;
@@ -653,19 +648,11 @@
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, recipient);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
     }
 
     private void notifyPeers(InternalLinkEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 25870b7..8a54502 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -181,20 +181,14 @@
             } else {
                 return MastershipRole.NONE;
             }
-        } else {
-            try {
-                MastershipRole role = complete(clusterCommunicator.sendAndReceive(
-                                                        new ClusterMessage(
-                                                                    localNodeId,
-                                                                    ROLE_QUERY_SUBJECT,
-                                                                    SERIALIZER.encode(deviceId)),
-                                                        nodeId));
-                return role == null ? MastershipRole.NONE : role;
-            } catch (IOException e) {
-                log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
-                return MastershipRole.NONE;
-            }
         }
+        MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+                deviceId,
+                ROLE_QUERY_SUBJECT,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                nodeId));
+        return role == null ? MastershipRole.NONE : role;
     }
 
     @Override
@@ -276,17 +270,12 @@
         if (!nodeId.equals(localNodeId)) {
             log.debug("Forwarding request to relinquish "
                     + "role for device {} to {}", deviceId, nodeId);
-            try {
-                return complete(clusterCommunicator.sendAndReceive(
-                                                        new ClusterMessage(
-                                                                    localNodeId,
-                                                                    ROLE_RELINQUISH_SUBJECT,
-                                                                    SERIALIZER.encode(deviceId)),
-                                                        nodeId));
-            } catch (IOException e) {
-                log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
-                return null;
-            }
+            return complete(clusterCommunicator.sendAndReceive(
+                    deviceId,
+                    ROLE_RELINQUISH_SUBJECT,
+                    SERIALIZER::encode,
+                    SERIALIZER::decode,
+                    nodeId));
         }
 
         // Check if this node is can be managed by this node.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 7ee8712..8bf8ae6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -131,9 +131,7 @@
         }
 
         // TODO check unicast return value
-        communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT,
-                                                        SERIALIZER.encode(packet)),
-                                     master);
+        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
         // error log: log.warn("Failed to send packet-out to {}", master);
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index f07c0e8..e314789 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.statistic.impl;
 
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
@@ -47,12 +49,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.onlab.util.Tools.groupedThreads;
@@ -218,20 +217,15 @@
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getCurrentStatisticInternal(connectPoint);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    GET_CURRENT,
-                    SERIALIZER.encode(connectPoint));
-
-            try {
-                Future<byte[]> response =
-                        clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-                return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
-                                                      TimeUnit.MILLISECONDS));
-            } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-                log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
-                return Collections.emptySet();
-            }
+            return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+                                        connectPoint,
+                                        GET_CURRENT,
+                                        SERIALIZER::encode,
+                                        SERIALIZER::decode,
+                                        replicaInfo.master().get()),
+                                   STATISTIC_STORE_TIMEOUT_MILLIS,
+                                   TimeUnit.MILLISECONDS,
+                                   Collections.emptySet());
         }
 
     }
@@ -251,22 +245,16 @@
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getPreviousStatisticInternal(connectPoint);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    GET_PREVIOUS,
-                    SERIALIZER.encode(connectPoint));
-
-            try {
-                Future<byte[]> response =
-                        clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-                return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
-                                                      TimeUnit.MILLISECONDS));
-            } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-                log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
-                return Collections.emptySet();
-            }
+            return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+                                        connectPoint,
+                                        GET_PREVIOUS,
+                                        SERIALIZER::encode,
+                                        SERIALIZER::decode,
+                                        replicaInfo.master().get()),
+                                   STATISTIC_STORE_TIMEOUT_MILLIS,
+                                   TimeUnit.MILLISECONDS,
+                                   Collections.emptySet());
         }
-
     }
 
     private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 7054bd3..f271de4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -65,6 +65,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static java.util.Arrays.asList;
 import static org.easymock.EasyMock.*;
@@ -181,8 +182,9 @@
                 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
                         HW, swVersion, SN, CID, annotations);
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-            .andReturn(true).anyTimes();
+        clusterCommunicator.<InternalDeviceEvent>broadcast(
+                anyObject(InternalDeviceEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
         deviceStore.createOrUpdateDevice(PID, deviceId, description);
         verify(clusterCommunicator);
@@ -296,36 +298,43 @@
     }
 
     private void assertInternalDeviceEvent(NodeId sender,
-                                           DeviceId deviceId,
-                                           ProviderId providerId,
-                                           DeviceDescription expectedDesc,
-                                           Capture<ClusterMessage> actualMsg) {
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
+            DeviceId deviceId,
+            ProviderId providerId,
+            DeviceDescription expectedDesc,
+            Capture<InternalDeviceEvent> actualEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+        assertTrue(actualEvent.hasCaptured());
+        assertTrue(actualSubject.hasCaptured());
+        assertTrue(actualEncoder.hasCaptured());
+
         assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
-                     actualMsg.getValue().subject());
-        InternalDeviceEvent addEvent
-            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
-        assertEquals(deviceId, addEvent.deviceId());
-        assertEquals(providerId, addEvent.providerId());
-        assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
+                actualSubject.getValue());
+        assertEquals(deviceId, actualEvent.getValue().deviceId());
+        assertEquals(providerId, actualEvent.getValue().providerId());
+        assertDeviceDescriptionEquals(expectedDesc, actualEvent.getValue().deviceDescription().value());
     }
 
     private void assertInternalDeviceEvent(NodeId sender,
-                                           DeviceId deviceId,
-                                           ProviderId providerId,
-                                           DeviceDescription expectedDesc,
-                                           List<SparseAnnotations> expectedAnnotations,
-                                           Capture<ClusterMessage> actualMsg) {
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
+            DeviceId deviceId,
+            ProviderId providerId,
+            DeviceDescription expectedDesc,
+            List<SparseAnnotations> expectedAnnotations,
+            Capture<InternalDeviceEvent> actualEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+        assertTrue(actualEvent.hasCaptured());
+        assertTrue(actualSubject.hasCaptured());
+        assertTrue(actualEncoder.hasCaptured());
+
         assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
-                actualMsg.getValue().subject());
-        InternalDeviceEvent addEvent
-            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
-        assertEquals(deviceId, addEvent.deviceId());
-        assertEquals(providerId, addEvent.providerId());
-        assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
+                actualSubject.getValue());
+        assertEquals(deviceId, actualEvent.getValue().deviceId());
+        assertEquals(providerId, actualEvent.getValue().providerId());
+        assertDeviceDescriptionEquals(
+                expectedDesc,
+                expectedAnnotations,
+                actualEvent.getValue().deviceDescription().value());
     }
 
     @Test
@@ -333,26 +342,28 @@
         DeviceDescription description =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN, CID);
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
         assertEquals(DEVICE_ADDED, event.type());
         assertDevice(DID1, SW1, event.subject());
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PID, description, message, subject, encoder);
 
 
         DeviceDescription description2 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW2, SN, CID);
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
         assertEquals(DEVICE_UPDATED, event2.type());
         assertDevice(DID1, SW2, event2.subject());
 
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
         reset(clusterCommunicator);
 
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
@@ -366,7 +377,11 @@
                         HW, SW1, SN, CID, A2);
         Capture<ClusterMessage> bcast = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
         assertEquals(DEVICE_ADDED, event.type());
         assertDevice(DID1, SW1, event.subject());
@@ -374,13 +389,13 @@
         assertAnnotationsEquals(event.subject().annotations(), A2);
         assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PIDA, description, message, subject, encoder);
 
         // update from primary
         DeviceDescription description2 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW2, SN, CID, A1);
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
 
         DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
         assertEquals(DEVICE_UPDATED, event2.type());
@@ -389,17 +404,17 @@
         assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
         assertTrue(deviceStore.isAvailable(DID1));
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
 
         // no-op update from primary
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
 
         verify(clusterCommunicator);
         assertFalse("no broadcast expected", bcast.hasCaptured());
 
         // For now, Ancillary is ignored once primary appears
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
 
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
 
@@ -410,7 +425,7 @@
         DeviceDescription description3 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN, CID, A2_2);
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
 
         DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
         assertEquals(DEVICE_UPDATED, event3.type());
@@ -423,7 +438,7 @@
         verify(clusterCommunicator);
         // note: only annotation from PIDA is sent over the wire
         assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
-                                  asList(union(A2, A2_2)), bcast);
+                                  asList(union(A2, A2_2)), message, subject, encoder);
 
     }
 
@@ -434,23 +449,25 @@
         putDevice(DID1, SW1);
         assertTrue(deviceStore.isAvailable(DID1));
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event = deviceStore.markOffline(DID1);
         assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
         assertDevice(DID1, SW1, event.subject());
         assertFalse(deviceStore.isAvailable(DID1));
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
 
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
         DeviceEvent event2 = deviceStore.markOffline(DID1);
         assertNull("No change, no event", event2);
         verify(clusterCommunicator);
-        assertFalse(bcast.hasCaptured());
+        assertFalse(message.hasCaptured());
     }
 
     @Test
@@ -460,13 +477,15 @@
                 new DefaultPortDescription(P1, true),
                 new DefaultPortDescription(P2, true)
                 );
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
         for (DeviceEvent event : events) {
@@ -485,11 +504,11 @@
                 new DefaultPortDescription(P3, true)
                 );
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         events = deviceStore.updatePorts(PID, DID1, pds2);
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         assertFalse("event should be triggered", events.isEmpty());
         for (DeviceEvent event : events) {
@@ -513,11 +532,11 @@
                 new DefaultPortDescription(P1, false),
                 new DefaultPortDescription(P2, true)
                 );
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         events = deviceStore.updatePorts(PID, DID1, pds3);
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         assertFalse("event should be triggered", events.isEmpty());
         for (DeviceEvent event : events) {
@@ -544,9 +563,11 @@
                 );
         deviceStore.updatePorts(PID, DID1, pds);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalPortStatusEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
         DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
         assertEquals(PORT_UPDATED, event.type());
@@ -554,8 +575,8 @@
         assertEquals(P1, event.port().number());
         assertFalse("Port is disabled", event.port().isEnabled());
         verify(clusterCommunicator);
-        assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, message, subject, encoder);
+        assertTrue(message.hasCaptured());
     }
 
     @Test
@@ -567,11 +588,13 @@
                 );
         deviceStore.updatePorts(PID, DID1, pds);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
-
+        Capture<InternalPortStatusEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
 
         // update port from primary
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
+
         final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
         DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
         assertEquals(PORT_UPDATED, event.type());
@@ -580,19 +603,19 @@
         assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
         assertFalse("Port is disabled", event.port().isEnabled());
         verify(clusterCommunicator);
-        assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), message, subject, encoder);
+        assertTrue(message.hasCaptured());
 
         // update port from ancillary with no attributes
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
         final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
         DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
         assertNull("Ancillary is ignored if primary exists", event2);
         verify(clusterCommunicator);
-        assertFalse(bcast.hasCaptured());
+        assertFalse(message.hasCaptured());
 
         // but, Ancillary annotation update will be notified
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
         DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
         assertEquals(PORT_UPDATED, event3.type());
@@ -601,11 +624,11 @@
         assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
         assertFalse("Port is disabled", event3.port().isEnabled());
         verify(clusterCommunicator);
-        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), message, subject, encoder);
+        assertTrue(message.hasCaptured());
 
         // port only reported from Ancillary will be notified as down
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
         DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
         assertEquals(PORT_ADDED, event4.type());
@@ -616,25 +639,29 @@
                         event4.port().isEnabled());
         verify(clusterCommunicator);
         // TODO: verify broadcast message content
-        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, message, subject, encoder);
+        assertTrue(message.hasCaptured());
     }
 
-    private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
-            ProviderId pid, DefaultPortDescription expectedDesc,
-            List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
+    private void assertInternalPortStatusEvent(NodeId sender,
+            DeviceId did,
+            ProviderId pid,
+            DefaultPortDescription expectedDesc,
+            List<SparseAnnotations> expectedAnnotations,
+            Capture<InternalPortStatusEvent> actualEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalPortStatusEvent, byte[]>> actualEncoder) {
 
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
+        assertTrue(actualEvent.hasCaptured());
+        assertTrue(actualSubject.hasCaptured());
+        assertTrue(actualEncoder.hasCaptured());
+
         assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
-                actualMsg.getValue().subject());
-        InternalPortStatusEvent addEvent
-            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
-        assertEquals(did, addEvent.deviceId());
-        assertEquals(pid, addEvent.providerId());
+                actualSubject.getValue());
+        assertEquals(did, actualEvent.getValue().deviceId());
+        assertEquals(pid, actualEvent.getValue().providerId());
         assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
-                addEvent.portDescription().value());
-
+                actualEvent.getValue().portDescription().value());
     }
 
     private void assertPortDescriptionEquals(
@@ -649,19 +676,31 @@
                          expectedAnnotations.toArray(new SparseAnnotations[0]));
     }
 
-    private void resetCommunicatorExpectingNoBroadcast(
-            Capture<ClusterMessage> bcast) {
-        bcast.reset();
+    private <T> void resetCommunicatorExpectingNoBroadcast(
+            Capture<T> message,
+            Capture<MessageSubject> subject,
+            Capture<Function<T, byte[]>> encoder) {
+        message.reset();
+        subject.reset();
+        encoder.reset();
         reset(clusterCommunicator);
         replay(clusterCommunicator);
     }
 
-    private void resetCommunicatorExpectingSingleBroadcast(
-            Capture<ClusterMessage> bcast) {
+    private <T> void resetCommunicatorExpectingSingleBroadcast(
+            Capture<T> message,
+            Capture<MessageSubject> subject,
+            Capture<Function<T, byte[]>> encoder) {
 
-        bcast.reset();
+        message.reset();
+        subject.reset();
+        encoder.reset();
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+        clusterCommunicator.broadcast(
+                    capture(message),
+                    capture(subject),
+                    capture(encoder));
+        expectLastCall().once();
         replay(clusterCommunicator);
     }
 
@@ -724,9 +763,11 @@
         assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
         assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
 
         DeviceEvent event = deviceStore.removeDevice(DID1);
         assertEquals(DEVICE_REMOVED, event.type());
@@ -736,7 +777,7 @@
         assertEquals(0, deviceStore.getPorts(DID1).size());
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         // putBack Device, Port w/o annotation
         putDevice(DID1, SW1);
@@ -825,10 +866,6 @@
             this.clusterService = clusterService;
             this.clusterCommunicator = clusterCommunicator;
         }
-
-        public <T> T deserialize(byte[] bytes) {
-            return SERIALIZER.decode(bytes);
-        }
     }
 
     private static final class TestClusterService extends StaticClusterService {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 0a3d5e6..8d495eb 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -30,6 +30,7 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractEvent;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -44,17 +45,20 @@
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static junit.framework.TestCase.assertFalse;
@@ -281,7 +285,7 @@
 
         // Set up expected internal message to be broadcast to peers on first put
         expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
-                .peekAtNextTimestamp()), clusterCommunicator);
+                .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         // Put first value
         assertNull(ecMap.get(KEY1));
@@ -292,7 +296,7 @@
 
         // Set up expected internal message to be broadcast to peers on second put
         expectSpecificMulticastMessage(generatePutMessage(
-                KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+                KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         // Update same key to a new value
         ecMap.put(KEY1, VALUE2);
@@ -341,7 +345,7 @@
         // Remove the value and check the correct internal cluster messages
         // are sent
         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                                       clusterCommunicator);
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -352,7 +356,7 @@
         // the map, we expect that the tombstone is updated and another remove
         // event is sent to the cluster and external listeners.
         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                                       clusterCommunicator);
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -402,7 +406,7 @@
         ecMap.addListener(listener);
 
         // Expect a multi-update inter-instance message
-        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
                                        clusterCommunicator);
 
         Map<String, String> putAllValues = new HashMap<>();
@@ -441,7 +445,7 @@
         ecMap.put(KEY2, VALUE2);
 
         ecMap.addListener(listener);
-        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.clear();
 
@@ -605,7 +609,8 @@
                 SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
-    private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+    private List<PutEntry<String, String>> generatePutMessage(
+            String key1, String value1, String key2, String value2) {
         ArrayList<PutEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
@@ -617,10 +622,7 @@
         list.add(pe1);
         list.add(pe2);
 
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(list));
+        return list;
     }
 
     private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
@@ -631,7 +633,7 @@
                 SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
-    private ClusterMessage generateRemoveMessage(String key1, String key2) {
+    private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
         ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
@@ -643,9 +645,7 @@
         list.add(re1);
         list.add(re2);
 
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(list));
+        return list;
     }
 
     /**
@@ -656,13 +656,13 @@
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private static void expectSpecificBroadcastMessage(ClusterMessage m,
-                           ClusterCommunicationService clusterCommunicator) {
+    private static <T> void expectSpecificBroadcastMessage(
+            T message,
+            MessageSubject subject,
+            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.broadcast(m)).andReturn(true);
-        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -670,17 +670,16 @@
      * Sets up a mock ClusterCommunicationService to expect a specific cluster
      * message to be multicast to the cluster.
      *
-     * @param m message we expect to be sent
+     * @param message message we expect to be sent
+     * @param subject subject we expect to be sent to
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private static void expectSpecificMulticastMessage(ClusterMessage m,
+    private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
                            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
-        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -693,12 +692,15 @@
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+    private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
 //        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
 //                                             anyObject(Iterable.class)))
-        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
-                                           anyObject(NodeId.class)))
+        expect(clusterCommunicator.<T>unicast(
+                    anyObject(),
+                    anyObject(MessageSubject.class),
+                    anyObject(Function.class),
+                    anyObject(NodeId.class)))
                 .andReturn(true)
                 .anyTimes();
         replay(clusterCommunicator);
@@ -711,15 +713,14 @@
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    //FIXME rename
     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-//                .andReturn(true)
-//                .anyTimes();
-        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<AbstractEvent>multicast(
+                anyObject(AbstractEvent.class),
+                anyObject(MessageSubject.class),
+                anyObject(Function.class),
+                anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -733,45 +734,6 @@
             implements ClusterCommunicationService {
 
         @Override
-        public boolean broadcast(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean broadcastIncludeSelf(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean unicast(ClusterMessage message, NodeId toNodeId)  {
-            return false;
-        }
-
-        @Override
-        public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
-            return false;
-        }
-
-        @Override
-        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
-                                                       NodeId toNodeId)
-                throws IOException {
-            return null;
-        }
-
-        @Override
-        public void addSubscriber(MessageSubject subject,
-                                  ClusterMessageHandler subscriber) {
-            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
-                updateHandler = subscriber;
-            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
-                antiEntropyHandler = subscriber;
-            } else {
-                throw new RuntimeException("Unexpected message subject " + subject.toString());
-            }
-        }
-
-        @Override
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber,
                                   ExecutorService executor) {
@@ -786,6 +748,73 @@
 
         @Override
         public void removeSubscriber(MessageSubject subject) {}
+
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder) {
+        }
+
+        @Override
+        public <M> void broadcastIncludeSelf(M message,
+                MessageSubject subject, Function<M, byte[]> encoder) {
+        }
+
+        @Override
+        public <M> boolean unicast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder, NodeId toNodeId) {
+            return false;
+        }
+
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder, Set<NodeId> nodes) {
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                MessageSubject subject, Function<M, byte[]> encoder,
+                Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Function<M, R> handler,
+                Function<R, byte[]> encoder, ExecutorService executor) {
+        }
+
+        @Override
+        public <M> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Consumer<M> handler,
+                ExecutorService executor) {
+        }
+
+        @Override
+        public boolean broadcast(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean broadcastIncludeSelf(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+            return false;
+        }
+
+        @Override
+        public boolean multicast(ClusterMessage message,
+                Iterable<NodeId> nodeIds) {
+            return false;
+        }
+
+        @Override
+        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+                NodeId toNodeId) {
+            return null;
+        }
     }
 
     /**
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index 03e82bc..b0da337 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -28,7 +28,6 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.ConnectPoint;
@@ -48,7 +47,6 @@
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.cluster.StaticClusterService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.device.impl.DeviceClockManager;
@@ -59,6 +57,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
@@ -119,7 +118,6 @@
     private DeviceClockManager deviceClockManager;
     private DeviceClockService deviceClockService;
     private ClusterCommunicationService clusterCommunicator;
-    private MastershipService mastershipService;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -171,26 +169,24 @@
         ConnectPoint src = new ConnectPoint(srcId, srcNum);
         ConnectPoint dst = new ConnectPoint(dstId, dstNum);
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-            .andReturn(true).anyTimes();
+        clusterCommunicator.<InternalLinkEvent>broadcast(
+                anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
         linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
         verify(clusterCommunicator);
     }
 
-    private void resetCommunicatorExpectingNoBroadcast(
-                                                       Capture<ClusterMessage> bcast) {
-        bcast.reset();
+    private <T> void resetCommunicatorExpectingSingleBroadcast(
+            Capture<T> message,
+            Capture<MessageSubject> subject,
+            Capture<Function<T, byte[]>> encoder) {
+        message.reset();
+        subject.reset();
+        encoder.reset();
         reset(clusterCommunicator);
-        replay(clusterCommunicator);
-    }
-
-    private void resetCommunicatorExpectingSingleBroadcast(
-                                                           Capture<ClusterMessage> bcast) {
-
-        bcast.reset();
-        reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+        clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
+        expectLastCall().once();
         replay(clusterCommunicator);
     }
 
@@ -367,56 +363,55 @@
         ConnectPoint src = new ConnectPoint(DID1, P1);
         ConnectPoint dst = new ConnectPoint(DID2, P2);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalLinkEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
 
         // add link
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
         LinkEvent event = linkStore.createOrUpdateLink(PID,
                     linkDescription);
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
         assertEquals(LINK_ADDED, event.type());
 
         // update link type
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event2 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
         assertEquals(LINK_UPDATED, event2.type());
 
         // no change
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event3 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT));
-        verifyNoBroadcastMessage(bcast);
+        verifyNoBroadcastMessage(message);
 
         assertNull("No change event expected", event3);
     }
 
-    private void verifyNoBroadcastMessage(Capture<ClusterMessage> bcast) {
-        assertFalse("No broadcast expected", bcast.hasCaptured());
+    private <T> void verifyNoBroadcastMessage(Capture<T> message) {
+        assertFalse("No broadcast expected", message.hasCaptured());
     }
 
     private void verifyLinkBroadcastMessage(ProviderId providerId,
-                                            NodeId sender,
-                                            ConnectPoint src,
-                                            ConnectPoint dst,
-                                            Type type,
-                                            Capture<ClusterMessage> actualMsg) {
+            NodeId sender,
+            ConnectPoint src,
+            ConnectPoint dst,
+            Type type,
+            Capture<InternalLinkEvent> actualLinkEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
         verify(clusterCommunicator);
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
-        assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE,
-                     actualMsg.getValue().subject());
-        InternalLinkEvent linkEvent
-            = GossipLinkStore.SERIALIZER.decode(actualMsg.getValue().payload());
-        assertEquals(providerId, linkEvent.providerId());
-        assertLinkDescriptionEquals(src, dst, type, linkEvent.linkDescription().value());
-
+        assertTrue(actualLinkEvent.hasCaptured());
+        assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
+        assertEquals(providerId, actualLinkEvent.getValue().providerId());
+        assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
     }
 
     private static void assertLinkDescriptionEquals(ConnectPoint src,
@@ -434,31 +429,33 @@
         ConnectPoint src = new ConnectPoint(DID1, P1);
         ConnectPoint dst = new ConnectPoint(DID2, P2);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalLinkEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
 
         // add Ancillary link
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event = linkStore.createOrUpdateLink(PIDA,
                     new DefaultLinkDescription(src, dst, INDIRECT, A1));
-        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, bcast);
+        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
 
         assertNotNull("Ancillary only link is ignored", event);
 
         // add Primary link
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event2 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, INDIRECT, A2));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
         assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
         assertEquals(LINK_UPDATED, event2.type());
 
         // update link type
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event3 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT, A2));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
         assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
@@ -466,38 +463,38 @@
 
 
         // no change
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event4 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT));
-        verifyNoBroadcastMessage(bcast);
+        verifyNoBroadcastMessage(message);
 
         assertNull("No change event expected", event4);
 
         // update link annotation (Primary)
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event5 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT, A2_2));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
         assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
         assertEquals(LINK_UPDATED, event5.type());
 
         // update link annotation (Ancillary)
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
                 new DefaultLinkDescription(src, dst, DIRECT, A1_2));
-        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
         assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
         assertEquals(LINK_UPDATED, event6.type());
 
         // update link type (Ancillary) : ignored
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
                 new DefaultLinkDescription(src, dst, EDGE));
-        verifyNoBroadcastMessage(bcast);
+        verifyNoBroadcastMessage(message);
         assertNull("Ancillary change other than annotation is ignored", event7);
     }
 
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/DecodeTo.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/DecodeTo.java
deleted file mode 100644
index ccbfaa8..0000000
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/DecodeTo.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.serializers;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Function;
-
-/**
- * Function to convert byte[] into {@code T}.
- *
- * @param <T> Type after decoding
- */
-public final class DecodeTo<T> implements Function<byte[], T> {
-
-    private StoreSerializer serializer;
-
-    public DecodeTo(StoreSerializer serializer) {
-        this.serializer = checkNotNull(serializer);
-    }
-
-    @Override
-    public T apply(byte[] input) {
-        return serializer.decode(input);
-    }
-}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 01dda9e..62330fb 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,10 +15,10 @@
  */
 package org.onlab.util;
 
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -37,12 +37,17 @@
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Miscellaneous utility methods.
@@ -324,6 +329,51 @@
                                          dst.getAbsolutePath()));
     }
 
+    /**
+     * Returns the future value when complete or if future
+     * completes exceptionally returns the defaultValue.
+     * @param future future
+     * @param defaultValue default value
+     * @param <T> future value type
+     * @return future value when complete or if future
+     * completes exceptionally returns the defaultValue.
+     */
+    public static <T> T futureGetOrElse(Future<T> future, T defaultValue) {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return defaultValue;
+        } catch (ExecutionException e) {
+            return defaultValue;
+        }
+    }
+
+    /**
+     * Returns the future value when complete or if future
+     * completes exceptionally returns the defaultValue.
+     * @param future future
+     * @param timeout time to wait for successful completion
+     * @param timeUnit time unit
+     * @param defaultValue default value
+     * @param <T> future value type
+     * @return future value when complete or if future
+     * completes exceptionally returns the defaultValue.
+     */
+    public static <T> T futureGetOrElse(Future<T> future,
+                                        long timeout,
+                                        TimeUnit timeUnit,
+                                        T defaultValue) {
+        try {
+            return future.get(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return defaultValue;
+        } catch (ExecutionException | TimeoutException e) {
+            return defaultValue;
+        }
+    }
+
     // Auxiliary path visitor for recursive directory structure copying.
     private static class DirectoryCopier extends SimpleFileVisitor<Path> {
         private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 12e1d87..d7823d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,10 +16,9 @@
 package org.onlab.netty;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * Interface for low level messaging primitives.
  */
@@ -40,9 +39,8 @@
      * @param type type of message.
      * @param payload message payload.
      * @return a response future
-     * @throws IOException when I/O exception of some sort has occurred
      */
-    public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
 
     /**
      * Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 099880d..f96bb0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -39,6 +39,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -56,8 +57,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * A Netty based implementation of MessagingService.
@@ -69,14 +68,14 @@
     private final Endpoint localEp;
     private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
-    private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+    private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
             .expireAfterWrite(10, TimeUnit.SECONDS)
-            .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+            .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
                 @Override
-                public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+                public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
                     if (entry.wasEvicted()) {
-                        entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
                     }
                 }
             })
@@ -178,11 +177,10 @@
     }
 
     @Override
-    public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
-            throws IOException {
-        SettableFuture<byte[]> futureResponse = SettableFuture.create();
+    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+        CompletableFuture<byte[]> response = new CompletableFuture<>();
         Long messageId = messageIdGenerator.incrementAndGet();
-        responseFutures.put(messageId, futureResponse);
+        responseFutures.put(messageId, response);
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageId)
             .withSender(localEp)
@@ -193,9 +191,9 @@
             sendAsync(ep, message);
         } catch (Exception e) {
             responseFutures.invalidate(messageId);
-            throw e;
+            response.completeExceptionally(e);
         }
-        return futureResponse;
+        return response;
     }
 
     @Override
@@ -333,10 +331,10 @@
         String type = message.type();
         if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
             try {
-                SettableFuture<byte[]> futureResponse =
+                CompletableFuture<byte[]> futureResponse =
                     NettyMessagingService.this.responseFutures.getIfPresent(message.id());
                 if (futureResponse != null) {
-                    futureResponse.set(message.payload());
+                    futureResponse.complete(message.payload());
                 } else {
                     log.warn("Received a reply for message id:[{}]. "
                             + " from {}. But was unable to locate the"