Adding multi-instance support for flow stats.

Change-Id: I428c5a7cb58f4f9773a125fc94fb368ed846cb0d
diff --git a/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java b/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java
index 0382833..540a945 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java
@@ -61,6 +61,22 @@
         this.durationNano = durationNano;
     }
 
+    // Constructor for serializer
+    private DefaultPortStatistics() {
+        this.deviceId = null;
+        this.port = 0;
+        this.packetsReceived = 0;
+        this.packetsSent = 0;
+        this.bytesReceived = 0;
+        this.bytesSent = 0;
+        this.packetsRxDropped = 0;
+        this.packetsTxDropped = 0;
+        this.packetsRxErrors = 0;
+        this.packetsTxErrors = 0;
+        this.durationSec = 0;
+        this.durationNano = 0;
+    }
+
     /**
      * Creates a builder for DefaultPortStatistics object.
      *
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 5ea27af..e705567 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -50,6 +50,7 @@
 import org.onosproject.net.OmsPort;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DefaultPortStatistics;
 import org.onosproject.net.device.DeviceClockService;
 import org.onosproject.net.device.DeviceDescription;
 import org.onosproject.net.device.DeviceEvent;
@@ -68,8 +69,16 @@
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.Timestamped;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.service.WallclockClockManager;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -102,6 +111,7 @@
 import static org.onosproject.net.device.DeviceEvent.Type.*;
 import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -123,13 +133,15 @@
     // innerMap is used to lock a Device, thus instance should never be replaced.
     // collection of Description given from various providers
     private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
-                                deviceDescs = Maps.newConcurrentMap();
+            deviceDescs = Maps.newConcurrentMap();
 
     // cache of Device and Ports generated by compositing descriptions from providers
     private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
     private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
-    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, PortStatistics>>
-            devicePortStats = Maps.newConcurrentMap();
+
+    private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
+    private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
+            portStatsListener = new InternalPortStatsListener();
 
     // to be updated under Device lock
     private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
@@ -142,6 +154,9 @@
     protected DeviceClockService deviceClockService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -182,10 +197,8 @@
     private long initialDelaySec = 5;
     private long periodSec = 5;
 
-
     @Activate
     public void activate() {
-
         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
 
         backgroundExecutor =
@@ -198,8 +211,8 @@
                 new InternalDeviceOfflineEventListener(),
                 executor);
         clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
-                new InternalRemoveRequestListener(),
-                executor);
+                                          new InternalRemoveRequestListener(),
+                                          executor);
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
         clusterCommunicator.addSubscriber(
@@ -217,8 +230,24 @@
 
         // start anti-entropy thread
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-                initialDelaySec, periodSec, TimeUnit.SECONDS);
+                                               initialDelaySec, periodSec, TimeUnit.SECONDS);
 
+        // Create a distributed map for port stats.
+        KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(DefaultPortStatistics.class)
+                .register(DeviceId.class)
+                .register(MultiValuedTimestamp.class)
+                .register(WallClockTimestamp.class);
+
+        devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
+                .withName("port-stats")
+                .withSerializer(deviceDataSerializer)
+                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+                .withClockService(new WallclockClockManager<>())
+                .withTombstonesDisabled()
+                .build();
+        devicePortStats.addListener(portStatsListener);
         log.info("Started");
     }
 
@@ -272,8 +301,8 @@
 
     @Override
     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
-                                     DeviceId deviceId,
-                                     DeviceDescription deviceDescription) {
+                                                         DeviceId deviceId,
+                                                         DeviceDescription deviceDescription) {
         NodeId localNode = clusterService.getLocalNode().id();
         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
 
@@ -294,7 +323,7 @@
 
             if (deviceEvent != null) {
                 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
-                        providerId, deviceId);
+                         providerId, deviceId);
                 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
             }
 
@@ -324,12 +353,12 @@
     }
 
     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
-                                    DeviceId deviceId,
-                                    Timestamped<DeviceDescription> deltaDesc) {
+                                                     DeviceId deviceId,
+                                                     Timestamped<DeviceDescription> deltaDesc) {
 
         // Collection of DeviceDescriptions for a Device
         Map<ProviderId, DeviceDescriptions> device
-            = getOrCreateDeviceDescriptionsMap(deviceId);
+                = getOrCreateDeviceDescriptionsMap(deviceId);
 
         synchronized (device) {
             // locking per device
@@ -345,7 +374,7 @@
             final Device newDevice;
 
             if (deltaDesc == descs.getDeviceDesc() ||
-                deltaDesc.isNewer(descs.getDeviceDesc())) {
+                    deltaDesc.isNewer(descs.getDeviceDesc())) {
                 // on new device or valid update
                 descs.putDeviceDesc(deltaDesc);
                 newDevice = composeDevice(deviceId, device);
@@ -371,8 +400,8 @@
         // update composed device cache
         Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
         verify(oldDevice == null,
-                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
-                providerId, oldDevice, newDevice);
+               "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+               providerId, oldDevice, newDevice);
 
         if (!providerId.isAncillary()) {
             markOnline(newDevice.id(), timestamp);
@@ -401,8 +430,8 @@
             boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
             if (!replaced) {
                 verify(replaced,
-                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
-                        providerId, oldDevice, devices.get(newDevice.id())
+                       "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+                       providerId, oldDevice, devices.get(newDevice.id())
                         , newDevice);
             }
             if (!providerId.isAncillary()) {
@@ -424,7 +453,7 @@
         final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
         if (event != null) {
             log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
-                    deviceId, timestamp);
+                     deviceId, timestamp);
             notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
         }
         return event;
@@ -433,7 +462,7 @@
     private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
 
         Map<ProviderId, DeviceDescriptions> providerDescs
-            = getOrCreateDeviceDescriptionsMap(deviceId);
+                = getOrCreateDeviceDescriptionsMap(deviceId);
 
         // locking device
         synchronized (providerDescs) {
@@ -465,7 +494,7 @@
      * Marks the device as available if the given timestamp is not outdated,
      * compared to the time the device has been marked offline.
      *
-     * @param deviceId identifier of the device
+     * @param deviceId  identifier of the device
      * @param timestamp of the event triggering this change.
      * @return true if availability change request was accepted and changed the state
      */
