DeviceStore update
- Add offline/remove handling to GossipDeviceStore
- Backport per-device lock scope to SimpleDeviceStore
Change-Id: I5b4c8e12738ef78920341fb8699c4b07bde8712a
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
index 0880ac9..514a22e 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
@@ -2,6 +2,8 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -32,7 +34,6 @@
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -48,6 +49,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
+import static com.google.common.base.Verify.verify;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
@@ -71,14 +73,14 @@
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
- deviceDescs = new ConcurrentHashMap<>();
+ deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers
- private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// available(=UP) devices
- private final Set<DeviceId> availableDevices = new HashSet<>();
+ private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Activate
@@ -88,6 +90,10 @@
@Deactivate
public void deactivate() {
+ deviceDescs.clear();
+ devices.clear();
+ devicePorts.clear();
+ availableDevices.clear();
log.info("Stopped");
}
@@ -107,45 +113,54 @@
}
@Override
- public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+ public DeviceEvent createOrUpdateDevice(ProviderId providerId,
+ DeviceId deviceId,
DeviceDescription deviceDescription) {
+
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId);
- Device oldDevice = devices.get(deviceId);
+ synchronized (providerDescs) {
+ // locking per device
- DeviceDescriptions descs
- = createIfAbsentUnchecked(providerDescs, providerId,
- new InitDeviceDescs(deviceDescription));
+ DeviceDescriptions descs
+ = createIfAbsentUnchecked(providerDescs, providerId,
+ new InitDeviceDescs(deviceDescription));
- // update description
- descs.putDeviceDesc(deviceDescription);
- Device newDevice = composeDevice(deviceId, providerDescs);
+ Device oldDevice = devices.get(deviceId);
+ // update description
+ descs.putDeviceDesc(deviceDescription);
+ Device newDevice = composeDevice(deviceId, providerDescs);
- if (oldDevice == null) {
- // ADD
- return createDevice(providerId, newDevice);
- } else {
- // UPDATE or ignore (no change or stale)
- return updateDevice(providerId, oldDevice, newDevice);
+ if (oldDevice == null) {
+ // ADD
+ return createDevice(providerId, newDevice);
+ } else {
+ // UPDATE or ignore (no change or stale)
+ return updateDevice(providerId, oldDevice, newDevice);
+ }
}
}
// Creates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
// update composed device cache
- synchronized (this) {
- devices.putIfAbsent(newDevice.id(), newDevice);
- if (!providerId.isAncillary()) {
- availableDevices.add(newDevice.id());
- }
+ Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+ verify(oldDevice == null,
+ "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+ providerId, oldDevice, newDevice);
+
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
// Updates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
// We allow only certain attributes to trigger update
@@ -153,70 +168,87 @@
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
!AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
- synchronized (this) {
- devices.replace(newDevice.id(), oldDevice, newDevice);
- if (!providerId.isAncillary()) {
- availableDevices.add(newDevice.id());
- }
+ boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+ if (!replaced) {
+ verify(replaced,
+ "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+ providerId, oldDevice, devices.get(newDevice.id())
+ , newDevice);
+ }
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
// Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) {
- synchronized (this) {
boolean added = availableDevices.add(newDevice.id());
return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
- }
}
return null;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
- synchronized (this) {
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+ = getDeviceDescriptions(deviceId);
+
+ // locking device
+ synchronized (providerDescs) {
Device device = devices.get(deviceId);
- boolean removed = (device != null) && availableDevices.remove(deviceId);
- return !removed ? null :
- new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ if (device == null) {
+ return null;
+ }
+ boolean removed = availableDevices.remove(deviceId);
+ if (removed) {
+ // TODO: broadcast ... DOWN only?
+ return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ }
+ return null;
}
}
@Override
- public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
- List<PortDescription> portDescriptions) {
+ public List<DeviceEvent> updatePorts(ProviderId providerId,
+ DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
- // TODO: implement multi-provider
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
- DeviceDescriptions descs = descsMap.get(providerId);
- checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
-
-
List<DeviceEvent> events = new ArrayList<>();
- synchronized (this) {
- ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+ synchronized (descsMap) {
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // every provider must provide DeviceDescription.
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
+ Map<PortNumber, Port> ports = getPortMap(deviceId);
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
- PortNumber number = portDescription.portNumber();
- Port oldPort = ports.get(number);
+ final PortNumber number = portDescription.portNumber();
+ processed.add(portDescription.portNumber());
+
+ final Port oldPort = ports.get(number);
+ final Port newPort;
+
+// event suppression hook?
+
// update description
descs.putPortDesc(portDescription);
- Port newPort = composePort(device, number, descsMap);
+ newPort = composePort(device, number, descsMap);
events.add(oldPort == null ?
- createPort(device, newPort, ports) :
- updatePort(device, oldPort, newPort, ports));
- processed.add(portDescription.portNumber());
+ createPort(device, newPort, ports) :
+ updatePort(device, oldPort, newPort, ports));
}
events.addAll(pruneOldPorts(device, ports, processed));
@@ -226,17 +258,19 @@
// Creates a new port based on the port description adds it to the map and
// Returns corresponding event.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createPort(Device device, Port newPort,
- ConcurrentMap<PortNumber, Port> ports) {
+ Map<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort);
return new DeviceEvent(PORT_ADDED, device, newPort);
}
// Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort,
- ConcurrentMap<PortNumber, Port> ports) {
+ Map<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled() ||
!AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
@@ -248,6 +282,7 @@
// Prunes the specified list of ports based on which ports are in the
// processed list and returns list of corresponding events.
+ // Guarded by deviceDescs value (=Device lock)
private List<DeviceEvent> pruneOldPorts(Device device,
Map<PortNumber, Port> ports,
Set<PortNumber> processed) {
@@ -264,12 +299,6 @@
return events;
}
- private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
- DeviceId deviceId) {
- return createIfAbsentUnchecked(deviceDescs, deviceId,
- NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
- }
-
// Gets the map of ports for the specified device; if one does not already
// exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
@@ -277,8 +306,14 @@
NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
}
+ private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
+ DeviceId deviceId) {
+ return createIfAbsentUnchecked(deviceDescs, deviceId,
+ NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
+ }
+
@Override
- public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+ public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -286,19 +321,22 @@
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
- DeviceDescriptions descs = descsMap.get(providerId);
- // assuming all providers must to give DeviceDescription
- checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
+ synchronized (descsMap) {
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // assuming all providers must give DeviceDescription
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
- synchronized (this) {
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = portDescription.portNumber();
- Port oldPort = ports.get(number);
+ final Port oldPort = ports.get(number);
+ final Port newPort;
+
// update description
descs.putPortDesc(portDescription);
- Port newPort = composePort(device, number, descsMap);
+ newPort = composePort(device, number, descsMap);
+
if (oldPort == null) {
return createPort(device, newPort, ports);
} else {
@@ -333,7 +371,7 @@
synchronized (descs) {
Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports?
- ConcurrentMap<PortNumber, Port> ports = devicePorts.get(deviceId);
+ Map<PortNumber, Port> ports = devicePorts.get(deviceId);
if (ports != null) {
ports.clear();
}
@@ -360,14 +398,14 @@
DeviceDescriptions desc = providerDescs.get(primary);
- // base
- Type type = desc.getDeviceDesc().type();
- String manufacturer = desc.getDeviceDesc().manufacturer();
- String hwVersion = desc.getDeviceDesc().hwVersion();
- String swVersion = desc.getDeviceDesc().swVersion();
- String serialNumber = desc.getDeviceDesc().serialNumber();
+ final DeviceDescription base = desc.getDeviceDesc();
+ Type type = base.type();
+ String manufacturer = base.manufacturer();
+ String hwVersion = base.hwVersion();
+ String swVersion = base.swVersion();
+ String serialNumber = base.serialNumber();
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
- annotations = merge(annotations, desc.getDeviceDesc().annotations());
+ annotations = merge(annotations, base.annotations());
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (e.getKey().equals(primary)) {
@@ -386,7 +424,14 @@
hwVersion, swVersion, serialNumber, annotations);
}
- // probably want composePort"s" also
+ /**
+ * Returns a Port, merging description given from multiple Providers.
+ *
+ * @param device device the port is on
+ * @param number port number
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Port instance
+ */
private Port composePort(Device device, PortNumber number,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
@@ -441,7 +486,9 @@
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
+
private final DeviceDescription deviceDesc;
+
public InitDeviceDescs(DeviceDescription deviceDesc) {
this.deviceDesc = checkNotNull(deviceDesc);
}
@@ -456,8 +503,6 @@
* Collection of Description of a Device and it's Ports given from a Provider.
*/
private static class DeviceDescriptions {
- // private final DeviceId id;
- // private final ProviderId pid;
private final AtomicReference<DeviceDescription> deviceDesc;
private final ConcurrentMap<PortNumber, PortDescription> portDescs;
@@ -475,10 +520,6 @@
return portDescs.get(number);
}
- public Collection<PortDescription> getPortDescs() {
- return Collections.unmodifiableCollection(portDescs.values());
- }
-
/**
* Puts DeviceDescription, merging annotations as necessary.
*