Added support to device store for broadcasting device/port update events to peers
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8316769..da0a292 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -4,6 +4,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
 import org.apache.felix.scr.annotations.Activate;
@@ -33,10 +34,15 @@
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.ClockService;
 import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.common.impl.Timestamped;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -96,6 +102,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClockService clockService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -133,8 +142,14 @@
         final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
         DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
         if (event != null) {
-            // FIXME: broadcast deltaDesc, UP
-            log.debug("broadcast deltaDesc");
+            log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+                providerId, deviceId);
+            try {
+                notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a device update topology event or providerId: "
+                        + providerId + " and deviceId: " + deviceId, e);
+            }
         }
         return event;
     }
@@ -298,19 +313,21 @@
                                        List<PortDescription> portDescriptions) {
         Timestamp newTimestamp = clockService.getTimestamp(deviceId);
 
-        List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
-        for (PortDescription e : portDescriptions) {
-            deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
-        }
+        Timestamped<List<PortDescription>> timestampedPortDescriptions =
+            new Timestamped<>(portDescriptions, newTimestamp);
 
-        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
-                          new Timestamped<>(portDescriptions, newTimestamp));
+        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
         if (!events.isEmpty()) {
-            // FIXME: broadcast deltaDesc, UP
-            log.debug("broadcast deltaDesc");
+            log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+                    providerId, deviceId);
+            try {
+                notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a port update topology event or providerId: "
+                    + providerId + " and deviceId: " + deviceId, e);
+            }
         }
         return events;
-
     }
 
     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -437,8 +454,14 @@
         final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
         DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
         if (event != null) {
-            // FIXME: broadcast deltaDesc
-            log.debug("broadcast deltaDesc");
+            log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+            try {
+                notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a port status update topology event or providerId: "
+                        + providerId + " and deviceId: " + deviceId, e);
+            }
         }
         return event;
     }
@@ -749,4 +772,61 @@
             return portDescs.put(newOne.value().portNumber(), newOne);
         }
     }
+
+    private void notifyPeers(InternalDeviceEvent event) throws IOException {
+        ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-device-updates"), event);
+        clusterCommunicator.broadcast(message);
+    }
+
+    private void notifyPeers(InternalPortEvent event) throws IOException {
+        ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-updates"), event);
+        clusterCommunicator.broadcast(message);
+    }
+
+    private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+        ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event);
+        clusterCommunicator.broadcast(message);
+    }
+
+    private class InternalDeviceEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device update event from peer: {}", message.sender());
+            InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+            createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
+        }
+    }
+
+    private class InternalPortEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.info("Received port update event from peer: {}", message.sender());
+            InternalPortEvent event = (InternalPortEvent) message.payload();
+
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
+
+            updatePortsInternal(providerId, deviceId, portDescriptions);
+        }
+    }
+
+    private class InternalPortStatusEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.info("Received port status update event from peer: {}", message.sender());
+            InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
+
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            Timestamped<PortDescription> portDescription = event.portDescription();
+
+            updatePortStatusInternal(providerId, deviceId, portDescription);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
new file mode 100644
index 0000000..2d97d25
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+public class InternalDeviceEvent {
+
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<DeviceDescription> deviceDescription;
+
+    protected InternalDeviceEvent(
+            ProviderId providerId,
+            DeviceId deviceId,
+            Timestamped<DeviceDescription> deviceDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.deviceDescription = deviceDescription;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public Timestamped<DeviceDescription> deviceDescription() {
+        return deviceDescription;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
new file mode 100644
index 0000000..327c185
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+public class InternalPortEvent {
+
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<List<PortDescription>> portDescriptions;
+
+    protected InternalPortEvent(
+            ProviderId providerId,
+            DeviceId deviceId,
+            Timestamped<List<PortDescription>> portDescriptions) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescriptions = portDescriptions;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public Timestamped<List<PortDescription>> portDescriptions() {
+        return portDescriptions;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
new file mode 100644
index 0000000..822ba7a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+public class InternalPortStatusEvent {
+
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<PortDescription> portDescription;
+
+    protected InternalPortStatusEvent(
+            ProviderId providerId,
+            DeviceId deviceId,
+            Timestamped<PortDescription> portDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescription = portDescription;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public Timestamped<PortDescription> portDescription() {
+        return portDescription;
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index 94de9b2..361b071 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -5,6 +5,7 @@
 import static org.onlab.onos.net.DeviceId.deviceId;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -37,6 +38,10 @@
 import org.onlab.onos.net.device.PortDescription;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -105,7 +110,9 @@
         deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
         deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
 
-        gossipDeviceStore = new TestGossipDeviceStore(clockService);
+        ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+
+        gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator);
         gossipDeviceStore.activate();
         deviceStore = gossipDeviceStore;
     }
@@ -541,8 +548,20 @@
 
     private static final class TestGossipDeviceStore extends GossipDeviceStore {
 
-        public TestGossipDeviceStore(ClockService clockService) {
+        public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) {
             this.clockService = clockService;
+            this.clusterCommunicator = clusterCommunicator;
         }
     }
+
+    private static final class TestClusterCommunicationService implements ClusterCommunicationService {
+        @Override
+        public boolean broadcast(ClusterMessage message) throws IOException { return true; }
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
+        @Override
+        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
+        @Override
+        public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+    }
 }