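Revamped ClusterCommunicationService API

Callers now pass the payload object, the message subject and an encoder
function (e.g. SERIALIZER::encode) directly to unicast/broadcast instead
of constructing a ClusterMessage themselves.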

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 131000b..b47376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -304,11 +304,9 @@
 
             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
                     providerId, deviceId, deviceDescription);
-            ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
-                    SERIALIZER.encode(deviceInjectedEvent));
 
             // TODO check unicast return value
-            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
             /* error log:
             log.warn("Failed to process injected device id: {} desc: {} " +
                             "(cluster messaging failed: {})",
@@ -555,11 +553,9 @@
             }
 
             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
-            ClusterMessage clusterMessage = new ClusterMessage(
-                    localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
 
             //TODO check unicast return value
-            clusterCommunicator.unicast(clusterMessage, deviceNode);
+            clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
             /* error log:
             log.warn("Failed to process injected ports of device id: {} " +
                             "(cluster messaging failed: {})",
@@ -867,13 +863,8 @@
             log.debug("{} has control of {}, forwarding remove request",
                      master, deviceId);
 
-             ClusterMessage message = new ClusterMessage(
-                     myId,
-                     DEVICE_REMOVE_REQ,
-                     SERIALIZER.encode(deviceId));
-
             // TODO check unicast return value
-            clusterCommunicator.unicast(message, master);
+            clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
              /* error log:
              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
              */
@@ -1057,19 +1048,11 @@
     }
 
     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, recipient);
+        clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
+        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
     }
 
     private void notifyPeers(InternalDeviceEvent event) {