Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 7054bd3..f271de4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -65,6 +65,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static java.util.Arrays.asList;
 import static org.easymock.EasyMock.*;
@@ -181,8 +182,9 @@
                 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
                         HW, swVersion, SN, CID, annotations);
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-            .andReturn(true).anyTimes();
+        clusterCommunicator.<InternalDeviceEvent>broadcast(
+                anyObject(InternalDeviceEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
         deviceStore.createOrUpdateDevice(PID, deviceId, description);
         verify(clusterCommunicator);
@@ -296,36 +298,43 @@
     }
 
     private void assertInternalDeviceEvent(NodeId sender,
-                                           DeviceId deviceId,
-                                           ProviderId providerId,
-                                           DeviceDescription expectedDesc,
-                                           Capture<ClusterMessage> actualMsg) {
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
+            DeviceId deviceId,
+            ProviderId providerId,
+            DeviceDescription expectedDesc,
+            Capture<InternalDeviceEvent> actualEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+        assertTrue(actualEvent.hasCaptured());
+        assertTrue(actualSubject.hasCaptured());
+        assertTrue(actualEncoder.hasCaptured());
+
         assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
-                     actualMsg.getValue().subject());
-        InternalDeviceEvent addEvent
-            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
-        assertEquals(deviceId, addEvent.deviceId());
-        assertEquals(providerId, addEvent.providerId());
-        assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
+                actualSubject.getValue());
+        assertEquals(deviceId, actualEvent.getValue().deviceId());
+        assertEquals(providerId, actualEvent.getValue().providerId());
+        assertDeviceDescriptionEquals(expectedDesc, actualEvent.getValue().deviceDescription().value());
     }
 
     private void assertInternalDeviceEvent(NodeId sender,
-                                           DeviceId deviceId,
-                                           ProviderId providerId,
-                                           DeviceDescription expectedDesc,
-                                           List<SparseAnnotations> expectedAnnotations,
-                                           Capture<ClusterMessage> actualMsg) {
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
+            DeviceId deviceId,
+            ProviderId providerId,
+            DeviceDescription expectedDesc,
+            List<SparseAnnotations> expectedAnnotations,
+            Capture<InternalDeviceEvent> actualEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+        assertTrue(actualEvent.hasCaptured());
+        assertTrue(actualSubject.hasCaptured());
+        assertTrue(actualEncoder.hasCaptured());
+
         assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
-                actualMsg.getValue().subject());
-        InternalDeviceEvent addEvent
-            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
-        assertEquals(deviceId, addEvent.deviceId());
-        assertEquals(providerId, addEvent.providerId());
-        assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
+                actualSubject.getValue());
+        assertEquals(deviceId, actualEvent.getValue().deviceId());
+        assertEquals(providerId, actualEvent.getValue().providerId());
+        assertDeviceDescriptionEquals(
+                expectedDesc,
+                expectedAnnotations,
+                actualEvent.getValue().deviceDescription().value());
     }
 
     @Test
@@ -333,26 +342,28 @@
         DeviceDescription description =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN, CID);
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
         assertEquals(DEVICE_ADDED, event.type());
         assertDevice(DID1, SW1, event.subject());
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PID, description, message, subject, encoder);
 
 
         DeviceDescription description2 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW2, SN, CID);
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
         assertEquals(DEVICE_UPDATED, event2.type());
         assertDevice(DID1, SW2, event2.subject());
 
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
         reset(clusterCommunicator);
 
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
@@ -366,7 +377,11 @@
                         HW, SW1, SN, CID, A2);
         Capture<ClusterMessage> bcast = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
         assertEquals(DEVICE_ADDED, event.type());
         assertDevice(DID1, SW1, event.subject());
@@ -374,13 +389,13 @@
         assertAnnotationsEquals(event.subject().annotations(), A2);
         assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PIDA, description, message, subject, encoder);
 
         // update from primary
         DeviceDescription description2 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW2, SN, CID, A1);
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
 
         DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
         assertEquals(DEVICE_UPDATED, event2.type());
@@ -389,17 +404,17 @@
         assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
         assertTrue(deviceStore.isAvailable(DID1));
         verify(clusterCommunicator);
-        assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+        assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
 
         // no-op update from primary
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
 
         verify(clusterCommunicator);
         assertFalse("no broadcast expected", bcast.hasCaptured());
 
         // For now, Ancillary is ignored once primary appears
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
 
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
 
@@ -410,7 +425,7 @@
         DeviceDescription description3 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN, CID, A2_2);
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
 
         DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
         assertEquals(DEVICE_UPDATED, event3.type());
