Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 0a3d5e6..8d495eb 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -30,6 +30,7 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractEvent;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -44,17 +45,20 @@
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
import static com.google.common.base.Preconditions.checkArgument;
import static junit.framework.TestCase.assertFalse;
@@ -281,7 +285,7 @@
// Set up expected internal message to be broadcast to peers on first put
expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
- .peekAtNextTimestamp()), clusterCommunicator);
+ .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
// Put first value
assertNull(ecMap.get(KEY1));
@@ -292,7 +296,7 @@
// Set up expected internal message to be broadcast to peers on second put
expectSpecificMulticastMessage(generatePutMessage(
- KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+ KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
// Update same key to a new value
ecMap.put(KEY1, VALUE2);
@@ -341,7 +345,7 @@
// Remove the value and check the correct internal cluster messages
// are sent
expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -352,7 +356,7 @@
// the map, we expect that the tombstone is updated and another remove
// event is sent to the cluster and external listeners.
expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -402,7 +406,7 @@
ecMap.addListener(listener);
// Expect a multi-update inter-instance message
- expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+ expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
clusterCommunicator);
Map<String, String> putAllValues = new HashMap<>();
@@ -441,7 +445,7 @@
ecMap.put(KEY2, VALUE2);
ecMap.addListener(listener);
- expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+ expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.clear();
@@ -605,7 +609,8 @@
SERIALIZER.encode(Lists.newArrayList(event)));
}
- private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+ private List<PutEntry<String, String>> generatePutMessage(
+ String key1, String value1, String key2, String value2) {
ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
@@ -617,10 +622,7 @@
list.add(pe1);
list.add(pe2);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(list));
+ return list;
}
private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
@@ -631,7 +633,7 @@
SERIALIZER.encode(Lists.newArrayList(event)));
}
- private ClusterMessage generateRemoveMessage(String key1, String key2) {
+ private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
@@ -643,9 +645,7 @@
list.add(re1);
list.add(re2);
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(list));
+ return list;
}
/**
@@ -656,13 +656,13 @@
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private static void expectSpecificBroadcastMessage(ClusterMessage m,
- ClusterCommunicationService clusterCommunicator) {
+ private static <T> void expectSpecificBroadcastMessage(
+ T message,
+ MessageSubject subject,
+ ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.broadcast(m)).andReturn(true);
- expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -670,17 +670,16 @@
* Sets up a mock ClusterCommunicationService to expect a specific cluster
* message to be multicast to the cluster.
*
- * @param m message we expect to be sent
+ * @param message message we expect to be sent
+ * @param subject subject we expect to be sent to
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private static void expectSpecificMulticastMessage(ClusterMessage m,
+ private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
- expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -693,12 +692,15 @@
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+ private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
// anyObject(Iterable.class)))
- expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
- anyObject(NodeId.class)))
+ expect(clusterCommunicator.<T>unicast(
+ anyObject(),
+ anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -711,15 +713,14 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
- //FIXME rename
private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-// .andReturn(true)
-// .anyTimes();
- expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<AbstractEvent>multicast(
+ anyObject(AbstractEvent.class),
+ anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -733,45 +734,6 @@
implements ClusterCommunicationService {
@Override
- public boolean broadcast(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean broadcastIncludeSelf(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- return false;
- }
-
- @Override
- public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
- return false;
- }
-
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
- NodeId toNodeId)
- throws IOException {
- return null;
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
- if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
- updateHandler = subscriber;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = subscriber;
- } else {
- throw new RuntimeException("Unexpected message subject " + subject.toString());
- }
- }
-
- @Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
@@ -786,6 +748,73 @@
@Override
public void removeSubscriber(MessageSubject subject) {}
+
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject, Function<M, byte[]> encoder) {
+ }
+
+ @Override
+ public <M> boolean unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return false;
+ }
+
+ @Override
+ public <M> void multicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Set<NodeId> nodes) {
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject, Function<M, byte[]> encoder,
+ Function<byte[], R> decoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Function<M, R> handler,
+ Function<R, byte[]> encoder, ExecutorService executor) {
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Consumer<M> handler,
+ ExecutorService executor) {
+ }
+
+ @Override
+ public boolean broadcast(ClusterMessage message) {
+ return false;
+ }
+
+ @Override
+ public boolean broadcastIncludeSelf(ClusterMessage message) {
+ return false;
+ }
+
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+ return false;
+ }
+
+ @Override
+ public boolean multicast(ClusterMessage message,
+ Iterable<NodeId> nodeIds) {
+ return false;
+ }
+
+ @Override
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+ NodeId toNodeId) {
+ return null;
+ }
}
/**