Added support for replicating device offline topology events
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ac726c2..3f927fd 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -119,8 +119,10 @@
             serializerPool = KryoPool.newBuilder()
                     .register(KryoPoolUtil.API)
                     .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
+                    .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
                     .register(InternalPortEvent.class, new InternalPortEventSerializer())
                     .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+                    .register(Timestamp.class)
                     .register(Timestamped.class)
                     .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
                     .build()
@@ -134,6 +136,8 @@
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
         clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
+        clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
@@ -177,7 +181,7 @@
             try {
                 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
             } catch (IOException e) {
-                log.error("Failed to notify peers of a device update topology event or providerId: "
+                log.error("Failed to notify peers of a device update topology event for providerId: "
                         + providerId + " and deviceId: " + deviceId, e);
             }
         }
@@ -280,7 +284,18 @@
     @Override
     public DeviceEvent markOffline(DeviceId deviceId) {
         Timestamp timestamp = clockService.getTimestamp(deviceId);
-        return markOfflineInternal(deviceId, timestamp);
+        DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+        if (event != null) { // replicate only when markOfflineInternal produced an event
+            log.info("Notifying peers of a device offline topology event for deviceId: {}",
+                    deviceId);
+            try {
+                notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
+            } catch (IOException e) { // best effort: the local event is still returned below
+                log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
+                        deviceId, e); // trailing Throwable keeps the cause and stack trace in the log
+            }
+        }
+        return event;
     }
 
     private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
@@ -811,6 +826,14 @@
         clusterCommunicator.broadcast(message);
     }
 
+    private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
+        // Wrap the serialized event under the DEVICE_OFFLINE subject and broadcast it.
+        clusterCommunicator.broadcast(new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+                SERIALIZER.encode(event)));
+    }
+
     private void notifyPeers(InternalPortEvent event) throws IOException {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -830,15 +853,32 @@
     private class InternalDeviceEventListener implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
+            // Decode the peer's device update and apply it via createOrUpdateDeviceInternal.
             log.info("Received device update event from peer: {}", message.sender());
             InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+
             ProviderId providerId = event.providerId();
             DeviceId deviceId = event.deviceId();
             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+
             createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
         }
     }
 
+    private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            // Handles a DEVICE_OFFLINE message: decode the event, then replay it locally.
+            log.info("Received device offline event from peer: {}", message.sender());
+
+            InternalDeviceOfflineEvent offlineEvent =
+                    (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+
+            // Apply with the timestamp carried in the event itself.
+            markOfflineInternal(offlineEvent.deviceId(), offlineEvent.timestamp());
+        }
+    }
+
     private class InternalPortEventListener implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {