Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 7054bd3..f271de4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -65,6 +65,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import static java.util.Arrays.asList;
import static org.easymock.EasyMock.*;
@@ -181,8 +182,9 @@
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN, CID, annotations);
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
- .andReturn(true).anyTimes();
+ clusterCommunicator.<InternalDeviceEvent>broadcast(
+ anyObject(InternalDeviceEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
verify(clusterCommunicator);
@@ -296,36 +298,43 @@
}
private void assertInternalDeviceEvent(NodeId sender,
- DeviceId deviceId,
- ProviderId providerId,
- DeviceDescription expectedDesc,
- Capture<ClusterMessage> actualMsg) {
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
+ DeviceId deviceId,
+ ProviderId providerId,
+ DeviceDescription expectedDesc,
+ Capture<InternalDeviceEvent> actualEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+ assertTrue(actualEvent.hasCaptured());
+ assertTrue(actualSubject.hasCaptured());
+ assertTrue(actualEncoder.hasCaptured());
+
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
- actualMsg.getValue().subject());
- InternalDeviceEvent addEvent
- = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
- assertEquals(deviceId, addEvent.deviceId());
- assertEquals(providerId, addEvent.providerId());
- assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
+ actualSubject.getValue());
+ assertEquals(deviceId, actualEvent.getValue().deviceId());
+ assertEquals(providerId, actualEvent.getValue().providerId());
+ assertDeviceDescriptionEquals(expectedDesc, actualEvent.getValue().deviceDescription().value());
}
private void assertInternalDeviceEvent(NodeId sender,
- DeviceId deviceId,
- ProviderId providerId,
- DeviceDescription expectedDesc,
- List<SparseAnnotations> expectedAnnotations,
- Capture<ClusterMessage> actualMsg) {
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
+ DeviceId deviceId,
+ ProviderId providerId,
+ DeviceDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations,
+ Capture<InternalDeviceEvent> actualEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+ assertTrue(actualEvent.hasCaptured());
+ assertTrue(actualSubject.hasCaptured());
+ assertTrue(actualEncoder.hasCaptured());
+
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
- actualMsg.getValue().subject());
- InternalDeviceEvent addEvent
- = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
- assertEquals(deviceId, addEvent.deviceId());
- assertEquals(providerId, addEvent.providerId());
- assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
+ actualSubject.getValue());
+ assertEquals(deviceId, actualEvent.getValue().deviceId());
+ assertEquals(providerId, actualEvent.getValue().providerId());
+ assertDeviceDescriptionEquals(
+ expectedDesc,
+ expectedAnnotations,
+ actualEvent.getValue().deviceDescription().value());
}
@Test
@@ -333,26 +342,28 @@
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, CID);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PID, description, message, subject, encoder);
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, CID);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
reset(clusterCommunicator);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
@@ -366,7 +377,11 @@
HW, SW1, SN, CID, A2);
Capture<ClusterMessage> bcast = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
@@ -374,13 +389,13 @@
assertAnnotationsEquals(event.subject().annotations(), A2);
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PIDA, description, message, subject, encoder);
// update from primary
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, CID, A1);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
@@ -389,17 +404,17 @@
assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
assertTrue(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
// no-op update from primary
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
verify(clusterCommunicator);
assertFalse("no broadcast expected", bcast.hasCaptured());
// For now, Ancillary is ignored once primary appears
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
@@ -410,7 +425,7 @@
DeviceDescription description3 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, CID, A2_2);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
assertEquals(DEVICE_UPDATED, event3.type());
@@ -423,7 +438,7 @@
verify(clusterCommunicator);
// note: only annotation from PIDA is sent over the wire
assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
- asList(union(A2, A2_2)), bcast);
+ asList(union(A2, A2_2)), message, subject, encoder);
}
@@ -434,23 +449,25 @@
putDevice(DID1, SW1);
assertTrue(deviceStore.isAvailable(DID1));
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.markOffline(DID1);
assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
assertDevice(DID1, SW1, event.subject());
assertFalse(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
DeviceEvent event2 = deviceStore.markOffline(DID1);
assertNull("No change, no event", event2);
verify(clusterCommunicator);
- assertFalse(bcast.hasCaptured());
+ assertFalse(message.hasCaptured());
}
@Test
@@ -460,13 +477,15 @@
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
@@ -485,11 +504,11 @@
new DefaultPortDescription(P3, true)
);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
events = deviceStore.updatePorts(PID, DID1, pds2);
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
@@ -513,11 +532,11 @@
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
events = deviceStore.updatePorts(PID, DID1, pds3);
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
@@ -544,9 +563,11 @@
);
deviceStore.updatePorts(PID, DID1, pds);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalPortStatusEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
assertEquals(PORT_UPDATED, event.type());
@@ -554,8 +575,8 @@
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
verify(clusterCommunicator);
- assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, message, subject, encoder);
+ assertTrue(message.hasCaptured());
}
@Test
@@ -567,11 +588,13 @@
);
deviceStore.updatePorts(PID, DID1, pds);
- Capture<ClusterMessage> bcast = new Capture<>();
-
+ Capture<InternalPortStatusEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
// update port from primary
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
+
final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
assertEquals(PORT_UPDATED, event.type());
@@ -580,19 +603,19 @@
assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
assertFalse("Port is disabled", event.port().isEnabled());
verify(clusterCommunicator);
- assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), message, subject, encoder);
+ assertTrue(message.hasCaptured());
// update port from ancillary with no attributes
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
assertNull("Ancillary is ignored if primary exists", event2);
verify(clusterCommunicator);
- assertFalse(bcast.hasCaptured());
+ assertFalse(message.hasCaptured());
// but, Ancillary annotation update will be notified
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
assertEquals(PORT_UPDATED, event3.type());
@@ -601,11 +624,11 @@
assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
assertFalse("Port is disabled", event3.port().isEnabled());
verify(clusterCommunicator);
- assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), message, subject, encoder);
+ assertTrue(message.hasCaptured());
// port only reported from Ancillary will be notified as down
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
assertEquals(PORT_ADDED, event4.type());
@@ -616,25 +639,29 @@
event4.port().isEnabled());
verify(clusterCommunicator);
// TODO: verify broadcast message content
- assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, message, subject, encoder);
+ assertTrue(message.hasCaptured());
}
- private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
- ProviderId pid, DefaultPortDescription expectedDesc,
- List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
+ private void assertInternalPortStatusEvent(NodeId sender,
+ DeviceId did,
+ ProviderId pid,
+ DefaultPortDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations,
+ Capture<InternalPortStatusEvent> actualEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalPortStatusEvent, byte[]>> actualEncoder) {
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
+ assertTrue(actualEvent.hasCaptured());
+ assertTrue(actualSubject.hasCaptured());
+ assertTrue(actualEncoder.hasCaptured());
+
assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
- actualMsg.getValue().subject());
- InternalPortStatusEvent addEvent
- = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
- assertEquals(did, addEvent.deviceId());
- assertEquals(pid, addEvent.providerId());
+ actualSubject.getValue());
+ assertEquals(did, actualEvent.getValue().deviceId());
+ assertEquals(pid, actualEvent.getValue().providerId());
assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
- addEvent.portDescription().value());
-
+ actualEvent.getValue().portDescription().value());
}
private void assertPortDescriptionEquals(
@@ -649,19 +676,31 @@
expectedAnnotations.toArray(new SparseAnnotations[0]));
}
- private void resetCommunicatorExpectingNoBroadcast(
- Capture<ClusterMessage> bcast) {
- bcast.reset();
+ private <T> void resetCommunicatorExpectingNoBroadcast(
+ Capture<T> message,
+ Capture<MessageSubject> subject,
+ Capture<Function<T, byte[]>> encoder) {
+ message.reset();
+ subject.reset();
+ encoder.reset();
reset(clusterCommunicator);
replay(clusterCommunicator);
}
- private void resetCommunicatorExpectingSingleBroadcast(
- Capture<ClusterMessage> bcast) {
+ private <T> void resetCommunicatorExpectingSingleBroadcast(
+ Capture<T> message,
+ Capture<MessageSubject> subject,
+ Capture<Function<T, byte[]>> encoder) {
- bcast.reset();
+ message.reset();
+ subject.reset();
+ encoder.reset();
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+ clusterCommunicator.broadcast(
+ capture(message),
+ capture(subject),
+ capture(encoder));
+ expectLastCall().once();
replay(clusterCommunicator);
}
@@ -724,9 +763,11 @@
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
@@ -736,7 +777,7 @@
assertEquals(0, deviceStore.getPorts(DID1).size());
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
@@ -825,10 +866,6 @@
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
-
- public <T> T deserialize(byte[] bytes) {
- return SERIALIZER.decode(bytes);
- }
}
private static final class TestClusterService extends StaticClusterService {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 0a3d5e6..8d495eb 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -30,6 +30,7 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractEvent;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -44,17 +45,20 @@
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
import static com.google.common.base.Preconditions.checkArgument;
import static junit.framework.TestCase.assertFalse;
@@ -281,7 +285,7 @@
// Set up expected internal message to be broadcast to peers on first put
expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
- .peekAtNextTimestamp()), clusterCommunicator);
+ .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
// Put first value
assertNull(ecMap.get(KEY1));
@@ -292,7 +296,7 @@
// Set up expected internal message to be broadcast to peers on second put
expectSpecificMulticastMessage(generatePutMessage(
- KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+ KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
// Update same key to a new value
ecMap.put(KEY1, VALUE2);
@@ -341,7 +345,7 @@
// Remove the value and check the correct internal cluster messages
// are sent
expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -352,7 +356,7 @@
// the map, we expect that the tombstone is updated and another remove
// event is sent to the cluster and external listeners.
expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -402,7 +406,7 @@
ecMap.addListener(listener);
// Expect a multi-update inter-instance message
- expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+ expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
clusterCommunicator);
Map<String, String> putAllValues = new HashMap<>();
@@ -441,7 +445,7 @@
ecMap.put(KEY2, VALUE2);
ecMap.addListener(listener);
- expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+ expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.clear();
@@ -605,7 +609,8 @@
SERIALIZER.encode(Lists.newArrayList(event)));
}
- private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+ private List<PutEntry<String, String>> generatePutMessage(
+ String key1, String value1, String key2, String value2) {
ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
@@ -617,10 +622,7 @@
list.add(pe1);
list.add(pe2);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(list));
+ return list;
}
private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
@@ -631,7 +633,7 @@
SERIALIZER.encode(Lists.newArrayList(event)));
}
- private ClusterMessage generateRemoveMessage(String key1, String key2) {
+ private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
@@ -643,9 +645,7 @@
list.add(re1);
list.add(re2);
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(list));
+ return list;
}
/**
@@ -656,13 +656,13 @@
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private static void expectSpecificBroadcastMessage(ClusterMessage m,
- ClusterCommunicationService clusterCommunicator) {
+ private static <T> void expectSpecificBroadcastMessage(
+ T message,
+ MessageSubject subject,
+ ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.broadcast(m)).andReturn(true);
- expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -670,17 +670,16 @@
* Sets up a mock ClusterCommunicationService to expect a specific cluster
* message to be multicast to the cluster.
*
- * @param m message we expect to be sent
+ * @param message message we expect to be sent
+ * @param subject subject we expect to be sent to
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private static void expectSpecificMulticastMessage(ClusterMessage m,
+ private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
- expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -693,12 +692,15 @@
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+ private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
// anyObject(Iterable.class)))
- expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
- anyObject(NodeId.class)))
+ expect(clusterCommunicator.<T>unicast(
+ anyObject(),
+ anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -711,15 +713,14 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
- //FIXME rename
private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-// .andReturn(true)
-// .anyTimes();
- expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<AbstractEvent>multicast(
+ anyObject(AbstractEvent.class),
+ anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -733,45 +734,6 @@
implements ClusterCommunicationService {
@Override
- public boolean broadcast(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean broadcastIncludeSelf(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- return false;
- }
-
- @Override
- public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
- return false;
- }
-
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
- NodeId toNodeId)
- throws IOException {
- return null;
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
- if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
- updateHandler = subscriber;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = subscriber;
- } else {
- throw new RuntimeException("Unexpected message subject " + subject.toString());
- }
- }
-
- @Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
@@ -786,6 +748,73 @@
@Override
public void removeSubscriber(MessageSubject subject) {}
+
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject, Function<M, byte[]> encoder) {
+ }
+
+ @Override
+ public <M> boolean unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return false;
+ }
+
+ @Override
+ public <M> void multicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Set<NodeId> nodes) {
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject, Function<M, byte[]> encoder,
+ Function<byte[], R> decoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Function<M, R> handler,
+ Function<R, byte[]> encoder, ExecutorService executor) {
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Consumer<M> handler,
+ ExecutorService executor) {
+ }
+
+ @Override
+ public boolean broadcast(ClusterMessage message) {
+ return false;
+ }
+
+ @Override
+ public boolean broadcastIncludeSelf(ClusterMessage message) {
+ return false;
+ }
+
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+ return false;
+ }
+
+ @Override
+ public boolean multicast(ClusterMessage message,
+ Iterable<NodeId> nodeIds) {
+ return false;
+ }
+
+ @Override
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+ NodeId toNodeId) {
+ return null;
+ }
}
/**
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index 03e82bc..b0da337 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -28,7 +28,6 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.ConnectPoint;
@@ -48,7 +47,6 @@
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.StaticClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.device.impl.DeviceClockManager;
@@ -59,6 +57,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
@@ -119,7 +118,6 @@
private DeviceClockManager deviceClockManager;
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
- private MastershipService mastershipService;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -171,26 +169,24 @@
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
- .andReturn(true).anyTimes();
+ clusterCommunicator.<InternalLinkEvent>broadcast(
+ anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
verify(clusterCommunicator);
}
- private void resetCommunicatorExpectingNoBroadcast(
- Capture<ClusterMessage> bcast) {
- bcast.reset();
+ private <T> void resetCommunicatorExpectingSingleBroadcast(
+ Capture<T> message,
+ Capture<MessageSubject> subject,
+ Capture<Function<T, byte[]>> encoder) {
+ message.reset();
+ subject.reset();
+ encoder.reset();
reset(clusterCommunicator);
- replay(clusterCommunicator);
- }
-
- private void resetCommunicatorExpectingSingleBroadcast(
- Capture<ClusterMessage> bcast) {
-
- bcast.reset();
- reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+ clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
+ expectLastCall().once();
replay(clusterCommunicator);
}
@@ -367,56 +363,55 @@
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalLinkEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
// add link
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
LinkEvent event = linkStore.createOrUpdateLink(PID,
linkDescription);
- verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
assertEquals(LINK_ADDED, event.type());
// update link type
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
assertEquals(LINK_UPDATED, event2.type());
// no change
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyNoBroadcastMessage(bcast);
+ verifyNoBroadcastMessage(message);
assertNull("No change event expected", event3);
}
- private void verifyNoBroadcastMessage(Capture<ClusterMessage> bcast) {
- assertFalse("No broadcast expected", bcast.hasCaptured());
+ private <T> void verifyNoBroadcastMessage(Capture<T> message) {
+ assertFalse("No broadcast expected", message.hasCaptured());
}
private void verifyLinkBroadcastMessage(ProviderId providerId,
- NodeId sender,
- ConnectPoint src,
- ConnectPoint dst,
- Type type,
- Capture<ClusterMessage> actualMsg) {
+ NodeId sender,
+ ConnectPoint src,
+ ConnectPoint dst,
+ Type type,
+ Capture<InternalLinkEvent> actualLinkEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
verify(clusterCommunicator);
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
- assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE,
- actualMsg.getValue().subject());
- InternalLinkEvent linkEvent
- = GossipLinkStore.SERIALIZER.decode(actualMsg.getValue().payload());
- assertEquals(providerId, linkEvent.providerId());
- assertLinkDescriptionEquals(src, dst, type, linkEvent.linkDescription().value());
-
+ assertTrue(actualLinkEvent.hasCaptured());
+ assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
+ assertEquals(providerId, actualLinkEvent.getValue().providerId());
+ assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
}
private static void assertLinkDescriptionEquals(ConnectPoint src,
@@ -434,31 +429,33 @@
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalLinkEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
// add Ancillary link
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, INDIRECT, A1));
- verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, bcast);
+ verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
assertNotNull("Ancillary only link is ignored", event);
// add Primary link
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, INDIRECT, A2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
assertEquals(LINK_UPDATED, event2.type());
// update link type
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
@@ -466,38 +463,38 @@
// no change
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event4 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyNoBroadcastMessage(bcast);
+ verifyNoBroadcastMessage(message);
assertNull("No change event expected", event4);
// update link annotation (Primary)
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event5 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2_2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
assertEquals(LINK_UPDATED, event5.type());
// update link annotation (Ancillary)
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, DIRECT, A1_2));
- verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
assertEquals(LINK_UPDATED, event6.type());
// update link type (Ancillary) : ignored
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, EDGE));
- verifyNoBroadcastMessage(bcast);
+ verifyNoBroadcastMessage(message);
assertNull("Ancillary change other than annotation is ignored", event7);
}