Added support to device store for broadcasting device/port update events to peers
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8316769..da0a292 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
@@ -33,10 +34,15 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -96,6 +102,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
@Activate
public void activate() {
log.info("Started");
@@ -133,8 +142,14 @@
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
if (event != null) {
- // FIXME: broadcast deltaDesc, UP
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) { // broadcast is best-effort: failure is logged, the local event is still returned
+ log.error("Failed to notify peers of a device update topology event for providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return event;
}
@@ -298,19 +313,21 @@
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
- for (PortDescription e : portDescriptions) {
- deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
- }
+ Timestamped<List<PortDescription>> timestampedPortDescriptions =
+ new Timestamped<>(portDescriptions, newTimestamp);
- List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
- new Timestamped<>(portDescriptions, newTimestamp));
+ List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
if (!events.isEmpty()) {
- // FIXME: broadcast deltaDesc, UP
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+ } catch (IOException e) { // broadcast is best-effort: failure is logged, the local events are still returned
+ log.error("Failed to notify peers of a port update topology event for providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return events;
-
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -437,8 +454,14 @@
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
if (event != null) {
- // FIXME: broadcast deltaDesc
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) { // broadcast is best-effort: failure is logged, the local event is still returned
+ log.error("Failed to notify peers of a port status update topology event for providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return event;
}
@@ -749,4 +772,61 @@
return portDescs.put(newOne.value().portNumber(), newOne);
}
}
+
+    /** Wraps the event in a ClusterMessage on subject "peer-device-updates" and broadcasts it. */
+    private void notifyPeers(InternalDeviceEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(null, new MessageSubject("peer-device-updates"), event));
+    }
+
+    /** Wraps the event in a ClusterMessage on subject "peer-port-updates" and broadcasts it. */
+    private void notifyPeers(InternalPortEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(null, new MessageSubject("peer-port-updates"), event));
+    }
+
+    /** Wraps the event in a ClusterMessage on subject "peer-port-status-updates" and broadcasts it. */
+    private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event));
+    }
+
+    /** Cluster message handler that feeds a received InternalDeviceEvent into createOrUpdateDeviceInternal. */
+    private class InternalDeviceEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device update event from peer: {}", message.sender());
+            InternalDeviceEvent update = (InternalDeviceEvent) message.payload();
+            // NOTE(review): the DeviceEvent returned by the internal update is discarded here - confirm intended.
+            createOrUpdateDeviceInternal(
+                    update.providerId(), update.deviceId(), update.deviceDescription());
+        }
+    }
+
+    /** Cluster message handler that feeds a received InternalPortEvent into updatePortsInternal. */
+    private class InternalPortEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received port update event from peer: {}", message.sender());
+
+            // The payload is cast unconditionally; any other payload type fails the cast.
+            InternalPortEvent update = (InternalPortEvent) message.payload();
+
+            // NOTE(review): the returned list of DeviceEvents is discarded here - confirm intended.
+            updatePortsInternal(
+                    update.providerId(), update.deviceId(), update.portDescriptions());
+        }
+    }
+
+    /** Cluster message handler that feeds a received InternalPortStatusEvent into updatePortStatusInternal. */
+    private class InternalPortStatusEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received port status update event from peer: {}", message.sender());
+
+            // The payload is cast unconditionally; any other payload type fails the cast.
+            InternalPortStatusEvent update = (InternalPortStatusEvent) message.payload();
+
+            // NOTE(review): the DeviceEvent returned by the internal update is discarded here - confirm intended.
+            updatePortStatusInternal(
+                    update.providerId(), update.deviceId(), update.portDescription());
+        }
+    }
}