Adding some tests for GossipDeviceStore + bugfix

Change-Id: Ic0d55fa499b1d66131f059b4a47cd105c55a6e63
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 7b00833..33517c7 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -62,6 +62,11 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+          <groupId>org.easymock</groupId>
+          <artifactId>easymock</artifactId>
+          <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
index f7fd7bc..03c293a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
@@ -58,7 +58,7 @@
      *
      * @param newDesc new DeviceDescription
      */
-    public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+    public void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
         Timestamped<DeviceDescription> oldOne = deviceDesc;
         Timestamped<DeviceDescription> newOne = newDesc;
         if (oldOne != null) {
@@ -76,7 +76,7 @@
      *
      * @param newDesc new PortDescription
      */
-    public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
+    public void putPortDesc(Timestamped<PortDescription> newDesc) {
         Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
         Timestamped<PortDescription> newOne = newDesc;
         if (oldOne != null) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 6ab529a..12ecf74 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.store.device.impl;
 
+import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
@@ -118,7 +119,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
             serializerPool = KryoPool.newBuilder()
@@ -206,14 +207,19 @@
     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
                                      DeviceId deviceId,
                                      DeviceDescription deviceDescription) {
-        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+        final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
         final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
-        DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+        final DeviceEvent event;
+        final Timestamped<DeviceDescription> mergedDesc;
+        synchronized (getDeviceDescriptions(deviceId)) {
+            event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+            mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
+        }
         if (event != null) {
             log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
                 providerId, deviceId);
             try {
-                notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+                notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
             } catch (IOException e) {
                 log.error("Failed to notify peers of a device update topology event for providerId: "
                         + providerId + " and deviceId: " + deviceId, e);
@@ -317,8 +323,8 @@
 
     @Override
     public DeviceEvent markOffline(DeviceId deviceId) {
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+        final Timestamp timestamp = clockService.getTimestamp(deviceId);
+        final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
         if (event != null) {
             log.info("Notifying peers of a device offline topology event for deviceId: {}",
                     deviceId);
@@ -390,17 +396,33 @@
     public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
                                        DeviceId deviceId,
                                        List<PortDescription> portDescriptions) {
-        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
 
-        Timestamped<List<PortDescription>> timestampedPortDescriptions =
-            new Timestamped<>(portDescriptions, newTimestamp);
+        final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
 
-        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
+        final Timestamped<List<PortDescription>> timestampedInput
+                = new Timestamped<>(portDescriptions, newTimestamp);
+        final List<DeviceEvent> events;
+        final Timestamped<List<PortDescription>> merged;
+
+        synchronized (getDeviceDescriptions(deviceId)) {
+            events = updatePortsInternal(providerId, deviceId, timestampedInput);
+            final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
+            List<PortDescription> mergedList =
+                    FluentIterable.from(portDescriptions)
+                .transform(new Function<PortDescription, PortDescription>() {
+                    @Override
+                    public PortDescription apply(PortDescription input) {
+                        // lookup merged port description
+                        return descs.getPortDesc(input.portNumber()).value();
+                    }
+                }).toList();
+            merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+        }
         if (!events.isEmpty()) {
             log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
                     providerId, deviceId);
             try {
-                notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+                notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
             } catch (IOException e) {
                 log.error("Failed to notify peers of a port update topology event or providerId: "
                     + providerId + " and deviceId: " + deviceId, e);
@@ -527,16 +549,25 @@
     }
 
     @Override
-    public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
-            PortDescription portDescription) {
-        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-        final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
-        DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+    public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
+                                                     DeviceId deviceId,
+                                                     PortDescription portDescription) {
+
+        final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+        final Timestamped<PortDescription> deltaDesc
+            = new Timestamped<>(portDescription, newTimestamp);
+        final DeviceEvent event;
+        final Timestamped<PortDescription> mergedDesc;
+        synchronized (getDeviceDescriptions(deviceId)) {
+            event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+            mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
+                            .getPortDesc(portDescription.portNumber());
+        }
         if (event != null) {
             log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
                         providerId, deviceId);
             try {
-                notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+                notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
             } catch (IOException e) {
                 log.error("Failed to notify peers of a port status update topology event or providerId: "
                         + providerId + " and deviceId: " + deviceId, e);
@@ -684,7 +715,7 @@
      * @return Device instance
      */
     private Device composeDevice(DeviceId deviceId,
-            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+            Map<ProviderId, DeviceDescriptions> providerDescs) {
 
         checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
 
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index fa42a6b..3c83169 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -1,12 +1,16 @@
 package org.onlab.onos.store.device.impl;
 
+import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
 import static org.onlab.onos.net.Device.Type.SWITCH;
 import static org.onlab.onos.net.DeviceId.deviceId;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-
+import static org.onlab.onos.cluster.ControllerNode.State.*;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+import static java.util.Arrays.asList;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -14,6 +18,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.easymock.Capture;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -90,14 +95,25 @@
             .set("B4", "b4")
             .build();
 
-    private static final NodeId MYSELF = new NodeId("myself");
+    // local node
+    private static final NodeId NID1 = new NodeId("local");
+    private static final ControllerNode ONOS1 =
+            new DefaultControllerNode(NID1, IpPrefix.valueOf("127.0.0.1"));
 
+    // remote node
+    private static final NodeId NID2 = new NodeId("remote");
+    private static final ControllerNode ONOS2 =
+            new DefaultControllerNode(NID2, IpPrefix.valueOf("127.0.0.2"));
+    private static final List<SparseAnnotations> NO_ANNOTATION = Collections.<SparseAnnotations>emptyList();
+
+
+    private TestGossipDeviceStore testGossipDeviceStore;
     private GossipDeviceStore gossipDeviceStore;
     private DeviceStore deviceStore;
 
     private DeviceClockManager deviceClockManager;
     private ClockService clockService;
-
+    private ClusterCommunicationService clusterCommunicator;
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
     }
@@ -113,15 +129,22 @@
         deviceClockManager.activate();
         clockService = deviceClockManager;
 
-        deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
-        deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
+        deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(NID1, 1));
+        deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(NID1, 2));
 
-        ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+        clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
+        clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
+                                    anyObject(ClusterMessageHandler.class));
+        expectLastCall().anyTimes();
+        replay(clusterCommunicator);
         ClusterService clusterService = new TestClusterService();
 
-        gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
+        testGossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
+        gossipDeviceStore = testGossipDeviceStore;
         gossipDeviceStore.activate();
         deviceStore = gossipDeviceStore;
+        verify(clusterCommunicator);
+        reset(clusterCommunicator);
     }
 
     @After
@@ -135,7 +158,16 @@
         DeviceDescription description =
                 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
                         HW, swVersion, SN, annotations);
+        reset(clusterCommunicator);
+        try {
+            expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
+                .andReturn(true).anyTimes();
+        } catch (IOException e) {
+            fail("Should never reach here");
+        }
+        replay(clusterCommunicator);
         deviceStore.createOrUpdateDevice(PID, deviceId, description);
+        verify(clusterCommunicator);
     }
 
     private void putDeviceAncillary(DeviceId deviceId, String swVersion,
@@ -163,9 +195,9 @@
      * @param annotations
      */
     private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
-        DefaultAnnotations expected = DefaultAnnotations.builder().build();
+        SparseAnnotations expected = DefaultAnnotations.builder().build();
         for (SparseAnnotations a : annotations) {
-            expected = DefaultAnnotations.merge(expected, a);
+            expected = DefaultAnnotations.union(expected, a);
         }
         assertEquals(expected.keys(), actual.keys());
         for (String key : expected.keys()) {
@@ -173,6 +205,36 @@
         }
     }
 
