Added support for replicating device removed topology events
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 3f927fd..f39413b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -120,6 +120,7 @@
                     .register(KryoPoolUtil.API)
                     .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
                     .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+                    .register(InternalDeviceRemovedEvent.class)
                     .register(InternalPortEvent.class, new InternalPortEventSerializer())
                     .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
                     .register(Timestamp.class)
@@ -138,6 +139,8 @@
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
         clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
+        clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
@@ -583,7 +586,16 @@
     public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
         Timestamp timestamp = clockService.getTimestamp(deviceId);
         DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
-        // TODO: broadcast removal event
+        if (event != null) {
+            // Only replicate removals that produced an event; broadcast failures are logged, not rethrown.
+            log.info("Notifying peers of a device removed topology event for deviceId: {}", deviceId);
+            try {
+                notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
+                        deviceId, e);
+            }
+        }
         return event;
     }
 
@@ -834,6 +846,14 @@
         clusterCommunicator.broadcast(message);
     }
 
+    private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
+        // Serialize the removal and broadcast it under the DEVICE_REMOVED subject with the local node's id.
+        final ClusterMessage removalMessage = new ClusterMessage(
+                clusterService.getLocalNode().id(), GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
+                SERIALIZER.encode(event));
+        clusterCommunicator.broadcast(removalMessage);
+    }
+
     private void notifyPeers(InternalPortEvent event) throws IOException {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
@@ -879,6 +899,20 @@
         }
     }
 
+    private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+        /**
+         * Decodes a peer's device-removed message and applies it through removeDeviceInternal.
+         */
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device removed event from peer: {}", message.sender());
+            final InternalDeviceRemovedEvent removal =
+                    (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+            // NOTE(review): the value returned by removeDeviceInternal is discarded here - confirm that is intended.
+            removeDeviceInternal(removal.deviceId(), removal.timestamp());
+        }
+    }
+
     private class InternalPortEventListener implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {