Add sender-side accumulator to ecmap
Change-Id: I63de27131c067c07b41ca311b14ef3ac85b6ae3e
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 5ed6384..f7c9323 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
@@ -67,10 +68,8 @@
private SequentialClockService<String, String> clockService;
private static final String MAP_NAME = "test";
- private static final MessageSubject PUT_MESSAGE_SUBJECT
+ private static final MessageSubject UPDATE_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-update");
- private static final MessageSubject REMOVE_MESSAGE_SUBJECT
- = new MessageSubject("ecm-" + MAP_NAME + "-remove");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
@@ -82,8 +81,7 @@
private final ControllerNode self =
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
- private ClusterMessageHandler putHandler;
- private ClusterMessageHandler removeHandler;
+ private ClusterMessageHandler updateHandler;
private ClusterMessageHandler antiEntropyHandler;
/*
@@ -105,8 +103,6 @@
.register(PutEntry.class)
.register(RemoveEntry.class)
.register(ArrayList.class)
- .register(InternalPutEvent.class)
- .register(InternalRemoveEvent.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
.build();
@@ -237,7 +233,7 @@
ecMap.addListener(new TestListener(latch));
assertNull(ecMap.get(KEY2));
- putHandler.handle(message);
+ updateHandler.handle(message);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertEquals(VALUE2, ecMap.get(KEY2));
@@ -254,7 +250,7 @@
latch = new CountDownLatch(1);
ecMap.addListener(new TestListener(latch));
- removeHandler.handle(removeMessage);
+ updateHandler.handle(removeMessage);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertNull(ecMap.get(KEY1));
@@ -568,8 +564,7 @@
@Test
public void testDestroy() throws Exception {
- clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT);
- clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
+ clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
replay(clusterCommunicator);
@@ -594,12 +589,11 @@
}
private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
- InternalPutEvent<String, String> event =
- new InternalPutEvent<>(key, value, timestamp);
+ PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
return new ClusterMessage(
- clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(Lists.newArrayList(event)));
}
private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
@@ -614,38 +608,35 @@
list.add(pe1);
list.add(pe2);
- InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
return new ClusterMessage(
- clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(list));
}
private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
- InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp);
+ RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
return new ClusterMessage(
- clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(Lists.newArrayList(event)));
}
private ClusterMessage generateRemoveMessage(String key1, String key2) {
- ArrayList<RemoveEntry<String>> list = new ArrayList<>();
+ ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
Timestamp timestamp2 = clockService.peek(2);
- RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1);
- RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2);
+ RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
+ RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
list.add(re1);
list.add(re2);
- InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
-
return new ClusterMessage(
- clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT,
- SERIALIZER.encode(event));
+ clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
+ SERIALIZER.encode(list));
}
/**
@@ -655,10 +646,14 @@
* @param m message we expect to be sent
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename: this helper now expects unicast of m to any NodeId, not a broadcast
private static void expectSpecificBroadcastMessage(ClusterMessage m,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(m)).andReturn(true);
+// expect(clusterCommunicator.broadcast(m)).andReturn(true);
+ expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
+ .andReturn(true)
+ .anyTimes();
replay(clusterCommunicator);
}
@@ -669,10 +664,14 @@
* @param m message we expect to be sent
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename: this helper now expects unicast of m to any NodeId, not a multicast
private static void expectSpecificMulticastMessage(ClusterMessage m,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+ expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
+ .andReturn(true)
+ .anyTimes();
replay(clusterCommunicator);
}
@@ -684,10 +683,13 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename: this helper now expects unicast of any message, not a multicast
private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
- anyObject(Iterable.class)))
+// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
+// anyObject(Iterable.class)))
+ expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
+ anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -700,9 +702,13 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
+ //FIXME rename: this helper now expects unicast of any message, not a broadcast
private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+// .andReturn(true)
+// .anyTimes();
+ expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -747,10 +753,8 @@
@Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber) {
- if (subject.equals(PUT_MESSAGE_SUBJECT)) {
- putHandler = subscriber;
- } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
- removeHandler = subscriber;
+ if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+ updateHandler = subscriber;
} else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
antiEntropyHandler = subscriber;
} else {
@@ -762,10 +766,8 @@
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
- if (subject.equals(PUT_MESSAGE_SUBJECT)) {
- putHandler = subscriber;
- } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
- removeHandler = subscriber;
+ if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+ updateHandler = subscriber;
} else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
antiEntropyHandler = subscriber;
} else {