+    private static void assertDeviceDescriptionEquals(DeviceDescription expected,
+                                                DeviceDescription actual) {
+        if (expected == actual) {
+            return;
+        }
+        assertEquals(expected.deviceURI(), actual.deviceURI());
+        assertEquals(expected.hwVersion(), actual.hwVersion());
+        assertEquals(expected.manufacturer(), actual.manufacturer());
+        assertEquals(expected.serialNumber(), actual.serialNumber());
+        assertEquals(expected.swVersion(), actual.swVersion());
+
+        assertAnnotationsEquals(actual.annotations(), expected.annotations());
+    }
+
+    private static void assertDeviceDescriptionEquals(DeviceDescription expected,
+            List<SparseAnnotations> expectedAnnotations,
+            DeviceDescription actual) {
+        if (expected == actual) {
+            return;
+        }
+        assertEquals(expected.deviceURI(), actual.deviceURI());
+        assertEquals(expected.hwVersion(), actual.hwVersion());
+        assertEquals(expected.manufacturer(), actual.manufacturer());
+        assertEquals(expected.serialNumber(), actual.serialNumber());
+        assertEquals(expected.swVersion(), actual.swVersion());
+
+        assertAnnotationsEquals(actual.annotations(),
+                expectedAnnotations.toArray(new SparseAnnotations[0]));
+    }
+
     @Test
     public final void testGetDeviceCount() {
         assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
@@ -215,56 +277,123 @@
         assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
     }
 
