Added support to device store for broadcasting device/port update events to peers
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8316769..da0a292 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
@@ -33,10 +34,15 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -96,6 +102,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
@Activate
public void activate() {
log.info("Started");
@@ -133,8 +142,14 @@
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
if (event != null) {
- // FIXME: broadcast deltaDesc, UP
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return event;
}
@@ -298,19 +313,21 @@
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
- for (PortDescription e : portDescriptions) {
- deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
- }
+ Timestamped<List<PortDescription>> timestampedPortDescriptions =
+ new Timestamped<>(portDescriptions, newTimestamp);
- List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
- new Timestamped<>(portDescriptions, newTimestamp));
+ List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
if (!events.isEmpty()) {
- // FIXME: broadcast deltaDesc, UP
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a port update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return events;
-
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -437,8 +454,14 @@
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
if (event != null) {
- // FIXME: broadcast deltaDesc
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a port status update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return event;
}
@@ -749,4 +772,61 @@
return portDescs.put(newOne.value().portNumber(), newOne);
}
}
+
+ private void notifyPeers(InternalDeviceEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-device-updates"), event);
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalPortEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-updates"), event);
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event);
+ clusterCommunicator.broadcast(message);
+ }
+
+ private class InternalDeviceEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ log.info("Received device update event from peer: {}", message.sender());
+ InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+ createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
+ }
+ }
+
+ private class InternalPortEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received port update event from peer: {}", message.sender());
+ InternalPortEvent event = (InternalPortEvent) message.payload();
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
+
+ updatePortsInternal(providerId, deviceId, portDescriptions);
+ }
+ }
+
+ private class InternalPortStatusEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received port status update event from peer: {}", message.sender());
+ InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<PortDescription> portDescription = event.portDescription();
+
+ updatePortStatusInternal(providerId, deviceId, portDescription);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
new file mode 100644
index 0000000..2d97d25
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+public class InternalDeviceEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<DeviceDescription> deviceDescription;
+
+ protected InternalDeviceEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<DeviceDescription> deviceDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.deviceDescription = deviceDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<DeviceDescription> deviceDescription() {
+ return deviceDescription;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
new file mode 100644
index 0000000..327c185
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+public class InternalPortEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<List<PortDescription>> portDescriptions;
+
+ protected InternalPortEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<List<PortDescription>> portDescriptions) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescriptions = portDescriptions;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<List<PortDescription>> portDescriptions() {
+ return portDescriptions;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
new file mode 100644
index 0000000..822ba7a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+public class InternalPortStatusEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<PortDescription> portDescription;
+
+ protected InternalPortStatusEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<PortDescription> portDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescription = portDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<PortDescription> portDescription() {
+ return portDescription;
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index 94de9b2..361b071 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -5,6 +5,7 @@
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -37,6 +38,10 @@
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -105,7 +110,9 @@
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
- gossipDeviceStore = new TestGossipDeviceStore(clockService);
+ ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+
+ gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator);
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
}
@@ -541,8 +548,20 @@
private static final class TestGossipDeviceStore extends GossipDeviceStore {
- public TestGossipDeviceStore(ClockService clockService) {
+ public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) {
this.clockService = clockService;
+ this.clusterCommunicator = clusterCommunicator;
}
}
+
+ private static final class TestClusterCommunicationService implements ClusterCommunicationService {
+ @Override
+ public boolean broadcast(ClusterMessage message) throws IOException { return true; }
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
+ @Override
+ public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
+ @Override
+ public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+ }
}