@@ -423,7 +438,7 @@
         verify(clusterCommunicator);
         // note: only annotation from PIDA is sent over the wire
         assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
-                                  asList(union(A2, A2_2)), bcast);
+                                  asList(union(A2, A2_2)), message, subject, encoder);
 
     }
 
@@ -434,23 +449,25 @@
         putDevice(DID1, SW1);
         assertTrue(deviceStore.isAvailable(DID1));
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         DeviceEvent event = deviceStore.markOffline(DID1);
         assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
         assertDevice(DID1, SW1, event.subject());
         assertFalse(deviceStore.isAvailable(DID1));
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
 
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
         DeviceEvent event2 = deviceStore.markOffline(DID1);
         assertNull("No change, no event", event2);
         verify(clusterCommunicator);
-        assertFalse(bcast.hasCaptured());
+        assertFalse(message.hasCaptured());
     }
 
     @Test
@@ -460,13 +477,15 @@
                 new DefaultPortDescription(P1, true),
                 new DefaultPortDescription(P2, true)
                 );
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
         for (DeviceEvent event : events) {
@@ -485,11 +504,11 @@
                 new DefaultPortDescription(P3, true)
                 );
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         events = deviceStore.updatePorts(PID, DID1, pds2);
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         assertFalse("event should be triggered", events.isEmpty());
         for (DeviceEvent event : events) {
@@ -513,11 +532,11 @@
                 new DefaultPortDescription(P1, false),
                 new DefaultPortDescription(P2, true)
                 );
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         events = deviceStore.updatePorts(PID, DID1, pds3);
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         assertFalse("event should be triggered", events.isEmpty());
         for (DeviceEvent event : events) {
@@ -544,9 +563,11 @@
                 );
         deviceStore.updatePorts(PID, DID1, pds);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalPortStatusEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
         DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
         assertEquals(PORT_UPDATED, event.type());
@@ -554,8 +575,8 @@
         assertEquals(P1, event.port().number());
         assertFalse("Port is disabled", event.port().isEnabled());
         verify(clusterCommunicator);
-        assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, message, subject, encoder);
+        assertTrue(message.hasCaptured());
     }
 
     @Test
@@ -567,11 +588,13 @@
                 );
         deviceStore.updatePorts(PID, DID1, pds);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
-
+        Capture<InternalPortStatusEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
 
         // update port from primary
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
+
         final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
         DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
         assertEquals(PORT_UPDATED, event.type());
@@ -580,19 +603,19 @@
         assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
         assertFalse("Port is disabled", event.port().isEnabled());
         verify(clusterCommunicator);
-        assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), message, subject, encoder);
+        assertTrue(message.hasCaptured());
 
         // update port from ancillary with no attributes
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
         final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
         DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
         assertNull("Ancillary is ignored if primary exists", event2);
         verify(clusterCommunicator);
-        assertFalse(bcast.hasCaptured());
+        assertFalse(message.hasCaptured());
 
         // but, Ancillary annotation update will be notified
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
         DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
         assertEquals(PORT_UPDATED, event3.type());
@@ -601,11 +624,11 @@
         assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
         assertFalse("Port is disabled", event3.port().isEnabled());
         verify(clusterCommunicator);
-        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), message, subject, encoder);
+        assertTrue(message.hasCaptured());
 
         // port only reported from Ancillary will be notified as down
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
         DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
         assertEquals(PORT_ADDED, event4.type());
@@ -616,25 +639,29 @@
                         event4.port().isEnabled());
         verify(clusterCommunicator);
         // TODO: verify broadcast message content
-        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
-        assertTrue(bcast.hasCaptured());
+        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, message, subject, encoder);
+        assertTrue(message.hasCaptured());
     }
 
-    private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
-            ProviderId pid, DefaultPortDescription expectedDesc,
-            List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
+    private void assertInternalPortStatusEvent(NodeId sender,
+            DeviceId did,
+            ProviderId pid,
+            DefaultPortDescription expectedDesc,
+            List<SparseAnnotations> expectedAnnotations,
+            Capture<InternalPortStatusEvent> actualEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalPortStatusEvent, byte[]>> actualEncoder) {
 
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
+        assertTrue(actualEvent.hasCaptured());
+        assertTrue(actualSubject.hasCaptured());
+        assertTrue(actualEncoder.hasCaptured());
+
         assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
-                actualMsg.getValue().subject());
-        InternalPortStatusEvent addEvent
-            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
-        assertEquals(did, addEvent.deviceId());
-        assertEquals(pid, addEvent.providerId());
+                actualSubject.getValue());
+        assertEquals(did, actualEvent.getValue().deviceId());
+        assertEquals(pid, actualEvent.getValue().providerId());
         assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
