Added support for replicating device removed topology events
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 3f927fd..f39413b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -120,6 +120,7 @@
.register(KryoPoolUtil.API)
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+ .register(InternalDeviceRemovedEvent.class)
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
.register(Timestamp.class)
@@ -138,6 +139,8 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
+ clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
@@ -583,7 +586,16 @@
public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
- // TODO: broadcast removal event
+ if (event != null) {
+ log.info("Notifying peers of a device removed topology event for deviceId: {}",
+ deviceId);
+ try {
+ notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
+ deviceId);
+ }
+ }
return event;
}
@@ -834,6 +846,14 @@
clusterCommunicator.broadcast(message);
}
+ private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -879,6 +899,20 @@
}
}
+ private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received device removed event from peer: {}", message.sender());
+ InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
+
+ removeDeviceInternal(deviceId, timestamp);
+ }
+ }
+
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 4b59e3a..5272182 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -11,6 +11,7 @@
public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
+ public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
new file mode 100644
index 0000000..6c8b905
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * being administratively removed.
+ */
+public class InternalDeviceRemovedEvent {
+
+ private final DeviceId deviceId;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates a InternalDeviceRemovedEvent.
+ * @param deviceId identifier of the removed device.
+ * @param timestamp timestamp of when the device was administratively removed.
+ */
+ public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) {
+ this.deviceId = deviceId;
+ this.timestamp = timestamp;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private InternalDeviceRemovedEvent() {
+ deviceId = null;
+ timestamp = null;
+ }
+}