+    private void assertInternalDeviceEvent(NodeId sender,
+                                           DeviceId deviceId,
+                                           ProviderId providerId,
+                                           DeviceDescription expectedDesc,
+                                           Capture<ClusterMessage> actualMsg) {
+        assertTrue(actualMsg.hasCaptured());
+        assertEquals(sender, actualMsg.getValue().sender());
+        assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+                     actualMsg.getValue().subject());
+        InternalDeviceEvent addEvent
+            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
+        assertEquals(deviceId, addEvent.deviceId());
+        assertEquals(providerId, addEvent.providerId());
+        assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
+    }
+
+    private void assertInternalDeviceEvent(NodeId sender,
+                                           DeviceId deviceId,
+                                           ProviderId providerId,
+                                           DeviceDescription expectedDesc,
+                                           List<SparseAnnotations> expectedAnnotations,
+                                           Capture<ClusterMessage> actualMsg) {
+        assertTrue(actualMsg.hasCaptured());
+        assertEquals(sender, actualMsg.getValue().sender());
+        assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+                actualMsg.getValue().subject());
+        InternalDeviceEvent addEvent
+            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
+        assertEquals(deviceId, addEvent.deviceId());
+        assertEquals(providerId, addEvent.providerId());
+        assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
+    }
+
     @Test
-    public final void testCreateOrUpdateDevice() {
+    public final void testCreateOrUpdateDevice() throws IOException {
         DeviceDescription description =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN);
+        Capture<ClusterMessage> bcast = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
         assertEquals(DEVICE_ADDED, event.type());
         assertDevice(DID1, SW1, event.subject());
+        verify(clusterCommunicator);
+        assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
+
 
         DeviceDescription description2 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW2, SN);
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
         assertEquals(DEVICE_UPDATED, event2.type());
         assertDevice(DID1, SW2, event2.subject());
 
+        verify(clusterCommunicator);
+        assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+        reset(clusterCommunicator);
+
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
     }
 
     @Test
-    public final void testCreateOrUpdateDeviceAncillary() {
+    public final void testCreateOrUpdateDeviceAncillary() throws IOException {
+        // add
         DeviceDescription description =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN, A2);
+        Capture<ClusterMessage> bcast = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
         assertEquals(DEVICE_ADDED, event.type());
         assertDevice(DID1, SW1, event.subject());
         assertEquals(PIDA, event.subject().providerId());
         assertAnnotationsEquals(event.subject().annotations(), A2);
         assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
+        verify(clusterCommunicator);
+        assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
 
+        // update from primary
         DeviceDescription description2 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW2, SN, A1);
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+
         DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
         assertEquals(DEVICE_UPDATED, event2.type());
         assertDevice(DID1, SW2, event2.subject());
         assertEquals(PID, event2.subject().providerId());
         assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
         assertTrue(deviceStore.isAvailable(DID1));
