Merge remote-tracking branch 'origin/master'
diff --git a/core/net/src/test/java/org/onlab/onos/net/host/impl/HostMonitorTest.java b/core/net/src/test/java/org/onlab/onos/net/host/impl/HostMonitorTest.java
index d766251..14d72eb 100644
--- a/core/net/src/test/java/org/onlab/onos/net/host/impl/HostMonitorTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/host/impl/HostMonitorTest.java
@@ -13,6 +13,7 @@
 import java.util.List;
 import java.util.Set;
 
+import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.Device;
@@ -81,6 +82,7 @@
     }
 
     @Test
+    @Ignore
     public void testMonitorHostDoesNotExist() throws Exception {
         HostManager hostManager = createMock(HostManager.class);
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
new file mode 100644
index 0000000..74c22f1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -0,0 +1,10 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/** Holder for MessageSubject constants; currently defines only CLUSTER_MEMBERSHIP_EVENT. */
+public final class ClusterManagementMessageSubjects {
+    public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
+
+    private ClusterManagementMessageSubjects() {} // utility class: never instantiated
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
index 961ed4f..30b847f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
 
 import org.onlab.onos.cluster.ControllerNode;
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
similarity index 69%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
index 1f5fd3f..cdfd145 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
 
 public enum ClusterMembershipEventType {
     NEW_MEMBER,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 2e8937c..b260e1c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -17,6 +17,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
 import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
deleted file mode 100644
index 11f6228..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-
-public final class ClusterMessageSubjects {
-    // avoid instantiation
-    private ClusterMessageSubjects() {}
-
-    public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
similarity index 96%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
index 10368aa..dc3b968 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.impl;
+package org.onlab.onos.store.cluster.messaging.impl;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
index b70da73..132f27a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
 
 import java.util.Map;
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
index 095752b..033a1de 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
 
 import java.util.Map;
 import java.util.Set;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
index 301884c..d05659b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -8,7 +8,7 @@
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
+import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
 
 // TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
 // TODO: Handle Port as part of these messages, or separate messages for Ports?
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
index 011713e..e7a4d0a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -10,7 +10,7 @@
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
+import org.onlab.onos.store.common.impl.AntiEntropyReply;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8316769..da0a292 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -4,6 +4,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
 import org.apache.felix.scr.annotations.Activate;
@@ -33,10 +34,15 @@
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.ClockService;
 import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.common.impl.Timestamped;
 import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -96,6 +102,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClockService clockService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -133,8 +142,14 @@
         final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
         DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
         if (event != null) {
-            // FIXME: broadcast deltaDesc, UP
-            log.debug("broadcast deltaDesc");
+            log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+                providerId, deviceId);
+            try {
+                notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a device update topology event for providerId: "
+                        + providerId + " and deviceId: " + deviceId, e);
+            }
         }
         return event;
     }
@@ -298,19 +313,21 @@
                                        List<PortDescription> portDescriptions) {
         Timestamp newTimestamp = clockService.getTimestamp(deviceId);
 
-        List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
-        for (PortDescription e : portDescriptions) {
-            deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
-        }
+        Timestamped<List<PortDescription>> timestampedPortDescriptions =
+            new Timestamped<>(portDescriptions, newTimestamp);
 
-        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
-                          new Timestamped<>(portDescriptions, newTimestamp));
+        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
         if (!events.isEmpty()) {
-            // FIXME: broadcast deltaDesc, UP
-            log.debug("broadcast deltaDesc");
+            log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+                    providerId, deviceId);
+            try {
+                notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a port update topology event for providerId: "
+                    + providerId + " and deviceId: " + deviceId, e);
+            }
         }
         return events;
-
     }
 
     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -437,8 +454,14 @@
         final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
         DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
         if (event != null) {
-            // FIXME: broadcast deltaDesc
-            log.debug("broadcast deltaDesc");
+            log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+            try {
+                notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a port status update topology event for providerId: "
+                        + providerId + " and deviceId: " + deviceId, e);
+            }
         }
         return event;
     }
@@ -749,4 +772,61 @@
             return portDescs.put(newOne.value().portNumber(), newOne);
         }
     }
