[ONOS-3591] Anti-Entropy speed up via push/pull interaction
Adds an UpdateRequest message, which carries the set of keys for which a
node is missing updates. On receiving it, the receiver sends an UpdateEntry
for each missing key back to the requester.
Change-Id: I2115f4a05833b51ae14d1191f09f083b5251f8ec
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 0012d68..8d41fd2 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -88,6 +88,8 @@
= new MessageSubject("ecm-" + MAP_NAME + "-update");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
+ private static final MessageSubject UPDATE_REQUEST_SUBJECT
+ = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
private static final String KEY1 = "one";
private static final String KEY2 = "two";
@@ -98,6 +100,7 @@
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+ private Consumer<Collection<UpdateRequest<String>>> requestHandler;
private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
@Before
@@ -123,6 +126,9 @@
anyObject(Function.class),
anyObject(Executor.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+ clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
replay(clusterCommunicator);
@@ -627,6 +633,7 @@
@Test
public void testDestroy() throws Exception {
clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
+ clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
replay(clusterCommunicator);
@@ -774,6 +781,8 @@
Executor executor) {
if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+ } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
+ requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
} else {
throw new RuntimeException("Unexpected message subject " + subject.toString());
}