Experimenting multi Provider support on SimpelDeviceStore.
Change-Id: I181db7704556768863624f072540d141e39d0904
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
index df20b2d..bc0a055 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
@@ -3,6 +3,8 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -10,6 +12,7 @@
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
@@ -23,21 +26,27 @@
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+// TODO: synchronization should be done in more fine-grained manner.
/**
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
@@ -52,9 +61,18 @@
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
- private final Map<DeviceId, DefaultDevice> devices = new ConcurrentHashMap<>();
+ // collection of Description given from various providers
+ private final ConcurrentMap<DeviceId,
+ ConcurrentMap<ProviderId, DeviceDescriptions>>
+ deviceDescs = new ConcurrentHashMap<>();
+
+ // cache of Device and Ports generated by compositing descriptions from providers
+ private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+
+ // available(=UP) devices
private final Set<DeviceId> availableDevices = new HashSet<>();
- private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = new HashMap<>();
+
@Activate
public void activate() {
@@ -73,7 +91,7 @@
@Override
public Iterable<Device> getDevices() {
- return Collections.unmodifiableSet(new HashSet<Device>(devices.values()));
+ return Collections.unmodifiableCollection(devices.values());
}
@Override
@@ -82,82 +100,115 @@
}
@Override
- public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+ public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription) {
- DefaultDevice device = devices.get(deviceId);
- if (device == null) {
- return createDevice(providerId, deviceId, deviceDescription);
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+ = createIfAbsentUnchecked(deviceDescs, deviceId,
+ new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+
+ Device oldDevice = devices.get(deviceId);
+
+ DeviceDescriptions descs
+ = createIfAbsentUnchecked(providerDescs, providerId,
+ new InitDeviceDescs(deviceDescription));
+
+ descs.putDeviceDesc(deviceDescription);
+
+ Device newDevice = composeDevice(deviceId, providerDescs);
+
+ if (oldDevice == null) {
+ // ADD
+ return createDevice(providerId, newDevice);
+ } else {
+ // UPDATE or ignore (no change or stale)
+ return updateDevice(providerId, oldDevice, newDevice);
}
- return updateDevice(providerId, device, deviceDescription);
}
// Creates the device and returns the appropriate event if necessary.
- private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
- DeviceDescription desc) {
- DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(),
- desc.manufacturer(),
- desc.hwVersion(), desc.swVersion(),
- desc.serialNumber());
+ private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
+
+ // update composed device cache
synchronized (this) {
- devices.put(deviceId, device);
- availableDevices.add(deviceId);
+ devices.putIfAbsent(newDevice.id(), newDevice);
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
+ }
}
- return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
+
+ return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
// Updates the device and returns the appropriate event if necessary.
- private DeviceEvent updateDevice(ProviderId providerId, DefaultDevice device,
- DeviceDescription desc) {
+ private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
+
// We allow only certain attributes to trigger update
- if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
- !Objects.equals(device.swVersion(), desc.swVersion())) {
- DefaultDevice updated = new DefaultDevice(providerId, device.id(),
- desc.type(),
- desc.manufacturer(),
- desc.hwVersion(),
- desc.swVersion(),
- desc.serialNumber());
+ if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
+ !Objects.equals(oldDevice.swVersion(), newDevice.swVersion())) {
+
synchronized (this) {
- devices.put(device.id(), updated);
- availableDevices.add(device.id());
+ devices.replace(newDevice.id(), oldDevice, newDevice);
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
+ }
}
- return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
+ return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
- // Otherwise merely attempt to change availability
- synchronized (this) {
- boolean added = availableDevices.add(device.id());
+ // Otherwise merely attempt to change availability if primary provider
+ if (!providerId.isAncillary()) {
+ synchronized (this) {
+ boolean added = availableDevices.add(newDevice.id());
return !added ? null :
- new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+ }
}
+ return null;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
synchronized (this) {
Device device = devices.get(deviceId);
- boolean removed = device != null && availableDevices.remove(deviceId);
+ boolean removed = (device != null) && availableDevices.remove(deviceId);
return !removed ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
}
}
@Override
- public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
+ public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
+
+ // TODO: implement multi-provider
+ Device device = devices.get(deviceId);
+ checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+ ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+ checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+ DeviceDescriptions descs = descsMap.get(providerId);
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
+
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
- Device device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- Map<PortNumber, Port> ports = getPortMap(deviceId);
+ ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
- Port port = ports.get(portDescription.portNumber());
- events.add(port == null ?
- createPort(device, portDescription, ports) :
- updatePort(device, port, portDescription, ports));
+ PortNumber number = portDescription.portNumber();
+ Port oldPort = ports.get(number);
+ // update description
+ descs.putPortDesc(number, portDescription);
+ Port newPort = composePort(device, number, descsMap);
+
+ events.add(oldPort == null ?
+ createPort(device, newPort, ports) :
+ updatePort(device, oldPort, newPort, ports));
processed.add(portDescription.portNumber());
}
@@ -168,25 +219,20 @@
// Creates a new port based on the port description adds it to the map and
// Returns corresponding event.
- private DeviceEvent createPort(Device device, PortDescription portDescription,
- Map<PortNumber, Port> ports) {
- DefaultPort port = new DefaultPort(device, portDescription.portNumber(),
- portDescription.isEnabled());
- ports.put(port.number(), port);
- return new DeviceEvent(PORT_ADDED, device, port);
+ private DeviceEvent createPort(Device device, Port newPort,
+ ConcurrentMap<PortNumber, Port> ports) {
+ ports.put(newPort.number(), newPort);
+ return new DeviceEvent(PORT_ADDED, device, newPort);
}
// CHecks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
- private DeviceEvent updatePort(Device device, Port port,
- PortDescription portDescription,
- Map<PortNumber, Port> ports) {
- if (port.isEnabled() != portDescription.isEnabled()) {
- DefaultPort updatedPort =
- new DefaultPort(device, portDescription.portNumber(),
- portDescription.isEnabled());
- ports.put(port.number(), updatedPort);
- return new DeviceEvent(PORT_UPDATED, device, updatedPort);
+ private DeviceEvent updatePort(Device device, Port oldPort,
+ Port newPort,
+ ConcurrentMap<PortNumber, Port> ports) {
+ if (oldPort.isEnabled() != newPort.isEnabled()) {
+ ports.put(oldPort.number(), newPort);
+ return new DeviceEvent(PORT_UPDATED, device, newPort);
}
return null;
}
@@ -211,31 +257,48 @@
// Gets the map of ports for the specified device; if one does not already
// exist, it creates and registers a new one.
- private Map<PortNumber, Port> getPortMap(DeviceId deviceId) {
- Map<PortNumber, Port> ports = devicePorts.get(deviceId);
- if (ports == null) {
- ports = new HashMap<>();
- devicePorts.put(deviceId, ports);
- }
- return ports;
+ private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+ return createIfAbsentUnchecked(devicePorts, deviceId,
+ new InitConcurrentHashMap<PortNumber, Port>());
}
@Override
- public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+ public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
+ Device device = devices.get(deviceId);
+ checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+ ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+ checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+ DeviceDescriptions descs = descsMap.get(providerId);
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
+ // TODO: implement multi-provider
synchronized (this) {
- Device device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- Map<PortNumber, Port> ports = getPortMap(deviceId);
- Port port = ports.get(portDescription.portNumber());
- return updatePort(device, port, portDescription, ports);
+ ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+ final PortNumber number = portDescription.portNumber();
+ Port oldPort = ports.get(number);
+ // update description
+ descs.putPortDesc(number, portDescription);
+ Port newPort = composePort(device, number, descsMap);
+ if (oldPort == null) {
+ return createPort(device, newPort, ports);
+ } else {
+ return updatePort(device, oldPort, newPort, ports);
+ }
}
}
@Override
public List<Port> getPorts(DeviceId deviceId) {
Map<PortNumber, Port> ports = devicePorts.get(deviceId);
- return ports == null ? new ArrayList<Port>() : ImmutableList.copyOf(ports.values());
+ if (ports == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(ports.values());
}
@Override
@@ -257,4 +320,136 @@
new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
+
+ /**
+ * Returns a Device, merging description given from multiple Providers.
+ *
+ * @param deviceId device identifier
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Device instance
+ */
+ private Device composeDevice(DeviceId deviceId,
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+ checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+ ProviderId primary = pickPrimaryPID(providerDescs);
+
+ DeviceDescriptions desc = providerDescs.get(primary);
+ Type type = desc.getDeviceDesc().type();
+ String manufacturer = desc.getDeviceDesc().manufacturer();
+ String hwVersion = desc.getDeviceDesc().hwVersion();
+ String swVersion = desc.getDeviceDesc().swVersion();
+ String serialNumber = desc.getDeviceDesc().serialNumber();
+
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ // FIXME: implement attribute merging once we have K-V attributes
+ }
+
+ return new DefaultDevice(primary, deviceId , type, manufacturer, hwVersion, swVersion, serialNumber);
+ }
+
+ // probably want composePorts
+ private Port composePort(Device device, PortNumber number,
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+ ProviderId primary = pickPrimaryPID(providerDescs);
+ DeviceDescriptions primDescs = providerDescs.get(primary);
+ final PortDescription portDesc = primDescs.getPortDesc(number);
+ boolean isEnabled;
+ if (portDesc != null) {
+ isEnabled = portDesc.isEnabled();
+ } else {
+ // if no primary, assume not enabled
+ // TODO: revisit this port enabled/disabled behavior
+ isEnabled = false;
+ }
+
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ // FIXME: implement attribute merging once we have K-V attributes
+ }
+
+ return new DefaultPort(device, number, isEnabled);
+ }
+
+ /**
+ * @return primary ProviderID, or randomly chosen one if none exists
+ */
+ private ProviderId pickPrimaryPID(
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+ ProviderId fallBackPrimary = null;
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (!e.getKey().isAncillary()) {
+ return e.getKey();
+ } else if (fallBackPrimary == null) {
+ // pick randomly as a fallback in case there is no primary
+ fallBackPrimary = e.getKey();
+ }
+ }
+ return fallBackPrimary;
+ }
+
+ // TODO: can be made generic
+ private static final class InitConcurrentHashMap<K, V> implements
+ ConcurrentInitializer<ConcurrentMap<K, V>> {
+ @Override
+ public ConcurrentMap<K, V> get() throws ConcurrentException {
+ return new ConcurrentHashMap<>();
+ }
+ }
+
+ public static final class InitDeviceDescs
+ implements ConcurrentInitializer<DeviceDescriptions> {
+ private final DeviceDescription deviceDesc;
+ public InitDeviceDescs(DeviceDescription deviceDesc) {
+ this.deviceDesc = checkNotNull(deviceDesc);
+ }
+ @Override
+ public DeviceDescriptions get() throws ConcurrentException {
+ return new DeviceDescriptions(deviceDesc);
+ }
+ }
+
+
+ /**
+ * Collection of Description of a Device and it's Ports given from a Provider.
+ */
+ private static class DeviceDescriptions {
+ // private final DeviceId id;
+ // private final ProviderId pid;
+
+ private final AtomicReference<DeviceDescription> deviceDesc;
+ private final ConcurrentMap<PortNumber, PortDescription> portDescs;
+
+ public DeviceDescriptions(DeviceDescription desc) {
+ this.deviceDesc = new AtomicReference<>(desc);
+ this.portDescs = new ConcurrentHashMap<>();
+ }
+
+ public DeviceDescription getDeviceDesc() {
+ return deviceDesc.get();
+ }
+
+ public PortDescription getPortDesc(PortNumber number) {
+ return portDescs.get(number);
+ }
+
+ public Collection<PortDescription> getPortDescs() {
+ return Collections.unmodifiableCollection(portDescs.values());
+ }
+
+ public DeviceDescription putDeviceDesc(DeviceDescription newDesc) {
+ return deviceDesc.getAndSet(newDesc);
+ }
+
+ public PortDescription putPortDesc(PortNumber number, PortDescription newDesc) {
+ return portDescs.put(number, newDesc);
+ }
+ }
}