-                addEvent.portDescription().value());
-
+                actualEvent.getValue().portDescription().value());
     }
 
     private void assertPortDescriptionEquals(
@@ -649,19 +676,31 @@
                          expectedAnnotations.toArray(new SparseAnnotations[0]));
     }
 
-    private void resetCommunicatorExpectingNoBroadcast(
-            Capture<ClusterMessage> bcast) {
-        bcast.reset();
+    private <T> void resetCommunicatorExpectingNoBroadcast(
+            Capture<T> message,
+            Capture<MessageSubject> subject,
+            Capture<Function<T, byte[]>> encoder) {
+        message.reset();
+        subject.reset();
+        encoder.reset();
         reset(clusterCommunicator);
         replay(clusterCommunicator);
     }
 
-    private void resetCommunicatorExpectingSingleBroadcast(
-            Capture<ClusterMessage> bcast) {
+    private <T> void resetCommunicatorExpectingSingleBroadcast(
+            Capture<T> message,
+            Capture<MessageSubject> subject,
+            Capture<Function<T, byte[]>> encoder) {
 
-        bcast.reset();
+        message.reset();
+        subject.reset();
+        encoder.reset();
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+        clusterCommunicator.broadcast(
+                    capture(message),
+                    capture(subject),
+                    capture(encoder));
+        expectLastCall().once();
         replay(clusterCommunicator);
     }
 
@@ -724,9 +763,11 @@
         assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
         assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalDeviceEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
 
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
 
         DeviceEvent event = deviceStore.removeDevice(DID1);
         assertEquals(DEVICE_REMOVED, event.type());
@@ -736,7 +777,7 @@
         assertEquals(0, deviceStore.getPorts(DID1).size());
         verify(clusterCommunicator);
         // TODO: verify broadcast message
-        assertTrue(bcast.hasCaptured());
+        assertTrue(message.hasCaptured());
 
         // putBack Device, Port w/o annotation
         putDevice(DID1, SW1);
@@ -825,10 +866,6 @@
             this.clusterService = clusterService;
             this.clusterCommunicator = clusterCommunicator;
         }
-
-        public <T> T deserialize(byte[] bytes) {
-            return SERIALIZER.decode(bytes);
-        }
     }
 
     private static final class TestClusterService extends StaticClusterService {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 0a3d5e6..8d495eb 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -30,6 +30,7 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractEvent;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -44,17 +45,20 @@
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static junit.framework.TestCase.assertFalse;
@@ -281,7 +285,7 @@
 
         // Set up expected internal message to be broadcast to peers on first put
         expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
-                .peekAtNextTimestamp()), clusterCommunicator);
+                .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         // Put first value
         assertNull(ecMap.get(KEY1));
@@ -292,7 +296,7 @@
 
         // Set up expected internal message to be broadcast to peers on second put
         expectSpecificMulticastMessage(generatePutMessage(
-                KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+                KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         // Update same key to a new value
         ecMap.put(KEY1, VALUE2);
@@ -341,7 +345,7 @@
         // Remove the value and check the correct internal cluster messages
         // are sent
         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                                       clusterCommunicator);
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -352,7 +356,7 @@
         // the map, we expect that the tombstone is updated and another remove
         // event is sent to the cluster and external listeners.
         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                                       clusterCommunicator);
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -402,7 +406,7 @@
         ecMap.addListener(listener);
 
         // Expect a multi-update inter-instance message
-        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
                                        clusterCommunicator);
 
         Map<String, String> putAllValues = new HashMap<>();
@@ -441,7 +445,7 @@
         ecMap.put(KEY2, VALUE2);
 
         ecMap.addListener(listener);
-        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.clear();
 
@@ -605,7 +609,8 @@
                 SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
-    private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+    private List<PutEntry<String, String>> generatePutMessage(
+            String key1, String value1, String key2, String value2) {
         ArrayList<PutEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
@@ -617,10 +622,7 @@
         list.add(pe1);
         list.add(pe2);
 
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(list));
+        return list;
     }
 
     private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
@@ -631,7 +633,7 @@
                 SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
