Adding some tests for GossipDeviceStore + bugfix
Change-Id: Ic0d55fa499b1d66131f059b4a47cd105c55a6e63
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 7b00833..33517c7 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -62,6 +62,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
index f7fd7bc..03c293a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
@@ -58,7 +58,7 @@
*
* @param newDesc new DeviceDescription
*/
- public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+ public void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
Timestamped<DeviceDescription> oldOne = deviceDesc;
Timestamped<DeviceDescription> newOne = newDesc;
if (oldOne != null) {
@@ -76,7 +76,7 @@
*
* @param newDesc new PortDescription
*/
- public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
+ public void putPortDesc(Timestamped<PortDescription> newDesc) {
Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
Timestamped<PortDescription> newOne = newDesc;
if (oldOne != null) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 6ab529a..12ecf74 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -1,5 +1,6 @@
package org.onlab.onos.store.device.impl;
+import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
@@ -118,7 +119,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
@@ -206,14 +207,19 @@
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
- DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ final DeviceEvent event;
+ final Timestamped<DeviceDescription> mergedDesc;
+ synchronized (getDeviceDescriptions(deviceId)) {
+ event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
+ }
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
- notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
@@ -317,8 +323,8 @@
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
- Timestamp timestamp = clockService.getTimestamp(deviceId);
- DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+ final Timestamp timestamp = clockService.getTimestamp(deviceId);
+ final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {}",
deviceId);
@@ -390,17 +396,33 @@
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
List<PortDescription> portDescriptions) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- Timestamped<List<PortDescription>> timestampedPortDescriptions =
- new Timestamped<>(portDescriptions, newTimestamp);
+ final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
+ final Timestamped<List<PortDescription>> timestampedInput
+ = new Timestamped<>(portDescriptions, newTimestamp);
+ final List<DeviceEvent> events;
+ final Timestamped<List<PortDescription>> merged;
+
+ synchronized (getDeviceDescriptions(deviceId)) {
+ events = updatePortsInternal(providerId, deviceId, timestampedInput);
+ final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
+ List<PortDescription> mergedList =
+ FluentIterable.from(portDescriptions)
+ .transform(new Function<PortDescription, PortDescription>() {
+ @Override
+ public PortDescription apply(PortDescription input) {
+ // lookup merged port description
+ return descs.getPortDesc(input.portNumber()).value();
+ }
+ }).toList();
+ merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+ }
if (!events.isEmpty()) {
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
- notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+ notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
@@ -527,16 +549,25 @@
}
@Override
- public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
- PortDescription portDescription) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
- DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+ public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
+ DeviceId deviceId,
+ PortDescription portDescription) {
+
+ final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamped<PortDescription> deltaDesc
+ = new Timestamped<>(portDescription, newTimestamp);
+ final DeviceEvent event;
+ final Timestamped<PortDescription> mergedDesc;
+ synchronized (getDeviceDescriptions(deviceId)) {
+ event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+ mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
+ .getPortDesc(portDescription.portNumber());
+ }
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
- notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+ notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
@@ -684,7 +715,7 @@
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
- ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+ Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index fa42a6b..3c83169 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -1,12 +1,16 @@
package org.onlab.onos.store.device.impl;
+import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-
+import static org.onlab.onos.cluster.ControllerNode.State.*;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+import static java.util.Arrays.asList;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -14,6 +18,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -90,14 +95,25 @@
.set("B4", "b4")
.build();
- private static final NodeId MYSELF = new NodeId("myself");
+ // local node
+ private static final NodeId NID1 = new NodeId("local");
+ private static final ControllerNode ONOS1 =
+ new DefaultControllerNode(NID1, IpPrefix.valueOf("127.0.0.1"));
+ // remote node
+ private static final NodeId NID2 = new NodeId("remote");
+ private static final ControllerNode ONOS2 =
+ new DefaultControllerNode(NID2, IpPrefix.valueOf("127.0.0.2"));
+ private static final List<SparseAnnotations> NO_ANNOTATION = Collections.<SparseAnnotations>emptyList();
+
+
+ private TestGossipDeviceStore testGossipDeviceStore;
private GossipDeviceStore gossipDeviceStore;
private DeviceStore deviceStore;
private DeviceClockManager deviceClockManager;
private ClockService clockService;
-
+ private ClusterCommunicationService clusterCommunicator;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@@ -113,15 +129,22 @@
deviceClockManager.activate();
clockService = deviceClockManager;
- deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
- deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
+ deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(NID1, 1));
+ deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(NID1, 2));
- ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+ clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
+ clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
+ anyObject(ClusterMessageHandler.class));
+ expectLastCall().anyTimes();
+ replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();
- gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
+ testGossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
+ gossipDeviceStore = testGossipDeviceStore;
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
+ verify(clusterCommunicator);
+ reset(clusterCommunicator);
}
@After
@@ -135,7 +158,16 @@
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN, annotations);
+ reset(clusterCommunicator);
+ try {
+ expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+ .andReturn(true).anyTimes();
+ } catch (IOException e) {
+ fail("Should never reach here");
+ }
+ replay(clusterCommunicator);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
+ verify(clusterCommunicator);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion,
@@ -163,9 +195,9 @@
* @param annotations
*/
private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
- DefaultAnnotations expected = DefaultAnnotations.builder().build();
+ SparseAnnotations expected = DefaultAnnotations.builder().build();
for (SparseAnnotations a : annotations) {
- expected = DefaultAnnotations.merge(expected, a);
+ expected = DefaultAnnotations.union(expected, a);
}
assertEquals(expected.keys(), actual.keys());
for (String key : expected.keys()) {
@@ -173,6 +205,36 @@
}
}
+ private static void assertDeviceDescriptionEquals(DeviceDescription expected,
+ DeviceDescription actual) {
+ if (expected == actual) {
+ return;
+ }
+ assertEquals(expected.deviceURI(), actual.deviceURI());
+ assertEquals(expected.hwVersion(), actual.hwVersion());
+ assertEquals(expected.manufacturer(), actual.manufacturer());
+ assertEquals(expected.serialNumber(), actual.serialNumber());
+ assertEquals(expected.swVersion(), actual.swVersion());
+
+ assertAnnotationsEquals(actual.annotations(), expected.annotations());
+ }
+
+ private static void assertDeviceDescriptionEquals(DeviceDescription expected,
+ List<SparseAnnotations> expectedAnnotations,
+ DeviceDescription actual) {
+ if (expected == actual) {
+ return;
+ }
+ assertEquals(expected.deviceURI(), actual.deviceURI());
+ assertEquals(expected.hwVersion(), actual.hwVersion());
+ assertEquals(expected.manufacturer(), actual.manufacturer());
+ assertEquals(expected.serialNumber(), actual.serialNumber());
+ assertEquals(expected.swVersion(), actual.swVersion());
+
+ assertAnnotationsEquals(actual.annotations(),
+ expectedAnnotations.toArray(new SparseAnnotations[0]));
+ }
+
@Test
public final void testGetDeviceCount() {
assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
@@ -215,56 +277,123 @@
assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
}
+ private void assertInternalDeviceEvent(NodeId sender,
+ DeviceId deviceId,
+ ProviderId providerId,
+ DeviceDescription expectedDesc,
+ Capture<ClusterMessage> actualMsg) {
+ assertTrue(actualMsg.hasCaptured());
+ assertEquals(sender, actualMsg.getValue().sender());
+ assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+ actualMsg.getValue().subject());
+ InternalDeviceEvent addEvent
+ = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
+ assertEquals(deviceId, addEvent.deviceId());
+ assertEquals(providerId, addEvent.providerId());
+ assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
+ }
+
+ private void assertInternalDeviceEvent(NodeId sender,
+ DeviceId deviceId,
+ ProviderId providerId,
+ DeviceDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations,
+ Capture<ClusterMessage> actualMsg) {
+ assertTrue(actualMsg.hasCaptured());
+ assertEquals(sender, actualMsg.getValue().sender());
+ assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+ actualMsg.getValue().subject());
+ InternalDeviceEvent addEvent
+ = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
+ assertEquals(deviceId, addEvent.deviceId());
+ assertEquals(providerId, addEvent.providerId());
+ assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
+ }
+
@Test
- public final void testCreateOrUpdateDevice() {
+ public final void testCreateOrUpdateDevice() throws IOException {
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN);
+ Capture<ClusterMessage> bcast = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
+ verify(clusterCommunicator);
+ assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
+
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN);
+ resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
+ verify(clusterCommunicator);
+ assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+ reset(clusterCommunicator);
+
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
}
@Test
- public final void testCreateOrUpdateDeviceAncillary() {
+ public final void testCreateOrUpdateDeviceAncillary() throws IOException {
+ // add
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, A2);
+ Capture<ClusterMessage> bcast = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(PIDA, event.subject().providerId());
assertAnnotationsEquals(event.subject().annotations(), A2);
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
+ verify(clusterCommunicator);
+ assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
+ // update from primary
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, A1);
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
assertEquals(PID, event2.subject().providerId());
assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
assertTrue(deviceStore.isAvailable(DID1));
+ verify(clusterCommunicator);
+ assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+ // no-op update from primary
+ resetCommunicatorExpectingNoBroadcast(bcast);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+ verify(clusterCommunicator);
+ assertFalse("no broadcast expected", bcast.hasCaptured());
+
// For now, Ancillary is ignored once primary appears
+ resetCommunicatorExpectingNoBroadcast(bcast);
+
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
+ verify(clusterCommunicator);
+ assertFalse("no broadcast expected", bcast.hasCaptured());
+
// But, Ancillary annotations will be in effect
DeviceDescription description3 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, A2_2);
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+
DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
assertEquals(DEVICE_UPDATED, event3.type());
// basic information will be the one from Primary
@@ -273,6 +402,11 @@
// but annotation from Ancillary will be merged
assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
assertTrue(deviceStore.isAvailable(DID1));
+ verify(clusterCommunicator);
+ // note: only annotation from PIDA is sent over the wire
+ assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
+ asList(union(A2, A2_2)), bcast);
+
}
@@ -282,14 +416,24 @@
putDevice(DID1, SW1);
assertTrue(deviceStore.isAvailable(DID1));
+ Capture<ClusterMessage> bcast = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.markOffline(DID1);
assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
assertDevice(DID1, SW1, event.subject());
assertFalse(deviceStore.isAvailable(DID1));
+ verify(clusterCommunicator);
+ // TODO: verify broadcast message
+ assertTrue(bcast.hasCaptured());
+
+ resetCommunicatorExpectingNoBroadcast(bcast);
DeviceEvent event2 = deviceStore.markOffline(DID1);
assertNull("No change, no event", event2);
-}
+ verify(clusterCommunicator);
+ assertFalse(bcast.hasCaptured());
+ }
@Test
public final void testUpdatePorts() {
@@ -298,8 +442,13 @@
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
+ Capture<ClusterMessage> bcast = new Capture<>();
+ resetCommunicatorExpectingSingleBroadcast(bcast);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
+ verify(clusterCommunicator);
+ // TODO: verify broadcast message
+ assertTrue(bcast.hasCaptured());
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
@@ -318,7 +467,12 @@
new DefaultPortDescription(P3, true)
);
+ resetCommunicatorExpectingSingleBroadcast(bcast);
events = deviceStore.updatePorts(PID, DID1, pds2);
+ verify(clusterCommunicator);
+ // TODO: verify broadcast message
+ assertTrue(bcast.hasCaptured());
+
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
@@ -341,7 +495,12 @@
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
+ resetCommunicatorExpectingSingleBroadcast(bcast);
events = deviceStore.updatePorts(PID, DID1, pds3);
+ verify(clusterCommunicator);
+ // TODO: verify broadcast message
+ assertTrue(bcast.hasCaptured());
+
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
@@ -357,7 +516,6 @@
fail("Unknown port number encountered: " + num);
}
}
-
}
@Test
@@ -368,16 +526,22 @@
);
deviceStore.updatePorts(PID, DID1, pds);
- DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
- new DefaultPortDescription(P1, false));
+ Capture<ClusterMessage> bcast = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+ final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
+ DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
-
+ verify(clusterCommunicator);
+ assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
+ assertTrue(bcast.hasCaptured());
}
+
@Test
- public final void testUpdatePortStatusAncillary() {
+ public final void testUpdatePortStatusAncillary() throws IOException {
putDeviceAncillary(DID1, SW1);
putDevice(DID1, SW1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
@@ -385,36 +549,106 @@
);
deviceStore.updatePorts(PID, DID1, pds);
- DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
- new DefaultPortDescription(P1, false, A1_2));
+ Capture<ClusterMessage> bcast = new Capture<>();
+
+
+ // update port from primary
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+ final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
+ DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
assertFalse("Port is disabled", event.port().isEnabled());
+ verify(clusterCommunicator);
+ assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
+ assertTrue(bcast.hasCaptured());
- DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
- new DefaultPortDescription(P1, true));
+ // update port from ancillary with no attributes
+ resetCommunicatorExpectingNoBroadcast(bcast);
+ final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
+ DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
assertNull("Ancillary is ignored if primary exists", event2);
+ verify(clusterCommunicator);
+ assertFalse(bcast.hasCaptured());
// but, Ancillary annotation update will be notified
- DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
- new DefaultPortDescription(P1, true, A2));
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+ final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
+ DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
assertEquals(PORT_UPDATED, event3.type());
assertDevice(DID1, SW1, event3.subject());
assertEquals(P1, event3.port().number());
assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
assertFalse("Port is disabled", event3.port().isEnabled());
+ verify(clusterCommunicator);
+ assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
+ assertTrue(bcast.hasCaptured());
// port only reported from Ancillary will be notified as down
- DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
- new DefaultPortDescription(P2, true));
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+ final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
+ DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
assertEquals(PORT_ADDED, event4.type());
assertDevice(DID1, SW1, event4.subject());
assertEquals(P2, event4.port().number());
assertAnnotationsEquals(event4.port().annotations());
assertFalse("Port is disabled if not given from primary provider",
event4.port().isEnabled());
+ verify(clusterCommunicator);
+ // verify broadcast message content
+ assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
+ assertTrue(bcast.hasCaptured());
+ }
+
+ private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
+ ProviderId pid, DefaultPortDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
+
+ assertTrue(actualMsg.hasCaptured());
+ assertEquals(sender, actualMsg.getValue().sender());
+ assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+ actualMsg.getValue().subject());
+ InternalPortStatusEvent addEvent
+ = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
+ assertEquals(did, addEvent.deviceId());
+ assertEquals(pid, addEvent.providerId());
+ assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
+ addEvent.portDescription().value());
+
+ }
+
+ private void assertPortDescriptionEquals(
+ PortDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations,
+ PortDescription actual) {
+
+ assertEquals(expectedDesc.portNumber(), actual.portNumber());
+ assertEquals(expectedDesc.isEnabled(), actual.isEnabled());
+
+ assertAnnotationsEquals(actual.annotations(),
+ expectedAnnotations.toArray(new SparseAnnotations[0]));
+ }
+
+ private void resetCommunicatorExpectingNoBroadcast(
+ Capture<ClusterMessage> bcast) {
+ bcast.reset();
+ reset(clusterCommunicator);
+ replay(clusterCommunicator);
+ }
+
+ private void resetCommunicatorExpectingSingleBroadcast(
+ Capture<ClusterMessage> bcast) {
+
+ bcast.reset();
+ reset(clusterCommunicator);
+ try {
+ expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+ } catch (IOException e) {
+ fail("Should never reach here");
+ }
+ replay(clusterCommunicator);
}
@Test
@@ -476,12 +710,19 @@
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
+ Capture<ClusterMessage> bcast = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(bcast);
+
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(1, deviceStore.getDeviceCount());
assertEquals(0, deviceStore.getPorts(DID1).size());
+ verify(clusterCommunicator);
+ // TODO: verify broadcast message
+ assertTrue(bcast.hasCaptured());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
@@ -563,34 +804,28 @@
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
- }
- private static final class TestClusterCommunicationService implements ClusterCommunicationService {
- @Override
- public boolean broadcast(ClusterMessage message) throws IOException { return true; }
- @Override
- public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
- @Override
- public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
- @Override
- public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+ public <T> T deserialize(byte[] bytes) {
+ return SERIALIZER.decode(bytes);
+ }
}
private static final class TestClusterService implements ClusterService {
- private static final ControllerNode ONOS1 =
- new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
public TestClusterService() {
- nodes.put(new NodeId("N1"), ONOS1);
- nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
+ nodes.put(NID1, ONOS1);
+ nodeStates.put(NID1, ACTIVE);
+
+ nodes.put(NID2, ONOS2);
+ nodeStates.put(NID2, ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
- return ONOS1;
+ return GossipDeviceStoreTest.ONOS1;
}
@Override
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
new file mode 100644
index 0000000..4bcc0a3
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
@@ -0,0 +1,49 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.util.KryoPool.FamilySerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+
+/**
+ * Kryo serializer for Guava {@link ImmutableList}: writes the size, then each element with its class.
+ */
+public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>> {
+
+ /**
+ * Creates {@link ImmutableList} serializer instance.
+ */
+ public ImmutableListSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableList<?> object) {
+ output.writeInt(object.size());
+ for (Object e : object) {
+ kryo.writeClassAndObject(output, e);
+ }
+ }
+
+ @Override
+ public ImmutableList<?> read(Kryo kryo, Input input,
+ Class<ImmutableList<?>> type) {
+ final int size = input.readInt();
+ Builder<Object> builder = ImmutableList.builder();
+ for (int i = 0; i < size; ++i) {
+ builder.add(kryo.readClassAndObject(input));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public void registerFamilies(Kryo kryo) {
+ kryo.register(ImmutableList.of(1).getClass(), this);
+ kryo.register(ImmutableList.of(1, 2).getClass(), this);
+ // TODO register remaining ImmutableList variants (only the of(1) and of(1, 2) classes are covered)
+ }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index efecb6c..f81a984 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -31,6 +31,9 @@
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
public final class KryoPoolUtil {
/**
@@ -47,12 +50,15 @@
*/
public static final KryoPool API = KryoPool.newBuilder()
.register(MISC)
+ .register(ImmutableMap.class, new ImmutableMapSerializer())
+ .register(ImmutableList.class, new ImmutableListSerializer())
.register(
//
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
//
+ //
ControllerNode.State.class,
Device.Type.class,
DefaultAnnotations.class,