Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 0a3d5e6..8d495eb 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -30,6 +30,7 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractEvent;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -44,17 +45,20 @@
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static junit.framework.TestCase.assertFalse;
@@ -281,7 +285,7 @@
 
         // Set up expected internal message to be broadcast to peers on first put
         expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
-                .peekAtNextTimestamp()), clusterCommunicator);
+                .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         // Put first value
         assertNull(ecMap.get(KEY1));
@@ -292,7 +296,7 @@
 
         // Set up expected internal message to be broadcast to peers on second put
         expectSpecificMulticastMessage(generatePutMessage(
-                KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+                KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         // Update same key to a new value
         ecMap.put(KEY1, VALUE2);
@@ -341,7 +345,7 @@
         // Remove the value and check the correct internal cluster messages
         // are sent
         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                                       clusterCommunicator);
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -352,7 +356,7 @@
         // the map, we expect that the tombstone is updated and another remove
         // event is sent to the cluster and external listeners.
         expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                                       clusterCommunicator);
+                UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -402,7 +406,7 @@
         ecMap.addListener(listener);
 
         // Expect a multi-update inter-instance message
-        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
                                        clusterCommunicator);
 
         Map<String, String> putAllValues = new HashMap<>();
@@ -441,7 +445,7 @@
         ecMap.put(KEY2, VALUE2);
 
         ecMap.addListener(listener);
-        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
 
         ecMap.clear();
 
@@ -605,7 +609,8 @@
                 SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
-    private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+    private List<PutEntry<String, String>> generatePutMessage(
+            String key1, String value1, String key2, String value2) {
         ArrayList<PutEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
@@ -617,10 +622,7 @@
         list.add(pe1);
         list.add(pe2);
 
-
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(list));
+        return list;
     }
 
     private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
@@ -631,7 +633,7 @@
                 SERIALIZER.encode(Lists.newArrayList(event)));
     }
 
-    private ClusterMessage generateRemoveMessage(String key1, String key2) {
+    private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
         ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
 
         Timestamp timestamp1 = clockService.peek(1);
@@ -643,9 +645,7 @@
         list.add(re1);
         list.add(re2);
 
-        return new ClusterMessage(
-                clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
-                SERIALIZER.encode(list));
+        return list;
     }
 
     /**
@@ -656,13 +656,13 @@
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private static void expectSpecificBroadcastMessage(ClusterMessage m,
-                           ClusterCommunicationService clusterCommunicator) {
+    private static <T> void expectSpecificBroadcastMessage(
+            T message,
+            MessageSubject subject,
+            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.broadcast(m)).andReturn(true);
-        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -670,17 +670,16 @@
      * Sets up a mock ClusterCommunicationService to expect a specific cluster
      * message to be multicast to the cluster.
      *
-     * @param m message we expect to be sent
+     * @param message message we expect to be sent
+     * @param subject subject we expect to be sent to
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private static void expectSpecificMulticastMessage(ClusterMessage m,
+    private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
                            ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
-        expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -693,12 +692,15 @@
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
     //FIXME rename
-    private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+    private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
 //        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
 //                                             anyObject(Iterable.class)))
-        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
-                                           anyObject(NodeId.class)))
+        expect(clusterCommunicator.<T>unicast(
+                    anyObject(),
+                    anyObject(MessageSubject.class),
+                    anyObject(Function.class),
+                    anyObject(NodeId.class)))
                 .andReturn(true)
                 .anyTimes();
         replay(clusterCommunicator);
@@ -711,15 +713,14 @@
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    //FIXME rename
     private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
-//        expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-//                .andReturn(true)
-//                .anyTimes();
-        expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
-                .andReturn(true)
-                .anyTimes();
+        clusterCommunicator.<AbstractEvent>multicast(
+                anyObject(AbstractEvent.class),
+                anyObject(MessageSubject.class),
+                anyObject(Function.class),
+                anyObject(Set.class));
+        expectLastCall().anyTimes();
         replay(clusterCommunicator);
     }
 
@@ -733,45 +734,6 @@
             implements ClusterCommunicationService {
 
         @Override
-        public boolean broadcast(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean broadcastIncludeSelf(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean unicast(ClusterMessage message, NodeId toNodeId)  {
-            return false;
-        }
-
-        @Override
-        public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
-            return false;
-        }
-
-        @Override
-        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
-                                                       NodeId toNodeId)
-                throws IOException {
-            return null;
-        }
-
-        @Override
-        public void addSubscriber(MessageSubject subject,
-                                  ClusterMessageHandler subscriber) {
-            if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
-                updateHandler = subscriber;
-            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
-                antiEntropyHandler = subscriber;
-            } else {
-                throw new RuntimeException("Unexpected message subject " + subject.toString());
-            }
-        }
-
-        @Override
         public void addSubscriber(MessageSubject subject,
                                   ClusterMessageHandler subscriber,
                                   ExecutorService executor) {
@@ -786,6 +748,73 @@
 
         @Override
         public void removeSubscriber(MessageSubject subject) {}
+
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder) {
+        }
+
+        @Override
+        public <M> void broadcastIncludeSelf(M message,
+                MessageSubject subject, Function<M, byte[]> encoder) {
+        }
+
+        @Override
+        public <M> boolean unicast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder, NodeId toNodeId) {
+            return false;
+        }
+
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                Function<M, byte[]> encoder, Set<NodeId> nodes) {
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                MessageSubject subject, Function<M, byte[]> encoder,
+                Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Function<M, R> handler,
+                Function<R, byte[]> encoder, ExecutorService executor) {
+        }
+
+        @Override
+        public <M> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Consumer<M> handler,
+                ExecutorService executor) {
+        }
+
+        @Override
+        public boolean broadcast(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean broadcastIncludeSelf(ClusterMessage message) {
+            return false;
+        }
+
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+            return false;
+        }
+
+        @Override
+        public boolean multicast(ClusterMessage message,
+                Iterable<NodeId> nodeIds) {
+            return false;
+        }
+
+        @Override
+        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+                NodeId toNodeId) {
+            return null;
+        }
     }
 
     /**