Simplified ECMap implementation by merging items and tombstones maps
Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 28be8dc..57943ad 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,8 +16,8 @@
package org.onosproject.store.ecmap;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
@@ -32,7 +32,6 @@
import org.onosproject.event.AbstractEvent;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
@@ -44,11 +43,13 @@
import org.onosproject.store.service.EventuallyConsistentMapListener;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -89,8 +90,8 @@
private final ControllerNode self =
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
- private ClusterMessageHandler updateHandler;
- private ClusterMessageHandler antiEntropyHandler;
+ private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+ private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
/*
* Serialization is a bit tricky here. We need to serialize in the tests
@@ -109,11 +110,10 @@
// Below is the classes that the map internally registers
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
- .register(PutEntry.class)
- .register(RemoveEntry.class)
.register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
+ .register(Optional.class)
.build();
}
};
@@ -131,9 +131,9 @@
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
// handlers so we can induce events coming in from a peer.
- clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
- expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
+ clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
replay(clusterCommunicator);
@@ -237,15 +237,15 @@
assertEquals(VALUE1, ecMap.get(KEY1));
// Remote put
- ClusterMessage message
- = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
+ List<UpdateEntry<String, String>> message
+ = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
// Create a latch so we know when the put operation has finished
latch = new CountDownLatch(1);
ecMap.addListener(new TestListener(latch));
assertNull(ecMap.get(KEY2));
- updateHandler.handle(message);
+ updateHandler.accept(message);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertEquals(VALUE2, ecMap.get(KEY2));
@@ -255,14 +255,13 @@
assertNull(ecMap.get(KEY2));
// Remote remove
- ClusterMessage removeMessage
- = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
+ message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
// Create a latch so we know when the remove operation has finished
latch = new CountDownLatch(1);
ecMap.addListener(new TestListener(latch));
- updateHandler.handle(removeMessage);
+ updateHandler.accept(message);
assertTrue("External listener never got notified of internal event",
latch.await(100, TimeUnit.MILLISECONDS));
assertNull(ecMap.get(KEY1));
@@ -601,49 +600,35 @@
}
}
- private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
- PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(Lists.newArrayList(event)));
+ private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
+ return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
}
- private List<PutEntry<String, String>> generatePutMessage(
+ private List<UpdateEntry<String, String>> generatePutMessage(
String key1, String value1, String key2, String value2) {
- ArrayList<PutEntry<String, String>> list = new ArrayList<>();
+ List<UpdateEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
Timestamp timestamp2 = clockService.peek(2);
- PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1);
- PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2);
-
- list.add(pe1);
- list.add(pe2);
+ list.add(generatePutMessage(key1, value1, timestamp1));
+ list.add(generatePutMessage(key2, value2, timestamp2));
return list;
}
- private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
- RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(Lists.newArrayList(event)));
+ private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
+ return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
}
- private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
- ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
+ private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
+ List<UpdateEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
Timestamp timestamp2 = clockService.peek(2);
- RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
- RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
-
- list.add(re1);
- list.add(re2);
+ list.add(generateRemoveMessage(key1, timestamp1));
+ list.add(generateRemoveMessage(key2, timestamp2));
return list;
}
@@ -737,13 +722,6 @@
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
- if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
- updateHandler = subscriber;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = subscriber;
- } else {
- throw new RuntimeException("Unexpected message subject " + subject.toString());
- }
}
@Override
@@ -793,6 +771,13 @@
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder, Consumer<M> handler,
Executor executor) {
+ if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
+ updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+ } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+ antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
+ } else {
+ throw new RuntimeException("Unexpected message subject " + subject.toString());
+ }
}
}