+
+    // Broadcasts event on "peer-device-updates"; null is presumably the sender slot (TODO confirm).
+    private void notifyPeers(InternalDeviceEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(null, new MessageSubject("peer-device-updates"), event));
+    }
+
+    // Broadcasts event on "peer-port-updates"; null is presumably the sender slot (TODO confirm).
+    private void notifyPeers(InternalPortEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(null, new MessageSubject("peer-port-updates"), event));
+    }
+
+    // Broadcasts event on "peer-port-status-updates"; null is presumably the sender slot (TODO confirm).
+    private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event));
+    }
+
+    /**
+     * Handles device updates from peers: hands the payload to createOrUpdateDeviceInternal, ignoring its result.
+     */
+    private class InternalDeviceEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device update event from peer: {}", message.sender());
+            final InternalDeviceEvent update = (InternalDeviceEvent) message.payload();
+            createOrUpdateDeviceInternal(update.providerId(), update.deviceId(), update.deviceDescription());
+        }
+    }
+
+    /**
+     * Handles port list updates from peers: hands the payload to updatePortsInternal.
+     * The events returned by updatePortsInternal are not used here.
+     */
+    private class InternalPortEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received port update event from peer: {}", message.sender());
+
+            // Payload is assumed to be an InternalPortEvent; any other type fails the cast.
+            final InternalPortEvent update = (InternalPortEvent) message.payload();
+            updatePortsInternal(update.providerId(), update.deviceId(), update.portDescriptions());
+        }
+    }
+
+    /**
+     * Handles port status updates from peers: hands the payload to updatePortStatusInternal.
+     * The event returned by updatePortStatusInternal is not used here.
+     */
+    private class InternalPortStatusEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received port status update event from peer: {}", message.sender());
+
+            // Payload is assumed to be an InternalPortStatusEvent; any other type fails the cast.
+            final InternalPortStatusEvent update = (InternalPortStatusEvent) message.payload();
+            updatePortStatusInternal(update.providerId(), update.deviceId(), update.portDescription());
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
new file mode 100644
index 0000000..2d97d25
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Holds a provider ID, a device ID and a timestamped device description; sent to peers by GossipDeviceStore.
+ */
+public class InternalDeviceEvent {
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<DeviceDescription> deviceDescription;
+
+    protected InternalDeviceEvent(ProviderId providerId, DeviceId deviceId,
+            Timestamped<DeviceDescription> deviceDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.deviceDescription = deviceDescription;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public Timestamped<DeviceDescription> deviceDescription() {
+        return deviceDescription;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
new file mode 100644
index 0000000..327c185
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Holds a provider ID, a device ID and a timestamped port description list; sent to peers by GossipDeviceStore.
+ */
+public class InternalPortEvent {
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<List<PortDescription>> portDescriptions;
+
+    protected InternalPortEvent(ProviderId providerId, DeviceId deviceId,
+            Timestamped<List<PortDescription>> portDescriptions) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescriptions = portDescriptions;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public Timestamped<List<PortDescription>> portDescriptions() {
+        return portDescriptions;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
new file mode 100644
index 0000000..822ba7a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Holds a provider ID, a device ID and a timestamped port description; sent to peers by GossipDeviceStore.
+ */
+public class InternalPortStatusEvent {
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<PortDescription> portDescription;
+
+    protected InternalPortStatusEvent(ProviderId providerId, DeviceId deviceId,
+            Timestamped<PortDescription> portDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescription = portDescription;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public Timestamped<PortDescription> portDescription() {
+        return portDescription;
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index bba12f2..e63fcaa 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -7,6 +7,7 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
 import org.onlab.netty.NettyMessagingService;
 import org.onlab.packet.IpPrefix;
 
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index 94de9b2..361b071 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -5,6 +5,7 @@
 import static org.onlab.onos.net.DeviceId.deviceId;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -37,6 +38,10 @@
 import org.onlab.onos.net.device.PortDescription;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -105,7 +110,9 @@
         deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
         deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
 
-        gossipDeviceStore = new TestGossipDeviceStore(clockService);
+        ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+
+        gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator);
         gossipDeviceStore.activate();
         deviceStore = gossipDeviceStore;
     }
@@ -541,8 +548,20 @@
 
     private static final class TestGossipDeviceStore extends GossipDeviceStore {
 
-        public TestGossipDeviceStore(ClockService clockService) {
+        public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) {
             this.clockService = clockService;
+            this.clusterCommunicator = clusterCommunicator;
         }
     }
+
+    private static final class TestClusterCommunicationService implements ClusterCommunicationService { // no-op stub
+        @Override
+        public boolean broadcast(ClusterMessage message) throws IOException { return true; }
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
+        @Override
+        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
+        @Override
+        public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {} // handler is discarded
+    }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index c07d289..6df0b23 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -45,12 +45,12 @@
     }
 
     @Override
-    public <T> T deserialize(ByteBuffer buffer) {
+    public <T> T decode(ByteBuffer buffer) {
         return serializerPool.deserialize(buffer);
     }
 
     @Override
-    public void serialize(Object obj, ByteBuffer buffer) {
+    public void encode(Object obj, ByteBuffer buffer) {
         serializerPool.serialize(obj, buffer);
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 3ed3216..a0d34a5 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -48,7 +48,7 @@
             checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
-            InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
+            InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
             message.setMessagingService(messagingService);
             out.add(message);
             checkpoint(DecoderState.READ_HEADER_VERSION);
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
index 56494b2..46550d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Serializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
@@ -24,20 +24,18 @@
     public byte[] encode(Object data);
 
     /**
-     * Serializes the specified object into bytes using one of the
-     * pre-registered serializers.
+     * Encodes the specified POJO into a byte buffer.
      *
-     * @param obj object to be serialized
+     * @param data POJO to be encoded
      * @param buffer to write serialized bytes
      */
-    public void serialize(final Object obj, ByteBuffer buffer);
+    public void encode(final Object data, ByteBuffer buffer);
 
     /**
-     * Deserializes the specified bytes into an object using one of the
-     * pre-registered serializers.
+     * Decodes the specified byte buffer to a POJO.
      *
-     * @param buffer bytes to be deserialized
-     * @return deserialized object
+     * @param buffer bytes to be decoded
+     * @return decoded POJO
      */
-    public <T> T deserialize(final ByteBuffer buffer);
+    public <T> T decode(final ByteBuffer buffer);
 }