Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ac726c2..3f927fd 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -119,8 +119,10 @@
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
+ .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+ .register(Timestamp.class)
.register(Timestamped.class)
.register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
.build()
@@ -134,6 +136,8 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
+ clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
@@ -177,7 +181,7 @@
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
- log.error("Failed to notify peers of a device update topology event or providerId: "
+ log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
@@ -280,7 +284,18 @@
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
- return markOfflineInternal(deviceId, timestamp);
+ DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+ if (event != null) { // broadcast only when markOfflineInternal produced an event
+ log.info("Notifying peers of a device offline topology event for deviceId: {}",
+ deviceId);
+ try {
+ notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
+ deviceId, e); // trailing Throwable: logged with its stack trace, as in the device update path
+ }
+ }
+ return event;
}
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
@@ -811,6 +826,14 @@
clusterCommunicator.broadcast(message);
}
+ private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
+ // Broadcast the encoded offline event, tagged with the local node id, under DEVICE_OFFLINE.
+ clusterCommunicator.broadcast(new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+ SERIALIZER.encode(event)));
+ }
+
private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -830,15 +853,32 @@
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
+ // The payload is decoded back into the InternalDeviceEvent sent by the peer.
log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+ // Apply the decoded update through createOrUpdateDeviceInternal.
createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
}
}
+ /** Handles device offline messages from peers by calling markOfflineInternal. */
+ private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ log.info("Received device offline event from peer: {}", message.sender());
+
+ // The payload carries the InternalDeviceOfflineEvent encoded by the sending peer.
+ InternalDeviceOfflineEvent offlineEvent =
+ (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+
+ markOfflineInternal(offlineEvent.deviceId(), offlineEvent.timestamp());
+ }
+ }
+
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 58fed70..4b59e3a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -3,13 +3,14 @@
import org.onlab.onos.store.cluster.messaging.MessageSubject;
/**
- * MessageSubjects used by GossipDeviceStore.
+ * MessageSubjects used by GossipDeviceStore for peer-to-peer communication.
*/
public final class GossipDeviceStoreMessageSubjects {
private GossipDeviceStoreMessageSubjects() {}
public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
+ public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
new file mode 100644
index 0000000..d8942d6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * going offline.
+ */
+public class InternalDeviceOfflineEvent {
+
+ private final DeviceId deviceId;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates an InternalDeviceOfflineEvent.
+ * @param deviceId identifier of device going offline.
+ * @param timestamp timestamp of when the device went offline.
+ */
+ public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) {
+ this.deviceId = deviceId;
+ this.timestamp = timestamp;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serializer (InternalDeviceOfflineEventSerializer itself calls the two-argument constructor)
+ @SuppressWarnings("unused")
+ private InternalDeviceOfflineEvent() {
+ deviceId = null;
+ timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
new file mode 100644
index 0000000..7059636
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo serializer for {@link InternalDeviceOfflineEvent}: device id first, then timestamp.
+ */
+public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> {
+
+ /**
+ * Constructs the serializer, passing {@code false} to the superclass constructor.
+ */
+ public InternalDeviceOfflineEventSerializer() {
+ // null is not an accepted value
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) {
+ // Field order matters: read() consumes the values in this same sequence.
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.timestamp());
+ }
+
+ @Override
+ public InternalDeviceOfflineEvent read(Kryo kryo, Input input, Class<InternalDeviceOfflineEvent> type) {
+ // Mirror of write(): device id, then timestamp.
+ final DeviceId id = (DeviceId) kryo.readClassAndObject(input);
+ final Timestamp offlineAt = (Timestamp) kryo.readClassAndObject(input);
+ return new InternalDeviceOfflineEvent(id, offlineAt);
+ }
+}