-    private ClusterMessage generateRemoveMessage(String key1, String key2) {
+    private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
         ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
@@ -643,9 +645,7 @@
         list.add(re1);
         list.add(re2);
 
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(list));
+        return list;
     }
 
     /**
@@ -656,13 +656,13 @@
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private static void expectSpecificBroadcastMessage(ClusterMessage m,
-                           ClusterCommunicationService clusterCommunicator) {
+    private static <T> void expectSpecificBroadcastMessage(
+            T message,
+            MessageSubject subject,
+            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.broadcast(m)).andReturn(true);
-        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -670,17 +670,16 @@
      * Sets up a mock ClusterCommunicationService to expect a specific cluster
      * message to be multicast to the cluster.
      *
-     * @param m message we expect to be sent
+     * @param message message we expect to be sent
+     * @param subject subject we expect to be sent to
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private static void expectSpecificMulticastMessage(ClusterMessage m,
+    private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
                            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
-        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -693,12 +692,15 @@
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+    private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
 //        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
 //                                             anyObject(Iterable.class)))
-        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
-                                           anyObject(NodeId.class)))
+        expect(clusterCommunicator.<T>unicast(
+                    anyObject(),
+                    anyObject(MessageSubject.class),
+                    anyObject(Function.class),
+                    anyObject(NodeId.class)))
                 .andReturn(true)
                 .anyTimes();
         replay(clusterCommunicator);
@@ -711,15 +713,14 @@
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    //FIXME rename
     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-//                .andReturn(true)
-//                .anyTimes();
-        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<AbstractEvent>multicast(
+                anyObject(AbstractEvent.class),
+                anyObject(MessageSubject.class),
+                anyObject(Function.class),
+                anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -733,45 +734,6 @@
             implements ClusterCommunicationService {
 
         @Override
-        public boolean broadcast(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean broadcastIncludeSelf(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean unicast(ClusterMessage message, NodeId toNodeId)  {
-            return false;
-        }
-
-        @Override
-        public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
-            return false;
-        }
-
-        @Override
-        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
-                                                       NodeId toNodeId)
-                throws IOException {
-            return null;
-        }
-
-        @Override
-        public void addSubscriber(MessageSubject subject,
-                                  ClusterMessageHandler subscriber) {
-            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
-                updateHandler = subscriber;
-            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
-                antiEntropyHandler = subscriber;
-            } else {
-                throw new RuntimeException("Unexpected message subject " + subject.toString());
-            }
-        }
-
-        @Override
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber,
                                   ExecutorService executor) {
@@ -786,6 +748,73 @@
 
         @Override
         public void removeSubscriber(MessageSubject subject) {}
+
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder) {
+        }
+
+        @Override
+        public <M> void broadcastIncludeSelf(M message,
+                MessageSubject subject, Function<M, byte[]> encoder) {
+        }
+
+        @Override
+        public <M> boolean unicast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder, NodeId toNodeId) {
+            return false;
+        }
+
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder, Set<NodeId> nodes) {
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                MessageSubject subject, Function<M, byte[]> encoder,
+                Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Function<M, R> handler,
+                Function<R, byte[]> encoder, ExecutorService executor) {
+        }
+
+        @Override
+        public <M> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Consumer<M> handler,
+                ExecutorService executor) {
+        }
+
+        @Override
+        public boolean broadcast(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean broadcastIncludeSelf(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+            return false;
+        }
+
+        @Override
+        public boolean multicast(ClusterMessage message,
+                Iterable<NodeId> nodeIds) {
+            return false;
+        }
+
+        @Override
+        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+                NodeId toNodeId) {
+            return null;
+        }
     }
 
     /**
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index 03e82bc..b0da337 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -28,7 +28,6 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.ConnectPoint;
@@ -48,7 +47,6 @@
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.cluster.StaticClusterService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.device.impl.DeviceClockManager;
@@ -59,6 +57,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
@@ -119,7 +118,6 @@
     private DeviceClockManager deviceClockManager;
     private DeviceClockService deviceClockService;
     private ClusterCommunicationService clusterCommunicator;
-    private MastershipService mastershipService;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -171,26 +169,24 @@
         ConnectPoint src = new ConnectPoint(srcId, srcNum);
         ConnectPoint dst = new ConnectPoint(dstId, dstNum);
         reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-            .andReturn(true).anyTimes();
+        clusterCommunicator.<InternalLinkEvent>broadcast(
+                anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
         linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
         verify(clusterCommunicator);
     }
 
-    private void resetCommunicatorExpectingNoBroadcast(
-                                                       Capture<ClusterMessage> bcast) {
-        bcast.reset();
+    private <T> void resetCommunicatorExpectingSingleBroadcast(
+            Capture<T> message,
+            Capture<MessageSubject> subject,
+            Capture<Function<T, byte[]>> encoder) {
+        message.reset();
+        subject.reset();
+        encoder.reset();
         reset(clusterCommunicator);
-        replay(clusterCommunicator);
-    }
-
-    private void resetCommunicatorExpectingSingleBroadcast(
-                                                           Capture<ClusterMessage> bcast) {
-
-        bcast.reset();
-        reset(clusterCommunicator);
-        expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+        clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
+        expectLastCall().once();
         replay(clusterCommunicator);
     }
 
@@ -367,56 +363,55 @@
         ConnectPoint src = new ConnectPoint(DID1, P1);
         ConnectPoint dst = new ConnectPoint(DID2, P2);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalLinkEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
 
         // add link
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
         LinkEvent event = linkStore.createOrUpdateLink(PID,
                     linkDescription);
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
         assertEquals(LINK_ADDED, event.type());
 
         // update link type
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event2 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
         assertEquals(LINK_UPDATED, event2.type());
 
         // no change
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event3 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT));
-        verifyNoBroadcastMessage(bcast);
+        verifyNoBroadcastMessage(message);
 
         assertNull("No change event expected", event3);
     }
 
-    private void verifyNoBroadcastMessage(Capture<ClusterMessage> bcast) {
-        assertFalse("No broadcast expected", bcast.hasCaptured());
+    private <T> void verifyNoBroadcastMessage(Capture<T> message) {
+        assertFalse("No broadcast expected", message.hasCaptured());
     }
 
     private void verifyLinkBroadcastMessage(ProviderId providerId,
-                                            NodeId sender,
-                                            ConnectPoint src,
-                                            ConnectPoint dst,
-                                            Type type,
-                                            Capture<ClusterMessage> actualMsg) {
+            NodeId sender,
+            ConnectPoint src,
+            ConnectPoint dst,
+            Type type,
+            Capture<InternalLinkEvent> actualLinkEvent,
+            Capture<MessageSubject> actualSubject,
+            Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
         verify(clusterCommunicator);
-        assertTrue(actualMsg.hasCaptured());
-        assertEquals(sender, actualMsg.getValue().sender());
-        assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE,
-                     actualMsg.getValue().subject());
-        InternalLinkEvent linkEvent
-            = GossipLinkStore.SERIALIZER.decode(actualMsg.getValue().payload());
-        assertEquals(providerId, linkEvent.providerId());
-        assertLinkDescriptionEquals(src, dst, type, linkEvent.linkDescription().value());
-
+        assertTrue(actualLinkEvent.hasCaptured());
+        assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
+        assertEquals(providerId, actualLinkEvent.getValue().providerId());
+        assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
     }
 
     private static void assertLinkDescriptionEquals(ConnectPoint src,
@@ -434,31 +429,33 @@
         ConnectPoint src = new ConnectPoint(DID1, P1);
         ConnectPoint dst = new ConnectPoint(DID2, P2);
 
-        Capture<ClusterMessage> bcast = new Capture<>();
+        Capture<InternalLinkEvent> message = new Capture<>();
+        Capture<MessageSubject> subject = new Capture<>();
+        Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
 
         // add Ancillary link
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event = linkStore.createOrUpdateLink(PIDA,
                     new DefaultLinkDescription(src, dst, INDIRECT, A1));
-        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, bcast);
+        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
 
         assertNotNull("Ancillary only link is ignored", event);
 
         // add Primary link
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event2 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, INDIRECT, A2));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
         assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
         assertEquals(LINK_UPDATED, event2.type());
 
         // update link type
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event3 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT, A2));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
         assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
@@ -466,38 +463,38 @@
 
 
         // no change
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event4 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT));
-        verifyNoBroadcastMessage(bcast);
+        verifyNoBroadcastMessage(message);
 
         assertNull("No change event expected", event4);
 
         // update link annotation (Primary)
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event5 = linkStore.createOrUpdateLink(PID,
                 new DefaultLinkDescription(src, dst, DIRECT, A2_2));
-        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
         assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
         assertEquals(LINK_UPDATED, event5.type());
 
         // update link annotation (Ancillary)
-        resetCommunicatorExpectingSingleBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
                 new DefaultLinkDescription(src, dst, DIRECT, A1_2));
-        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, bcast);
+        verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
 
         assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
         assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
         assertEquals(LINK_UPDATED, event6.type());
 
         // update link type (Ancillary) : ignored
-        resetCommunicatorExpectingNoBroadcast(bcast);
+        resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
         LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
                 new DefaultLinkDescription(src, dst, EDGE));
-        verifyNoBroadcastMessage(bcast);
+        verifyNoBroadcastMessage(message);
         assertNull("Ancillary change other than annotation is ignored", event7);
     }