Adding some tests for GossipDeviceStore + bugfix
Change-Id: Ic0d55fa499b1d66131f059b4a47cd105c55a6e63
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 6ab529a..12ecf74 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -1,5 +1,6 @@
package org.onlab.onos.store.device.impl;
+import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
@@ -118,7 +119,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
@@ -206,14 +207,19 @@
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
- DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ final DeviceEvent event;
+ final Timestamped<DeviceDescription> mergedDesc;
+ synchronized (getDeviceDescriptions(deviceId)) {
+ event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
+ }
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
- notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
@@ -317,8 +323,8 @@
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
- Timestamp timestamp = clockService.getTimestamp(deviceId);
- DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+ final Timestamp timestamp = clockService.getTimestamp(deviceId);
+ final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {}",
deviceId);
@@ -390,17 +396,33 @@
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
List<PortDescription> portDescriptions) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- Timestamped<List<PortDescription>> timestampedPortDescriptions =
- new Timestamped<>(portDescriptions, newTimestamp);
+ final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
+ final Timestamped<List<PortDescription>> timestampedInput
+ = new Timestamped<>(portDescriptions, newTimestamp);
+ final List<DeviceEvent> events;
+ final Timestamped<List<PortDescription>> merged;
+
+ synchronized (getDeviceDescriptions(deviceId)) {
+ events = updatePortsInternal(providerId, deviceId, timestampedInput);
+ final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
+ List<PortDescription> mergedList =
+ FluentIterable.from(portDescriptions)
+ .transform(new Function<PortDescription, PortDescription>() {
+ @Override
+ public PortDescription apply(PortDescription input) {
+ // lookup merged port description
+ return descs.getPortDesc(input.portNumber()).value();
+ }
+ }).toList();
+ merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+ }
if (!events.isEmpty()) {
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
- notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+ notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
@@ -527,16 +549,25 @@
}
@Override
- public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
- PortDescription portDescription) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
- DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+ public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
+ DeviceId deviceId,
+ PortDescription portDescription) {
+
+ final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamped<PortDescription> deltaDesc
+ = new Timestamped<>(portDescription, newTimestamp);
+ final DeviceEvent event;
+ final Timestamped<PortDescription> mergedDesc;
+ synchronized (getDeviceDescriptions(deviceId)) {
+ event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+ mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
+ .getPortDesc(portDescription.portNumber());
+ }
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
- notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+ notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
@@ -684,7 +715,7 @@
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
- ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+ Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");