Adding multi-instance support for flow stats.
Change-Id: I428c5a7cb58f4f9773a125fc94fb368ed846cb0d
diff --git a/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java b/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java
index 0382833..540a945 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java
@@ -61,6 +61,22 @@
this.durationNano = durationNano;
}
+ // Constructor for serializer
+ private DefaultPortStatistics() {
+ this.deviceId = null;
+ this.port = 0;
+ this.packetsReceived = 0;
+ this.packetsSent = 0;
+ this.bytesReceived = 0;
+ this.bytesSent = 0;
+ this.packetsRxDropped = 0;
+ this.packetsTxDropped = 0;
+ this.packetsRxErrors = 0;
+ this.packetsTxErrors = 0;
+ this.durationSec = 0;
+ this.durationNano = 0;
+ }
+
/**
* Creates a builder for DefaultPortStatistics object.
*
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 5ea27af..e705567 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -50,6 +50,7 @@
import org.onosproject.net.OmsPort;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent;
@@ -68,8 +69,16 @@
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.service.WallclockClockManager;
import org.slf4j.Logger;
import java.io.IOException;
@@ -102,6 +111,7 @@
import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -123,13 +133,15 @@
// innerMap is used to lock a Device, thus instance should never be replaced.
// collection of Description given from various providers
private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
- deviceDescs = Maps.newConcurrentMap();
+ deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers
private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
- private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, PortStatistics>>
- devicePortStats = Maps.newConcurrentMap();
+
+ private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
+ private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
+ portStatsListener = new InternalPortStatsListener();
// to be updated under Device lock
private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
@@ -142,6 +154,9 @@
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -182,10 +197,8 @@
private long initialDelaySec = 5;
private long periodSec = 5;
-
@Activate
public void activate() {
-
executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
backgroundExecutor =
@@ -198,8 +211,8 @@
new InternalDeviceOfflineEventListener(),
executor);
clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
- new InternalRemoveRequestListener(),
- executor);
+ new InternalRemoveRequestListener(),
+ executor);
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
clusterCommunicator.addSubscriber(
@@ -217,8 +230,24 @@
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- initialDelaySec, periodSec, TimeUnit.SECONDS);
+ initialDelaySec, periodSec, TimeUnit.SECONDS);
+ // Create a distributed map for port stats.
+ KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(DefaultPortStatistics.class)
+ .register(DeviceId.class)
+ .register(MultiValuedTimestamp.class)
+ .register(WallClockTimestamp.class);
+
+ devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
+ .withName("port-stats")
+ .withSerializer(deviceDataSerializer)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withClockService(new WallclockClockManager<>())
+ .withTombstonesDisabled()
+ .build();
+ devicePortStats.addListener(portStatsListener);
log.info("Started");
}
@@ -272,8 +301,8 @@
@Override
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
- DeviceId deviceId,
- DeviceDescription deviceDescription) {
+ DeviceId deviceId,
+ DeviceDescription deviceDescription) {
NodeId localNode = clusterService.getLocalNode().id();
NodeId deviceNode = mastershipService.getMasterFor(deviceId);
@@ -294,7 +323,7 @@
if (deviceEvent != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
+ providerId, deviceId);
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
}
@@ -324,12 +353,12 @@
}
private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
- DeviceId deviceId,
- Timestamped<DeviceDescription> deltaDesc) {
+ DeviceId deviceId,
+ Timestamped<DeviceDescription> deltaDesc) {
// Collection of DeviceDescriptions for a Device
Map<ProviderId, DeviceDescriptions> device
- = getOrCreateDeviceDescriptionsMap(deviceId);
+ = getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (device) {
// locking per device
@@ -345,7 +374,7 @@
final Device newDevice;
if (deltaDesc == descs.getDeviceDesc() ||
- deltaDesc.isNewer(descs.getDeviceDesc())) {
+ deltaDesc.isNewer(descs.getDeviceDesc())) {
// on new device or valid update
descs.putDeviceDesc(deltaDesc);
newDevice = composeDevice(deviceId, device);
@@ -371,8 +400,8 @@
// update composed device cache
Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
verify(oldDevice == null,
- "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
- providerId, oldDevice, newDevice);
+ "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+ providerId, oldDevice, newDevice);
if (!providerId.isAncillary()) {
markOnline(newDevice.id(), timestamp);
@@ -401,8 +430,8 @@
boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
if (!replaced) {
verify(replaced,
- "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
- providerId, oldDevice, devices.get(newDevice.id())
+ "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+ providerId, oldDevice, devices.get(newDevice.id())
, newDevice);
}
if (!providerId.isAncillary()) {
@@ -424,7 +453,7 @@
final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
- deviceId, timestamp);
+ deviceId, timestamp);
notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
}
return event;
@@ -433,7 +462,7 @@
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
Map<ProviderId, DeviceDescriptions> providerDescs
- = getOrCreateDeviceDescriptionsMap(deviceId);
+ = getOrCreateDeviceDescriptionsMap(deviceId);
// locking device
synchronized (providerDescs) {
@@ -465,7 +494,7 @@
* Marks the device as available if the given timestamp is not outdated,
* compared to the time the device has been marked offline.
*
- * @param deviceId identifier of the device
+ * @param deviceId identifier of the device
* @param timestamp of the event triggering this change.
* @return true if availability change request was accepted and changed the state
*/
@@ -475,7 +504,7 @@
// the latest offline request Timestamp
Timestamp offlineTimestamp = offline.get(deviceId);
if (offlineTimestamp == null ||
- offlineTimestamp.compareTo(timestamp) < 0) {
+ offlineTimestamp.compareTo(timestamp) < 0) {
offline.remove(deviceId);
return availableDevices.add(deviceId);
@@ -485,8 +514,8 @@
@Override
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
- DeviceId deviceId,
- List<PortDescription> portDescriptions) {
+ DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
NodeId localNode = clusterService.getLocalNode().id();
// TODO: It might be negligible, but this will have negative impact to topology discovery performance,
@@ -544,7 +573,7 @@
if (!deviceEvents.isEmpty()) {
log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
+ providerId, deviceId);
notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
}
@@ -572,8 +601,8 @@
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
- DeviceId deviceId,
- Timestamped<List<PortDescription>> portDescriptions) {
+ DeviceId deviceId,
+ Timestamped<List<PortDescription>> portDescriptions) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -592,8 +621,8 @@
DeviceDescriptions descs = descsMap.get(providerId);
// every provider must provide DeviceDescription.
checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
Map<PortNumber, Port> ports = getPortMap(deviceId);
@@ -611,11 +640,11 @@
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
- newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
+ newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
// on new port or valid update
// update description
descs.putPortDesc(new Timestamped<>(portDescription,
- portDescriptions.timestamp()));
+ portDescriptions.timestamp()));
newPort = composePort(device, number, descsMap);
} else {
// outdated event, ignored.
@@ -680,7 +709,7 @@
// exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId,
- NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
+ NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
}
private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
@@ -702,7 +731,6 @@
private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
Map<ProviderId, DeviceDescriptions> device,
ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
-
synchronized (device) {
DeviceDescriptions r = device.get(providerId);
if (r == null) {
@@ -728,26 +756,25 @@
return null;
}
final Timestamped<PortDescription> deltaDesc
- = new Timestamped<>(portDescription, newTimestamp);
+ = new Timestamped<>(portDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<PortDescription> mergedDesc;
final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (device) {
event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
mergedDesc = device.get(providerId)
- .getPortDesc(portDescription.portNumber());
+ .getPortDesc(portDescription.portNumber());
}
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
+ providerId, deviceId);
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
}
return event;
}
private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
- Timestamped<PortDescription> deltaDesc) {
-
+ Timestamped<PortDescription> deltaDesc) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -764,8 +791,8 @@
DeviceDescriptions descs = descsMap.get(providerId);
// assuming all providers must to give DeviceDescription
verify(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = deltaDesc.value().portNumber();
@@ -774,7 +801,7 @@
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
- deltaDesc.isNewer(existingPortDesc)) {
+ deltaDesc.isNewer(existingPortDesc)) {
// on new port or valid update
// update description
descs.putPortDesc(deltaDesc);
@@ -805,24 +832,21 @@
@Override
public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
Collection<PortStatistics> portStats) {
-
- ConcurrentMap<PortNumber, PortStatistics> statsMap = devicePortStats.get(deviceId);
+ Map<PortNumber, PortStatistics> statsMap = devicePortStats.get(deviceId);
if (statsMap == null) {
- statsMap = Maps.newConcurrentMap();
- devicePortStats.put(deviceId, statsMap);
+ statsMap = Maps.newHashMap();
}
- for (PortStatistics stat: portStats) {
+ for (PortStatistics stat : portStats) {
PortNumber portNumber = PortNumber.portNumber(stat.port());
statsMap.put(portNumber, stat);
}
-
- return new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null);
+ devicePortStats.put(deviceId, statsMap);
+ return null; // new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null);
}
@Override
public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
-
Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
if (portStats == null) {
return Collections.emptyList();
@@ -865,7 +889,7 @@
if (!myId.equals(master)) {
log.debug("{} has control of {}, forwarding remove request",
- master, deviceId);
+ master, deviceId);
// TODO check unicast return value
clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
@@ -874,7 +898,7 @@
*/
// event will be triggered after master processes it.
- return null;
+ return null;
}
// I have control..
@@ -883,7 +907,7 @@
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
if (event != null) {
log.debug("Notifying peers of a device removed topology event for deviceId: {}",
- deviceId);
+ deviceId);
notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
}
if (relinquishAtEnd) {
@@ -917,7 +941,7 @@
markOfflineInternal(deviceId, timestamp);
descs.clear();
return device == null ? null :
- new DeviceEvent(DEVICE_REMOVED, device, null);
+ new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
@@ -925,14 +949,14 @@
* Checks if given timestamp is superseded by removal request
* with more recent timestamp.
*
- * @param deviceId identifier of a device
+ * @param deviceId identifier of a device
* @param timestampToCheck timestamp of an event to check
* @return true if device is already removed
*/
private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
Timestamp removalTimestamp = removalRequest.get(deviceId);
if (removalTimestamp != null &&
- removalTimestamp.compareTo(timestampToCheck) >= 0) {
+ removalTimestamp.compareTo(timestampToCheck) >= 0) {
// removalRequest is more recent
return true;
}
@@ -942,12 +966,12 @@
/**
* Returns a Device, merging description given from multiple Providers.
*
- * @param deviceId device identifier
+ * @param deviceId device identifier
* @param providerDescs Collection of Descriptions from multiple providers
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
- Map<ProviderId, DeviceDescriptions> providerDescs) {
+ Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
@@ -978,21 +1002,21 @@
annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
}
- return new DefaultDevice(primary, deviceId , type, manufacturer,
- hwVersion, swVersion, serialNumber,
- chassisId, annotations);
+ return new DefaultDevice(primary, deviceId, type, manufacturer,
+ hwVersion, swVersion, serialNumber,
+ chassisId, annotations);
}
/**
* Returns a Port, merging description given from multiple Providers.
*
- * @param device device the port is on
- * @param number port number
+ * @param device device the port is on
+ * @param number port number
* @param descsMap Collection of Descriptions from multiple providers
* @return Port instance
*/
private Port composePort(Device device, PortNumber number,
- Map<ProviderId, DeviceDescriptions> descsMap) {
+ Map<ProviderId, DeviceDescriptions> descsMap) {
ProviderId primary = pickPrimaryPID(descsMap);
DeviceDescriptions primDescs = descsMap.get(primary);
@@ -1028,12 +1052,12 @@
case OMS:
OmsPortDescription omsPortDesc = (OmsPortDescription) otherPortDesc.value();
updated = new OmsPort(device, number, isEnabled, omsPortDesc.minFrequency(),
- omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations);
+ omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations);
break;
case OCH:
OchPortDescription ochPortDesc = (OchPortDescription) otherPortDesc.value();
updated = new OchPort(device, number, isEnabled, ochPortDesc.signalType(),
- ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations);
+ ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations);
break;
case ODUCLT:
OduCltPortDescription oduCltPortDesc = (OduCltPortDescription) otherPortDesc.value();
@@ -1073,7 +1097,7 @@
}
private DeviceDescriptions getPrimaryDescriptions(
- Map<ProviderId, DeviceDescriptions> providerDescs) {
+ Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId pid = pickPrimaryPID(providerDescs);
return providerDescs.get(pid);
}
@@ -1174,14 +1198,14 @@
final DeviceDescriptions descs = prov.getValue();
adDevices.put(new DeviceFragmentId(deviceId, provId),
- descs.getDeviceDesc().timestamp());
+ descs.getDeviceDesc().timestamp());
for (Entry<PortNumber, Timestamped<PortDescription>>
portDesc : descs.getPortDescs().entrySet()) {
final PortNumber number = portDesc.getKey();
adPorts.put(new PortFragmentId(deviceId, provId, number),
- portDesc.getValue().timestamp());
+ portDesc.getValue().timestamp());
}
}
}
@@ -1192,7 +1216,7 @@
/**
* Responds to anti-entropy advertisement message.
- * <P>
+ * <p/>
* Notify sender about out-dated information using regular replication message.
* Send back advertisement to sender if not in sync.
*
@@ -1269,7 +1293,7 @@
// find latest and update
final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
if (localLatest == null ||
- providerLatest.compareTo(localLatest) > 0) {
+ providerLatest.compareTo(localLatest) > 0) {
localLatest = providerLatest;
}
} // end local provider loop
@@ -1277,7 +1301,7 @@
// checking if remote timestamp is more recent.
Timestamp rOffline = offlineAds.get(deviceId);
if (rOffline != null &&
- rOffline.compareTo(localLatest) > 0) {
+ rOffline.compareTo(localLatest) > 0) {
// remote offline timestamp suggests that the
// device is off-line
markOfflineInternal(deviceId, rOffline);
@@ -1386,7 +1410,6 @@
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
-
log.debug("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = SERIALIZER.decode(message.payload());
@@ -1406,7 +1429,6 @@
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
-
log.debug("Received device offline event from peer: {}", message.sender());
InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
@@ -1440,7 +1462,6 @@
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
-
log.debug("Received device removed event from peer: {}", message.sender());
InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
@@ -1508,8 +1529,7 @@
}
private final class InternalDeviceAdvertisementListener
- implements ClusterMessageHandler {
-
+ implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
@@ -1526,7 +1546,6 @@
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
-
log.debug("Received injected device event from peer: {}", message.sender());
DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
@@ -1551,7 +1570,6 @@
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
-
log.debug("Received injected port event from peer: {}", message.sender());
PortInjectedEvent event = SERIALIZER.decode(message.payload());
@@ -1571,4 +1589,17 @@
}
}
}
+
+ private class InternalPortStatsListener
+ implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
+ if (event.type() == PUT) {
+ Device device = devices.get(event.key());
+ if (device != null) {
+ delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
+ }
+ }
+ }
+ }
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index dc08e76..5d4a91f 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -55,6 +55,7 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.consistent.impl.DatabaseManager;
import java.io.IOException;
import java.util.Arrays;
@@ -157,7 +158,7 @@
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
+ anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();
@@ -165,6 +166,10 @@
testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator);
testGossipDeviceStore.mastershipService = new TestMastershipService();
+ TestDatabaseManager testDatabaseManager = new TestDatabaseManager();
+ testDatabaseManager.init(clusterService, clusterCommunicator);
+ testGossipDeviceStore.storageService = testDatabaseManager;
+
gossipDeviceStore = testGossipDeviceStore;
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
@@ -885,4 +890,12 @@
nodeStates.put(NID2, ACTIVE);
}
}
+
+ private class TestDatabaseManager extends DatabaseManager {
+ void init(ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+ }
+ }
}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 9356a29..b3e7a75 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -71,9 +71,11 @@
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.OchPortDescription;
import org.onosproject.net.device.OduCltPortDescription;
import org.onosproject.net.device.OmsPortDescription;
+import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule;
@@ -380,7 +382,9 @@
IntentOperation.class,
FlowRuleExtPayLoad.class,
Frequency.class,
- DefaultAnnotations.class
+ DefaultAnnotations.class,
+ PortStatistics.class,
+ DefaultPortStatistics.class
)
.register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
.register(new URISerializer(), URI.class)
diff --git a/tools/test/bin/onos-check-apps b/tools/test/bin/onos-check-apps
index 03df7dc..9760a41 100755
--- a/tools/test/bin/onos-check-apps
+++ b/tools/test/bin/onos-check-apps
@@ -14,7 +14,7 @@
# Normalize the expected apps
apps=${ONOS_APPS:-drivers,openflow}
-(for app in ${apps/,/ }; do echo org.onosproject.$app; done) | sort > $aux.2
+(for app in ${apps//,/ }; do echo org.onosproject.$app; done) | sort > $aux.2
# Check for differences
diff $aux.1 $aux.2
diff --git a/tools/test/cells/madan3 b/tools/test/cells/madan3
new file mode 100644
index 0000000..857d096
--- /dev/null
+++ b/tools/test/cells/madan3
@@ -0,0 +1,7 @@
+# Madan's ProxMox ONOS instances 1,2,3 & ONOS mininet box
+
+export ONOS_NIC="10.128.4.*"
+export OC1="10.128.4.2"
+export OC2="10.128.4.3"
+export OC3="10.128.4.4"
+export OCN="10.128.4.5"
diff --git a/tools/test/cells/tomx b/tools/test/cells/tomx
index 48018de..b055449 100644
--- a/tools/test/cells/tomx
+++ b/tools/test/cells/tomx
@@ -1,6 +1,6 @@
-# Office ProxMox ONOS instances 1,2,3 & ONOS mininet box
+# Tom's ProxMox ONOS instances 1,2,3 & ONOS mininet box
-export ONOS_NIC=10.128.11.*
+export ONOS_NIC="10.128.11.*"
export OC1="10.128.11.1"
export OC2="10.128.11.2"
export OC3="10.128.11.3"
diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java
index 71eb1fc..c19e8bc 100644
--- a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java
+++ b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java
@@ -850,7 +850,7 @@
if (load != null) {
this.hasTraffic = hasTraffic || load.rate() > threshold;
this.bytes += load.latest();
- this.rate = load.rate();
+ this.rate += load.rate();
}
}