+        verify(clusterCommunicator);
+        assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
 
+        // no-op update from primary
+        resetCommunicatorExpectingNoBroadcast(bcast);
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
 
+        verify(clusterCommunicator);
+        assertFalse("no broadcast expected", bcast.hasCaptured());
+
         // For now, Ancillary is ignored once primary appears
+        resetCommunicatorExpectingNoBroadcast(bcast);
+
         assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
 
+        verify(clusterCommunicator);
+        assertFalse("no broadcast expected", bcast.hasCaptured());
+
         // But, Ancillary annotations will be in effect
         DeviceDescription description3 =
                 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
                         HW, SW1, SN, A2_2);
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+
         DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
         assertEquals(DEVICE_UPDATED, event3.type());
         // basic information will be the one from Primary
@@ -273,6 +402,11 @@
         // but annotation from Ancillary will be merged
         assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
         assertTrue(deviceStore.isAvailable(DID1));
+        verify(clusterCommunicator);
+        // note: only annotation from PIDA is sent over the wire
+        assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
+                                  asList(union(A2, A2_2)), bcast);
+
     }
 
 
@@ -282,14 +416,24 @@
         putDevice(DID1, SW1);
         assertTrue(deviceStore.isAvailable(DID1));
 
+        Capture<ClusterMessage> bcast = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         DeviceEvent event = deviceStore.markOffline(DID1);
         assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
         assertDevice(DID1, SW1, event.subject());
         assertFalse(deviceStore.isAvailable(DID1));
+        verify(clusterCommunicator);
+        // TODO: verify broadcast message
+        assertTrue(bcast.hasCaptured());
 
+
+        resetCommunicatorExpectingNoBroadcast(bcast);
         DeviceEvent event2 = deviceStore.markOffline(DID1);
         assertNull("No change, no event", event2);
-}
+        verify(clusterCommunicator);
+        assertFalse(bcast.hasCaptured());
+    }
 
     @Test
     public final void testUpdatePorts() {
@@ -298,8 +442,13 @@
                 new DefaultPortDescription(P1, true),
                 new DefaultPortDescription(P2, true)
                 );
+        Capture<ClusterMessage> bcast = new Capture<>();
 
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
+        verify(clusterCommunicator);
+        // TODO: verify broadcast message
+        assertTrue(bcast.hasCaptured());
 
         Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
         for (DeviceEvent event : events) {
@@ -318,7 +467,12 @@
                 new DefaultPortDescription(P3, true)
                 );
 
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         events = deviceStore.updatePorts(PID, DID1, pds2);
+        verify(clusterCommunicator);
+        // TODO: verify broadcast message
+        assertTrue(bcast.hasCaptured());
+
         assertFalse("event should be triggered", events.isEmpty());
         for (DeviceEvent event : events) {
             PortNumber num = event.port().number();
@@ -341,7 +495,12 @@
                 new DefaultPortDescription(P1, false),
                 new DefaultPortDescription(P2, true)
                 );
+        resetCommunicatorExpectingSingleBroadcast(bcast);
         events = deviceStore.updatePorts(PID, DID1, pds3);
+        verify(clusterCommunicator);
+        // TODO: verify broadcast message
+        assertTrue(bcast.hasCaptured());
+
         assertFalse("event should be triggered", events.isEmpty());
         for (DeviceEvent event : events) {
             PortNumber num = event.port().number();
@@ -357,7 +516,6 @@
                 fail("Unknown port number encountered: " + num);
             }
         }
-
     }
 
     @Test
@@ -368,16 +526,22 @@
                 );
         deviceStore.updatePorts(PID, DID1, pds);
 
-        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
-                new DefaultPortDescription(P1, false));
+        Capture<ClusterMessage> bcast = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+        final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
+        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
         assertEquals(PORT_UPDATED, event.type());
         assertDevice(DID1, SW1, event.subject());
         assertEquals(P1, event.port().number());
         assertFalse("Port is disabled", event.port().isEnabled());
-
+        verify(clusterCommunicator);
+        assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
+        assertTrue(bcast.hasCaptured());
     }
+
     @Test
-    public final void testUpdatePortStatusAncillary() {
+    public final void testUpdatePortStatusAncillary() throws IOException {
         putDeviceAncillary(DID1, SW1);
         putDevice(DID1, SW1);
         List<PortDescription> pds = Arrays.<PortDescription>asList(
@@ -385,36 +549,106 @@
                 );
         deviceStore.updatePorts(PID, DID1, pds);
 
-        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
-                new DefaultPortDescription(P1, false, A1_2));
+        Capture<ClusterMessage> bcast = new Capture<>();
+
+
+        // update port from primary
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+        final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
+        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
         assertEquals(PORT_UPDATED, event.type());
         assertDevice(DID1, SW1, event.subject());
         assertEquals(P1, event.port().number());
         assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
         assertFalse("Port is disabled", event.port().isEnabled());
+        verify(clusterCommunicator);
+        assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
+        assertTrue(bcast.hasCaptured());
 
-        DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
-                new DefaultPortDescription(P1, true));
+        // update port from ancillary with no attributes
+        resetCommunicatorExpectingNoBroadcast(bcast);
+        final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
+        DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
         assertNull("Ancillary is ignored if primary exists", event2);
+        verify(clusterCommunicator);
+        assertFalse(bcast.hasCaptured());
 
         // but, Ancillary annotation update will be notified
-        DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
-                new DefaultPortDescription(P1, true, A2));
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+        final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
+        DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
         assertEquals(PORT_UPDATED, event3.type());
         assertDevice(DID1, SW1, event3.subject());
         assertEquals(P1, event3.port().number());
         assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
         assertFalse("Port is disabled", event3.port().isEnabled());
+        verify(clusterCommunicator);
+        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
+        assertTrue(bcast.hasCaptured());
 
         // port only reported from Ancillary will be notified as down
-        DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
-                new DefaultPortDescription(P2, true));
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+        final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
+        DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
         assertEquals(PORT_ADDED, event4.type());
         assertDevice(DID1, SW1, event4.subject());
         assertEquals(P2, event4.port().number());
         assertAnnotationsEquals(event4.port().annotations());
         assertFalse("Port is disabled if not given from primary provider",
                         event4.port().isEnabled());
+        verify(clusterCommunicator);
+        // TODO: verify broadcast message content
+        assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
+        assertTrue(bcast.hasCaptured());
+    }
+
+    private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
+            ProviderId pid, DefaultPortDescription expectedDesc,
+            List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
+
+        assertTrue(actualMsg.hasCaptured());
+        assertEquals(sender, actualMsg.getValue().sender());
+        assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+                actualMsg.getValue().subject());
+        InternalPortStatusEvent addEvent
+            = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
+        assertEquals(did, addEvent.deviceId());
+        assertEquals(pid, addEvent.providerId());
+        assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
+                addEvent.portDescription().value());
+
+    }
+
+    private void assertPortDescriptionEquals(
+                                    PortDescription expectedDesc,
+                                    List<SparseAnnotations> expectedAnnotations,
+                                    PortDescription actual) {
+
+        assertEquals(expectedDesc.portNumber(), actual.portNumber());
+        assertEquals(expectedDesc.isEnabled(), actual.isEnabled());
+
+        assertAnnotationsEquals(actual.annotations(),
+                         expectedAnnotations.toArray(new SparseAnnotations[0]));
+    }
+
+    private void resetCommunicatorExpectingNoBroadcast(
+            Capture<ClusterMessage> bcast) {
+        bcast.reset();
+        reset(clusterCommunicator);
+        replay(clusterCommunicator);
+    }
+
+    private void resetCommunicatorExpectingSingleBroadcast(
+            Capture<ClusterMessage> bcast) {
+
+        bcast.reset();
+        reset(clusterCommunicator);
+        try {
+            expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+        } catch (IOException e) {
+            fail("Should never reach here");
+        }
+        replay(clusterCommunicator);
     }
 
     @Test
@@ -476,12 +710,19 @@
         assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
         assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
 
+        Capture<ClusterMessage> bcast = new Capture<>();
+
+        resetCommunicatorExpectingSingleBroadcast(bcast);
+
         DeviceEvent event = deviceStore.removeDevice(DID1);
         assertEquals(DEVICE_REMOVED, event.type());
         assertDevice(DID1, SW1, event.subject());
 
         assertEquals(1, deviceStore.getDeviceCount());
         assertEquals(0, deviceStore.getPorts(DID1).size());
+        verify(clusterCommunicator);
+        // TODO: verify broadcast message
+        assertTrue(bcast.hasCaptured());
 
         // putBack Device, Port w/o annotation
         putDevice(DID1, SW1);
@@ -563,34 +804,28 @@
             this.clusterService = clusterService;
             this.clusterCommunicator = clusterCommunicator;
         }
-    }
 
-    private static final class TestClusterCommunicationService implements ClusterCommunicationService {
-        @Override
-        public boolean broadcast(ClusterMessage message) throws IOException { return true; }
-        @Override
-        public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
-        @Override
-        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
-        @Override
-        public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+        public <T> T deserialize(byte[] bytes) {
+            return SERIALIZER.decode(bytes);
+        }
     }
 
     private static final class TestClusterService implements ClusterService {
 
-        private static final ControllerNode ONOS1 =
-            new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
         private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
         private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
 
         public TestClusterService() {
-            nodes.put(new NodeId("N1"), ONOS1);
-            nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
+            nodes.put(NID1, ONOS1);
+            nodeStates.put(NID1, ACTIVE);
+
+            nodes.put(NID2, ONOS2);
+            nodeStates.put(NID2, ACTIVE);
         }
 
         @Override
         public ControllerNode getLocalNode() {
-            return ONOS1;
+            return GossipDeviceStoreTest.ONOS1;
         }
 
         @Override
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
new file mode 100644
index 0000000..4bcc0a3
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableListSerializer.java
@@ -0,0 +1,49 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.util.KryoPool.FamilySerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+
+/**
+ * Creates {@link ImmutableList} serializer instance.
+ */
+public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>> {
+
+    /**
+     * Creates {@link ImmutableList} serializer instance.
+     */
+    public ImmutableListSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableList<?> object) {
+        output.writeInt(object.size());
+        for (Object e : object) {
+            kryo.writeClassAndObject(output, e);
+        }
+    }
+
+    @Override
+    public ImmutableList<?> read(Kryo kryo, Input input,
+            Class<ImmutableList<?>> type) {
+        final int size = input.readInt();
+        Builder<Object> builder = ImmutableList.builder();
+        for (int i = 0; i < size; ++i) {
+            builder.add(kryo.readClassAndObject(input));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void registerFamilies(Kryo kryo) {
+        kryo.register(ImmutableList.of(1).getClass(), this);
+        kryo.register(ImmutableList.of(1, 2).getClass(), this);
+        // TODO register required ImmutableList variants
+    }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index efecb6c..f81a984 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -31,6 +31,9 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.util.KryoPool;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 public final class KryoPoolUtil {
 
     /**
@@ -47,12 +50,15 @@
      */
     public static final KryoPool API = KryoPool.newBuilder()
             .register(MISC)
+            .register(ImmutableMap.class, new ImmutableMapSerializer())
+            .register(ImmutableList.class, new ImmutableListSerializer())
             .register(
                     //
                     ArrayList.class,
                     Arrays.asList().getClass(),
                     HashMap.class,
                     //
+                    //
                     ControllerNode.State.class,
                     Device.Type.class,
                     DefaultAnnotations.class,