ONOS-2322: Support for periodic purging of ECMap tombstones
Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 3b4f36c..3d9dc4e 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -103,7 +103,7 @@
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
- private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
+ private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
/*
* Serialization is a bit tricky here. We need to serialize in the tests
@@ -144,9 +144,15 @@
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
// handlers so we can induce events coming in from a peer.
- clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
+ clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
- expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+ clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
replay(clusterCommunicator);
@@ -798,8 +804,16 @@
Executor executor) {
if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
+ } else {
+ throw new RuntimeException("Unexpected message subject " + subject.toString());
+ }
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+ if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+ antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
} else {
throw new RuntimeException("Unexpected message subject " + subject.toString());
}