@@ -475,7 +504,7 @@
         // the latest offline request Timestamp
         Timestamp offlineTimestamp = offline.get(deviceId);
         if (offlineTimestamp == null ||
-            offlineTimestamp.compareTo(timestamp) < 0) {
+                offlineTimestamp.compareTo(timestamp) < 0) {
 
             offline.remove(deviceId);
             return availableDevices.add(deviceId);
@@ -485,8 +514,8 @@
 
     @Override
     public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
-                                       DeviceId deviceId,
-                                       List<PortDescription> portDescriptions) {
+                                                      DeviceId deviceId,
+                                                      List<PortDescription> portDescriptions) {
 
         NodeId localNode = clusterService.getLocalNode().id();
         // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
@@ -544,7 +573,7 @@
 
             if (!deviceEvents.isEmpty()) {
                 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
-                        providerId, deviceId);
+                         providerId, deviceId);
                 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
             }
 
@@ -572,8 +601,8 @@
     }
 
     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
-                                DeviceId deviceId,
-                                Timestamped<List<PortDescription>> portDescriptions) {
+                                                  DeviceId deviceId,
+                                                  Timestamped<List<PortDescription>> portDescriptions) {
 
         Device device = devices.get(deviceId);
         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -592,8 +621,8 @@
             DeviceDescriptions descs = descsMap.get(providerId);
             // every provider must provide DeviceDescription.
             checkArgument(descs != null,
-                    "Device description for Device ID %s from Provider %s was not found",
-                    deviceId, providerId);
+                          "Device description for Device ID %s from Provider %s was not found",
+                          deviceId, providerId);
 
             Map<PortNumber, Port> ports = getPortMap(deviceId);
 
@@ -611,11 +640,11 @@
 
                 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
                 if (existingPortDesc == null ||
-                    newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
+                        newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
                     // on new port or valid update
                     // update description
                     descs.putPortDesc(new Timestamped<>(portDescription,
-                                            portDescriptions.timestamp()));
+                                                        portDescriptions.timestamp()));
                     newPort = composePort(device, number, descsMap);
                 } else {
                     // outdated event, ignored.
@@ -680,7 +709,7 @@
     // exist, it creates and registers a new one.
     private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
         return createIfAbsentUnchecked(devicePorts, deviceId,
-                NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
+                                       NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
     }
 
     private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
@@ -702,7 +731,6 @@
     private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
             Map<ProviderId, DeviceDescriptions> device,
             ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
-
         synchronized (device) {
             DeviceDescriptions r = device.get(providerId);
             if (r == null) {
@@ -728,26 +756,25 @@
             return null;
         }
         final Timestamped<PortDescription> deltaDesc
-            = new Timestamped<>(portDescription, newTimestamp);
+                = new Timestamped<>(portDescription, newTimestamp);
         final DeviceEvent event;
         final Timestamped<PortDescription> mergedDesc;
         final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
         synchronized (device) {
             event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
             mergedDesc = device.get(providerId)
-                            .getPortDesc(portDescription.portNumber());
+                    .getPortDesc(portDescription.portNumber());
         }
         if (event != null) {
             log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
-                        providerId, deviceId);
+                     providerId, deviceId);
             notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
         }
         return event;
     }
 
     private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
-                Timestamped<PortDescription> deltaDesc) {
-
+                                                 Timestamped<PortDescription> deltaDesc) {
         Device device = devices.get(deviceId);
         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
 
@@ -764,8 +791,8 @@
             DeviceDescriptions descs = descsMap.get(providerId);
             // assuming all providers must to give DeviceDescription
             verify(descs != null,
-                    "Device description for Device ID %s from Provider %s was not found",
-                    deviceId, providerId);
+                   "Device description for Device ID %s from Provider %s was not found",
+                   deviceId, providerId);
 
             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
             final PortNumber number = deltaDesc.value().portNumber();
@@ -774,7 +801,7 @@
 
             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
             if (existingPortDesc == null ||
-                deltaDesc.isNewer(existingPortDesc)) {
+                    deltaDesc.isNewer(existingPortDesc)) {
                 // on new port or valid update
                 // update description
                 descs.putPortDesc(deltaDesc);
@@ -805,24 +832,21 @@
     @Override
     public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
                                             Collection<PortStatistics> portStats) {
-
-        ConcurrentMap<PortNumber, PortStatistics> statsMap = devicePortStats.get(deviceId);
+        Map<PortNumber, PortStatistics> statsMap = devicePortStats.get(deviceId);
         if (statsMap == null) {
-            statsMap = Maps.newConcurrentMap();
-            devicePortStats.put(deviceId, statsMap);
+            statsMap = Maps.newHashMap();
         }
 
-        for (PortStatistics stat: portStats) {
+        for (PortStatistics stat : portStats) {
             PortNumber portNumber = PortNumber.portNumber(stat.port());
             statsMap.put(portNumber, stat);
         }
-
-        return new DeviceEvent(PORT_STATS_UPDATED,  devices.get(deviceId), null);
+        devicePortStats.put(deviceId, statsMap);
+        return null; // new DeviceEvent(PORT_STATS_UPDATED,  devices.get(deviceId), null);
     }
 
     @Override
     public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
-
         Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
         if (portStats == null) {
             return Collections.emptyList();
@@ -865,7 +889,7 @@
 
         if (!myId.equals(master)) {
             log.debug("{} has control of {}, forwarding remove request",
-                     master, deviceId);
+                      master, deviceId);
 
             // TODO check unicast return value
             clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
@@ -874,7 +898,7 @@
              */
 
             // event will be triggered after master processes it.
-             return null;
+            return null;
         }
 
         // I have control..
@@ -883,7 +907,7 @@
         DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
         if (event != null) {
             log.debug("Notifying peers of a device removed topology event for deviceId: {}",
-                    deviceId);
+                      deviceId);
             notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
         }
         if (relinquishAtEnd) {
@@ -917,7 +941,7 @@
             markOfflineInternal(deviceId, timestamp);
             descs.clear();
             return device == null ? null :
-                new DeviceEvent(DEVICE_REMOVED, device, null);
+                    new DeviceEvent(DEVICE_REMOVED, device, null);
         }
     }
 
@@ -925,14 +949,14 @@
      * Checks if given timestamp is superseded by removal request
      * with more recent timestamp.
      *
-     * @param deviceId identifier of a device
+     * @param deviceId         identifier of a device
      * @param timestampToCheck timestamp of an event to check
      * @return true if device is already removed
      */
     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
         Timestamp removalTimestamp = removalRequest.get(deviceId);
         if (removalTimestamp != null &&
-            removalTimestamp.compareTo(timestampToCheck) >= 0) {
+                removalTimestamp.compareTo(timestampToCheck) >= 0) {
             // removalRequest is more recent
             return true;
         }
@@ -942,12 +966,12 @@
     /**
      * Returns a Device, merging description given from multiple Providers.
      *
-     * @param deviceId device identifier
+     * @param deviceId      device identifier
      * @param providerDescs Collection of Descriptions from multiple providers
      * @return Device instance
      */
     private Device composeDevice(DeviceId deviceId,
-            Map<ProviderId, DeviceDescriptions> providerDescs) {
+                                 Map<ProviderId, DeviceDescriptions> providerDescs) {
 
         checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
 
@@ -978,21 +1002,21 @@
             annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
         }
 
-        return new DefaultDevice(primary, deviceId , type, manufacturer,
-                            hwVersion, swVersion, serialNumber,
-                            chassisId, annotations);
+        return new DefaultDevice(primary, deviceId, type, manufacturer,
+                                 hwVersion, swVersion, serialNumber,
+                                 chassisId, annotations);
     }
 
     /**
      * Returns a Port, merging description given from multiple Providers.
      *
-     * @param device device the port is on
-     * @param number port number
+     * @param device   device the port is on
+     * @param number   port number
      * @param descsMap Collection of Descriptions from multiple providers
      * @return Port instance
      */
     private Port composePort(Device device, PortNumber number,
-                Map<ProviderId, DeviceDescriptions> descsMap) {
+                             Map<ProviderId, DeviceDescriptions> descsMap) {
 
         ProviderId primary = pickPrimaryPID(descsMap);
         DeviceDescriptions primDescs = descsMap.get(primary);
@@ -1028,12 +1052,12 @@
                     case OMS:
                         OmsPortDescription omsPortDesc = (OmsPortDescription) otherPortDesc.value();
                         updated = new OmsPort(device, number, isEnabled, omsPortDesc.minFrequency(),
-                                omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations);
+                                              omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations);
                         break;
                     case OCH:
                         OchPortDescription ochPortDesc = (OchPortDescription) otherPortDesc.value();
                         updated = new OchPort(device, number, isEnabled, ochPortDesc.signalType(),
-                                ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations);
+                                              ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations);
                         break;
                     case ODUCLT:
                         OduCltPortDescription oduCltPortDesc = (OduCltPortDescription) otherPortDesc.value();
@@ -1073,7 +1097,7 @@
     }
 
     private DeviceDescriptions getPrimaryDescriptions(
-                            Map<ProviderId, DeviceDescriptions> providerDescs) {
+            Map<ProviderId, DeviceDescriptions> providerDescs) {
         ProviderId pid = pickPrimaryPID(providerDescs);
         return providerDescs.get(pid);
     }
@@ -1174,14 +1198,14 @@
                     final DeviceDescriptions descs = prov.getValue();
 
                     adDevices.put(new DeviceFragmentId(deviceId, provId),
-                            descs.getDeviceDesc().timestamp());
+                                  descs.getDeviceDesc().timestamp());
 
                     for (Entry<PortNumber, Timestamped<PortDescription>>
                             portDesc : descs.getPortDescs().entrySet()) {
 
                         final PortNumber number = portDesc.getKey();
                         adPorts.put(new PortFragmentId(deviceId, provId, number),
-                                portDesc.getValue().timestamp());
+                                    portDesc.getValue().timestamp());
                     }
                 }
             }
@@ -1192,7 +1216,7 @@
 
     /**
      * Responds to anti-entropy advertisement message.
-     * <P>
+     * <p/>
      * Notify sender about out-dated information using regular replication message.
      * Send back advertisement to sender if not in sync.
      *
@@ -1269,7 +1293,7 @@
                     // find latest and update
                     final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
                     if (localLatest == null ||
-                        providerLatest.compareTo(localLatest) > 0) {
+                            providerLatest.compareTo(localLatest) > 0) {
                         localLatest = providerLatest;
                     }
                 } // end local provider loop
@@ -1277,7 +1301,7 @@
                 // checking if remote timestamp is more recent.
                 Timestamp rOffline = offlineAds.get(deviceId);
                 if (rOffline != null &&
-                    rOffline.compareTo(localLatest) > 0) {
+                        rOffline.compareTo(localLatest) > 0) {
                     // remote offline timestamp suggests that the
                     // device is off-line
                     markOfflineInternal(deviceId, rOffline);
@@ -1386,7 +1410,6 @@
             implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-
             log.debug("Received device update event from peer: {}", message.sender());
             InternalDeviceEvent event = SERIALIZER.decode(message.payload());
 
@@ -1406,7 +1429,6 @@
             implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-
             log.debug("Received device offline event from peer: {}", message.sender());
             InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
 
@@ -1440,7 +1462,6 @@
             implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-
             log.debug("Received device removed event from peer: {}", message.sender());
             InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
 
@@ -1508,8 +1529,7 @@
     }
 
     private final class InternalDeviceAdvertisementListener
-        implements ClusterMessageHandler {
-
+            implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
@@ -1526,7 +1546,6 @@
             implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-
             log.debug("Received injected device event from peer: {}", message.sender());
             DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
 
@@ -1551,7 +1570,6 @@
             implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-
             log.debug("Received injected port event from peer: {}", message.sender());
             PortInjectedEvent event = SERIALIZER.decode(message.payload());
 
@@ -1571,4 +1589,17 @@
             }
         }
     }
+
+    private class InternalPortStatsListener
+            implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
+            if (event.type() == PUT) {
+                Device device = devices.get(event.key());
+                if (device != null) {
+                    delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
+                }
+            }
+        }
+    }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index dc08e76..5d4a91f 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -55,6 +55,7 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.consistent.impl.DatabaseManager;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -157,7 +158,7 @@
 
         clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
         clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                                    anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
+                                          anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
         expectLastCall().anyTimes();
         replay(clusterCommunicator);
         ClusterService clusterService = new TestClusterService();
@@ -165,6 +166,10 @@
         testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator);
         testGossipDeviceStore.mastershipService = new TestMastershipService();
 
+        TestDatabaseManager testDatabaseManager = new TestDatabaseManager();
+        testDatabaseManager.init(clusterService, clusterCommunicator);
+        testGossipDeviceStore.storageService = testDatabaseManager;
+
         gossipDeviceStore = testGossipDeviceStore;
         gossipDeviceStore.activate();
         deviceStore = gossipDeviceStore;
@@ -885,4 +890,12 @@
             nodeStates.put(NID2, ACTIVE);
         }
     }
+
+    private class TestDatabaseManager extends DatabaseManager {
+        void init(ClusterService clusterService,
+                  ClusterCommunicationService clusterCommunicator) {
+            this.clusterService = clusterService;
+            this.clusterCommunicator = clusterCommunicator;
+        }
+    }
 }
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 9356a29..b3e7a75 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -71,9 +71,11 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DefaultPortStatistics;
 import org.onosproject.net.device.OchPortDescription;
 import org.onosproject.net.device.OduCltPortDescription;
 import org.onosproject.net.device.OmsPortDescription;
+import org.onosproject.net.device.PortStatistics;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.DefaultFlowRule;
@@ -380,7 +382,9 @@
                     IntentOperation.class,
                     FlowRuleExtPayLoad.class,
                     Frequency.class,
-                    DefaultAnnotations.class
+                    DefaultAnnotations.class,
+                    PortStatistics.class,
+                    DefaultPortStatistics.class
             )
             .register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
             .register(new URISerializer(), URI.class)
diff --git a/tools/test/bin/onos-check-apps b/tools/test/bin/onos-check-apps
index 03df7dc..9760a41 100755
--- a/tools/test/bin/onos-check-apps
+++ b/tools/test/bin/onos-check-apps
@@ -14,7 +14,7 @@
 
 # Normalize the expected apps
 apps=${ONOS_APPS:-drivers,openflow}
-(for app in ${apps/,/ }; do echo org.onosproject.$app; done) | sort > $aux.2
+(for app in ${apps//,/ }; do echo org.onosproject.$app; done) | sort > $aux.2
 
 # Check for differences
 diff $aux.1 $aux.2
diff --git a/tools/test/cells/madan3 b/tools/test/cells/madan3
new file mode 100644
index 0000000..857d096
--- /dev/null
+++ b/tools/test/cells/madan3
@@ -0,0 +1,7 @@
+# Madan's ProxMox ONOS instances 1,2,3 & ONOS mininet box
+
+export ONOS_NIC="10.128.4.*"
+export OC1="10.128.4.2"
+export OC2="10.128.4.3"
+export OC3="10.128.4.4"
+export OCN="10.128.4.5"
diff --git a/tools/test/cells/tomx b/tools/test/cells/tomx
index 48018de..b055449 100644
--- a/tools/test/cells/tomx
+++ b/tools/test/cells/tomx
@@ -1,6 +1,6 @@
-# Office ProxMox ONOS instances 1,2,3 & ONOS mininet box
+# Tom's ProxMox ONOS instances 1,2,3 & ONOS mininet box
 
-export ONOS_NIC=10.128.11.*
+export ONOS_NIC="10.128.11.*"
 export OC1="10.128.11.1"
 export OC2="10.128.11.2"
 export OC3="10.128.11.3"
diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java
index 71eb1fc..c19e8bc 100644
--- a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java
+++ b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java
@@ -850,7 +850,7 @@
             if (load != null) {
                 this.hasTraffic = hasTraffic || load.rate() > threshold;
                 this.bytes += load.latest();
-                this.rate = load.rate();
+                this.rate += load.rate();
             }
         }