Implemented GossipDeviceStore with multi-provider and annotation support
Change-Id: I1953bdc37b28af79703ebcfc9201a71a2af49ab2
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
similarity index 92%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
index a99482f..4dfc88b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
@@ -17,9 +17,12 @@
import org.onlab.onos.store.impl.OnosTimestamp;
import org.slf4j.Logger;
+/**
+ * Clock service to issue Timestamp based on Device Mastership.
+ */
@Component(immediate = true)
@Service
-public class OnosClockService implements ClockService {
+public class DeviceClockManager implements ClockService {
private final Logger log = getLogger(getClass());
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
new file mode 100644
index 0000000..0edbc21
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -0,0 +1,652 @@
+package org.onlab.onos.store.device.impl;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
+import static com.google.common.base.Verify.verify;
+
+// TODO: implement remove event handling and call *Internal
+/**
+ * Manages inventory of infrastructure devices using gossip protocol to distribute
+ * information.
+ */
+@Component(immediate = true)
+@Service
+public class GossipDeviceStore
+ extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+ implements DeviceStore {
+
+ private final Logger log = getLogger(getClass());
+
+ public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+ // TODO: Check if inner Map can be replaced with plain Map
+ // innerMap is used to lock a Device, thus instance should never be replaced.
+ // collection of Description given from various providers
+ private final ConcurrentMap<DeviceId,
+ ConcurrentMap<ProviderId, DeviceDescriptions>>
+ deviceDescs = new ConcurrentHashMap<>();
+
+ // cache of Device and Ports generated by compositing descriptions from providers
+ private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+
+    private final Set<DeviceId> availableDevices = // available(=UP) devices; concurrent: read without locking
+            Collections.newSetFromMap(new ConcurrentHashMap<DeviceId, Boolean>());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClockService clockService;
+
+ @Activate
+ public void activate() {
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ deviceDescs.clear();
+ devices.clear();
+ devicePorts.clear();
+ availableDevices.clear();
+ log.info("Stopped");
+ }
+
+ @Override
+ public int getDeviceCount() {
+ return devices.size();
+ }
+
+ @Override
+ public Iterable<Device> getDevices() {
+ return Collections.unmodifiableCollection(devices.values());
+ }
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ return devices.get(deviceId);
+ }
+
+ @Override
+ public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+ DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ if (event != null) {
+ // FIXME: broadcast deltaDesc, UP
+ log.debug("broadcast deltaDesc");
+ }
+ return event;
+ }
+
+ // Applies one provider's timestamped description; returns the event, or null if outdated or nothing changed.
+ private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId,
+ Timestamped<DeviceDescription> deltaDesc) {
+ // Collection of DeviceDescriptions for a Device; this map instance is also the per-device lock
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+ = createIfAbsentUnchecked(deviceDescs, deviceId,
+ new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+
+ // Seeded with deltaDesc when this provider had no entry yet (recognised below by the identity check)
+ DeviceDescriptions descs
+ = createIfAbsentUnchecked(providerDescs, providerId,
+ new InitDeviceDescs(deltaDesc));
+
+ // update description
+ synchronized (providerDescs) {
+ // locking per device
+
+ final Device oldDevice = devices.get(deviceId);
+ final Device newDevice;
+
+ if (deltaDesc == descs.getDeviceDesc() ||
+ deltaDesc.isNewer(descs.getDeviceDesc())) {
+ // on new device or valid update
+ descs.putDeviceDesc(deltaDesc);
+ newDevice = composeDevice(deviceId, providerDescs);
+ } else {
+ // outdated event, ignored.
+ return null;
+ }
+ if (oldDevice == null) {
+ // ADD
+ return createDevice(providerId, newDevice);
+ } else {
+ // UPDATE or ignore (no change or stale)
+ return updateDevice(providerId, oldDevice, newDevice);
+ }
+ }
+ }
+
+ // Creates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=locking Device)
+ private DeviceEvent createDevice(ProviderId providerId,
+ Device newDevice) {
+
+ // update composed device cache
+ Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+ verify(oldDevice == null,
+ "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+ providerId, oldDevice, newDevice);
+
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
+ }
+
+ return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
+ }
+
+ // Replaces the cached device and returns DEVICE_UPDATED when hwVersion, swVersion or annotations differ;
+ // otherwise returns DEVICE_AVAILABILITY_CHANGED if a non-ancillary provider newly made it available, else null.
+ // Guarded by deviceDescs value (=locking Device)
+ private DeviceEvent updateDevice(ProviderId providerId,
+ Device oldDevice, Device newDevice) {
+ // We allow only certain attributes to trigger update
+ if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
+ !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
+ !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
+ // replace() only succeeds if the cache still holds oldDevice; verify() fails loudly otherwise
+ boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+ if (!replaced) {
+ verify(replaced,
+ "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+ providerId, oldDevice, devices.get(newDevice.id())
+ , newDevice);
+ }
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
+ }
+ return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
+ }
+
+ // Otherwise merely attempt to change availability if primary provider
+ if (!providerId.isAncillary()) {
+ boolean added = availableDevices.add(newDevice.id());
+ return !added ? null :
+ new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+ }
+ return null;
+ }
+
+ // Marks the device unavailable; returns DEVICE_AVAILABILITY_CHANGED only if it was cached and available, else null.
+ @Override
+ public DeviceEvent markOffline(DeviceId deviceId) {
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+ = createIfAbsentUnchecked(deviceDescs, deviceId,
+ new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+ // NOTE: the lookup above registers an empty description map when the device id was never seen before
+ // locking device
+ synchronized (providerDescs) {
+ Device device = devices.get(deviceId);
+ if (device == null) {
+ return null;
+ }
+ boolean removed = availableDevices.remove(deviceId);
+ if (removed) {
+ // TODO: broadcast ... DOWN only?
+ return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
+ Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+
+ List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
+ for (PortDescription e : portDescriptions) {
+ deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
+ }
+
+ List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs);
+ if (!events.isEmpty()) {
+ // FIXME: broadcast deltaDesc, UP
+ log.debug("broadcast deltaDesc");
+ }
+ return events;
+
+ }
+
+    // Applies timestamped port descriptions from one provider and returns the resulting port events.
+    // Throws IllegalArgumentException if the device or this provider's device description is unknown.
+    private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId,
+                                List<Timestamped<PortDescription>> deltaDescs) {
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        DeviceDescriptions descs = descsMap.get(providerId);
+        // every provider must provide DeviceDescription.
+        checkArgument(descs != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+        List<DeviceEvent> events = new ArrayList<>();
+        synchronized (descsMap) {
+            Map<PortNumber, Port> ports = getPortMap(deviceId);
+
+            // Add new ports
+            Set<PortNumber> processed = new HashSet<>();
+            for (Timestamped<PortDescription> deltaDesc : deltaDescs) {
+                final PortNumber number = deltaDesc.value().portNumber();
+                // A listed port still exists even when its description is stale, so it must never be pruned.
+                processed.add(number);
+                final Port oldPort = ports.get(number);
+                final Port newPort;
+                final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+                if (existingPortDesc == null ||
+                    deltaDesc == existingPortDesc ||
+                    deltaDesc.isNewer(existingPortDesc)) {
+                    // on new port or valid update
+                    // update description
+                    descs.putPortDesc(deltaDesc);
+                    newPort = composePort(device, number, descsMap);
+                } else {
+                    // outdated event, ignored.
+                    continue;
+                }
+
+                events.add(oldPort == null ?
+                        createPort(device, newPort, ports) :
+                        updatePort(device, oldPort, newPort, ports));
+            }
+
+            events.addAll(pruneOldPorts(device, ports, processed));
+        }
+        return FluentIterable.from(events).filter(notNull()).toList();
+    }
+
+ // Creates a new port based on the port description adds it to the map and
+ // Returns corresponding event.
+ // Guarded by deviceDescs value (=locking Device)
+ private DeviceEvent createPort(Device device, Port newPort,
+ Map<PortNumber, Port> ports) {
+ ports.put(newPort.number(), newPort);
+ return new DeviceEvent(PORT_ADDED, device, newPort);
+ }
+
+ // Checks if the specified port requires update and if so, it replaces the
+ // existing entry in the map and returns corresponding event.
+ // Guarded by deviceDescs value (=locking Device)
+ private DeviceEvent updatePort(Device device, Port oldPort,
+ Port newPort,
+ Map<PortNumber, Port> ports) {
+ if (oldPort.isEnabled() != newPort.isEnabled() ||
+ !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
+
+ ports.put(oldPort.number(), newPort);
+ return new DeviceEvent(PORT_UPDATED, device, newPort);
+ }
+ return null;
+ }
+
+    // Drops every cached port that is absent from the processed set and
+    // returns one PORT_REMOVED event per dropped port.
+    // Guarded by deviceDescs value (=locking Device)
+    private List<DeviceEvent> pruneOldPorts(Device device,
+                                            Map<PortNumber, Port> ports,
+                                            Set<PortNumber> processed) {
+        List<DeviceEvent> removals = new ArrayList<>();
+        Iterator<Entry<PortNumber, Port>> cursor = ports.entrySet().iterator();
+        while (cursor.hasNext()) {
+            Entry<PortNumber, Port> cached = cursor.next();
+            if (processed.contains(cached.getKey())) {
+                continue;
+            }
+            removals.add(new DeviceEvent(PORT_REMOVED, device, cached.getValue()));
+            cursor.remove();
+        }
+        return removals;
+    }
+
+ // Gets the map of ports for the specified device; if one does not already
+ // exist, it creates and registers a new one.
+ private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+ return createIfAbsentUnchecked(devicePorts, deviceId,
+ new InitConcurrentHashMap<PortNumber, Port>());
+ }
+
+ @Override
+ public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+ PortDescription portDescription) {
+ Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
+ DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+ if (event != null) {
+ // FIXME: broadcast deltaDesc
+ log.debug("broadcast deltaDesc");
+ }
+ return event;
+ }
+
+ // Applies one timestamped port description; returns PORT_ADDED/PORT_UPDATED, or null if outdated or unchanged.
+ private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
+ Timestamped<PortDescription> deltaDesc) {
+ Device device = devices.get(deviceId);
+ checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+ ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+ checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // every provider is assumed to have supplied a DeviceDescription first
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
+ synchronized (descsMap) {
+ ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+ final PortNumber number = deltaDesc.value().portNumber();
+ final Port oldPort = ports.get(number);
+ final Port newPort;
+
+ final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+ if (existingPortDesc == null ||
+ deltaDesc == existingPortDesc ||
+ deltaDesc.isNewer(existingPortDesc)) {
+ // on new port or valid update
+ // update description
+ descs.putPortDesc(deltaDesc);
+ newPort = composePort(device, number, descsMap);
+ } else {
+ // outdated event, ignored.
+ return null;
+ }
+
+ if (oldPort == null) {
+ return createPort(device, newPort, ports);
+ } else {
+ return updatePort(device, oldPort, newPort, ports);
+ }
+ }
+ }
+
+ @Override
+ public List<Port> getPorts(DeviceId deviceId) {
+ Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+ if (ports == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(ports.values());
+ }
+
+ @Override
+ public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+ Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+ return ports == null ? null : ports.get(portNumber);
+ }
+
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ return availableDevices.contains(deviceId);
+ }
+
+    @Override
+    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
+        // Also drop availability and cached ports so a removed device is no longer reported by the getters.
+        Device device = devices.remove(deviceId);
+        availableDevices.remove(deviceId);
+        devicePorts.remove(deviceId);
+        return device == null ? null : new DeviceEvent(DEVICE_REMOVED, device, null);
+    }
+
+ private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
+ if (lhs == rhs) {
+ return true;
+ }
+ if (lhs == null || rhs == null) {
+ return false;
+ }
+
+ if (!lhs.keys().equals(rhs.keys())) {
+ return false;
+ }
+
+ for (String key : lhs.keys()) {
+ if (!lhs.value(key).equals(rhs.value(key))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns a Device, merging description given from multiple Providers.
+ *
+ * @param deviceId device identifier
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Device instance
+ */
+ private Device composeDevice(DeviceId deviceId,
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+ checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+ ProviderId primary = pickPrimaryPID(providerDescs);
+
+ DeviceDescriptions desc = providerDescs.get(primary);
+
+ DeviceDescription base = desc.getDeviceDesc().value();
+ Type type = base.type();
+ String manufacturer = base.manufacturer();
+ String hwVersion = base.hwVersion();
+ String swVersion = base.swVersion();
+ String serialNumber = base.serialNumber();
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+ annotations = merge(annotations, base.annotations());
+
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ // TODO: should keep track of Description timestamp
+ // and only merge conflicting keys when timestamp is newer
+ // Currently assuming there will never be a key conflict between
+ // providers
+
+ // annotation merging. not so efficient, should revisit later
+ annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
+ }
+
+ return new DefaultDevice(primary, deviceId , type, manufacturer,
+ hwVersion, swVersion, serialNumber, annotations);
+ }
+
+ /**
+ * Returns a Port, merging description given from multiple Providers.
+ *
+ * @param device device the port is on
+ * @param number port number
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Port instance
+ */
+ private Port composePort(Device device, PortNumber number,
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+ ProviderId primary = pickPrimaryPID(providerDescs);
+ DeviceDescriptions primDescs = providerDescs.get(primary);
+ // if no primary, assume not enabled
+ // TODO: revisit this default port enabled/disabled behavior
+ boolean isEnabled = false;
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+
+ final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
+ if (portDesc != null) {
+ isEnabled = portDesc.value().isEnabled();
+ annotations = merge(annotations, portDesc.value().annotations());
+ }
+
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ // TODO: should keep track of Description timestamp
+ // and only merge conflicting keys when timestamp is newer
+ // Currently assuming there will never be a key conflict between
+ // providers
+
+ // annotation merging. not so efficient, should revisit later
+ final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
+ if (otherPortDesc != null) {
+ annotations = merge(annotations, otherPortDesc.value().annotations());
+ }
+ }
+
+ return new DefaultPort(device, number, isEnabled, annotations);
+ }
+
+    /**
+     * @return first non-ancillary ProviderID found, else the first ancillary one met (null for an empty map)
+     */
+    private ProviderId pickPrimaryPID(
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId firstSeen = null;
+        for (ProviderId candidate : providerDescs.keySet()) {
+            if (!candidate.isAncillary()) {
+                return candidate;
+            }
+            if (firstSeen == null) {
+                firstSeen = candidate;
+            }
+        }
+        return firstSeen;
+    }
+
+ private static final class InitConcurrentHashMap<K, V> implements
+ ConcurrentInitializer<ConcurrentMap<K, V>> {
+ @Override
+ public ConcurrentMap<K, V> get() throws ConcurrentException {
+ return new ConcurrentHashMap<>();
+ }
+ }
+
+ public static final class InitDeviceDescs
+ implements ConcurrentInitializer<DeviceDescriptions> {
+
+ private final Timestamped<DeviceDescription> deviceDesc;
+
+ public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+ this.deviceDesc = checkNotNull(deviceDesc);
+ }
+ @Override
+ public DeviceDescriptions get() throws ConcurrentException {
+ return new DeviceDescriptions(deviceDesc);
+ }
+ }
+
+
+ /**
+ * Collection of Descriptions of a Device and its Ports given from a Provider.
+ */
+ public static class DeviceDescriptions {
+
+ private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
+ private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+ public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+ this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
+ this.portDescs = new ConcurrentHashMap<>();
+ }
+
+ public Timestamped<DeviceDescription> getDeviceDesc() {
+ return deviceDesc.get();
+ }
+
+ public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+ return portDescs.get(number);
+ }
+
+ /**
+ * Puts DeviceDescription, merging annotations as necessary.
+ *
+ * @param newDesc new DeviceDescription
+ * @return previous DeviceDescription
+ */
+ public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+ Timestamped<DeviceDescription> oldOne = deviceDesc.get();
+ Timestamped<DeviceDescription> newOne = newDesc;
+ if (oldOne != null) {
+ SparseAnnotations merged = merge(oldOne.value().annotations(),
+ newDesc.value().annotations());
+ newOne = new Timestamped<DeviceDescription>(
+ new DefaultDeviceDescription(newDesc.value(), merged),
+ newDesc.timestamp());
+ }
+ return deviceDesc.getAndSet(newOne);
+ }
+
+ /**
+ * Puts PortDescription, merging annotations as necessary.
+ *
+ * @param newDesc new PortDescription
+ * @return previous PortDescription
+ */
+ public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
+ Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
+ Timestamped<PortDescription> newOne = newDesc;
+ if (oldOne != null) {
+ SparseAnnotations merged = merge(oldOne.value().annotations(),
+ newDesc.value().annotations());
+ newOne = new Timestamped<PortDescription>(
+ new DefaultPortDescription(newDesc.value(), merged),
+ newDesc.timestamp());
+ }
+ return portDescs.put(newOne.value().portNumber(), newOne);
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
deleted file mode 100644
index c858aba..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ /dev/null
@@ -1,340 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.DeviceEvent;
-import org.onlab.onos.net.device.DeviceStore;
-import org.onlab.onos.net.device.DeviceStoreDelegate;
-import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-//TODO: Add support for multiple provider and annotations
-/**
- * Manages inventory of infrastructure devices using a protocol that takes into consideration
- * the order in which device events occur.
- */
-@Component(immediate = true)
-@Service
-public class OnosDistributedDeviceStore
- extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
- implements DeviceStore {
-
- private final Logger log = getLogger(getClass());
-
- public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
-
- private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
- private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClockService clockService;
-
- @Activate
- public void activate() {
-
- devices = new ConcurrentHashMap<>();
- devicePorts = new ConcurrentHashMap<>();
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public int getDeviceCount() {
- return devices.size();
- }
-
- @Override
- public Iterable<Device> getDevices() {
- Builder<Device> builder = ImmutableSet.builder();
- synchronized (this) {
- for (VersionedValue<Device> device : devices.values()) {
- builder.add(device.entity());
- }
- return builder.build();
- }
- }
-
- @Override
- public Device getDevice(DeviceId deviceId) {
- VersionedValue<Device> device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- return device.entity();
- }
-
- @Override
- public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
- DeviceDescription deviceDescription) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- VersionedValue<Device> device = devices.get(deviceId);
-
- if (device == null) {
- return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
- }
-
- checkState(newTimestamp.compareTo(device.timestamp()) > 0,
- "Existing device has a timestamp in the future!");
-
- return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
- }
-
- // Creates the device and returns the appropriate event if necessary.
- private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
- DeviceDescription desc, Timestamp timestamp) {
- Device device = new DefaultDevice(providerId, deviceId, desc.type(),
- desc.manufacturer(),
- desc.hwVersion(), desc.swVersion(),
- desc.serialNumber());
-
- devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
- // TODO,FIXME: broadcast a message telling peers of a device event.
- return new DeviceEvent(DEVICE_ADDED, device, null);
- }
-
- // Updates the device and returns the appropriate event if necessary.
- private DeviceEvent updateDevice(ProviderId providerId, Device device,
- DeviceDescription desc, Timestamp timestamp) {
- // We allow only certain attributes to trigger update
- if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
- !Objects.equals(device.swVersion(), desc.swVersion())) {
-
- Device updated = new DefaultDevice(providerId, device.id(),
- desc.type(),
- desc.manufacturer(),
- desc.hwVersion(),
- desc.swVersion(),
- desc.serialNumber());
- devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
- // FIXME: broadcast a message telling peers of a device event.
- return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
- }
-
- // Otherwise merely attempt to change availability
- Device updated = new DefaultDevice(providerId, device.id(),
- desc.type(),
- desc.manufacturer(),
- desc.hwVersion(),
- desc.swVersion(),
- desc.serialNumber());
-
- VersionedValue<Device> oldDevice = devices.put(device.id(),
- new VersionedValue<Device>(updated, true, timestamp));
- if (!oldDevice.isUp()) {
- return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
- } else {
- return null;
- }
- }
-
- @Override
- public DeviceEvent markOffline(DeviceId deviceId) {
- VersionedValue<Device> device = devices.get(deviceId);
- boolean willRemove = device != null && device.isUp();
- if (!willRemove) {
- return null;
- }
- Timestamp timestamp = clockService.getTimestamp(deviceId);
- if (replaceIfLatest(device.entity(), false, timestamp)) {
- return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
- }
- return null;
- }
-
- // Replace existing value if its timestamp is older.
- private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
- VersionedValue<Device> existingValue = devices.get(device.id());
- if (timestamp.compareTo(existingValue.timestamp()) > 0) {
- devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
- return true;
- }
- return false;
- }
-
- @Override
- public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
- List<PortDescription> portDescriptions) {
- List<DeviceEvent> events = new ArrayList<>();
- synchronized (this) {
- VersionedValue<Device> device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-
- // Add new ports
- Set<PortNumber> processed = new HashSet<>();
- for (PortDescription portDescription : portDescriptions) {
- VersionedValue<Port> port = ports.get(portDescription.portNumber());
- if (port == null) {
- events.add(createPort(device, portDescription, ports, newTimestamp));
- }
- checkState(newTimestamp.compareTo(port.timestamp()) > 0,
- "Existing port state has a timestamp in the future!");
- events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
- processed.add(portDescription.portNumber());
- }
-
- updatePortMap(deviceId, ports);
-
- events.addAll(pruneOldPorts(device.entity(), ports, processed));
- }
- return FluentIterable.from(events).filter(notNull()).toList();
- }
-
- // Creates a new port based on the port description adds it to the map and
- // Returns corresponding event.
- //@GuardedBy("this")
- private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
- Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
- Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
- portDescription.isEnabled());
- ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
- updatePortMap(device.entity().id(), ports);
- return new DeviceEvent(PORT_ADDED, device.entity(), port);
- }
-
- // Checks if the specified port requires update and if so, it replaces the
- // existing entry in the map and returns corresponding event.
- //@GuardedBy("this")
- private DeviceEvent updatePort(Device device, Port port,
- PortDescription portDescription,
- Map<PortNumber, VersionedValue<Port>> ports,
- Timestamp timestamp) {
- if (port.isEnabled() != portDescription.isEnabled()) {
- VersionedValue<Port> updatedPort = new VersionedValue<Port>(
- new DefaultPort(device, portDescription.portNumber(),
- portDescription.isEnabled()),
- portDescription.isEnabled(),
- timestamp);
- ports.put(port.number(), updatedPort);
- updatePortMap(device.id(), ports);
- return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
- }
- return null;
- }
-
- // Prunes the specified list of ports based on which ports are in the
- // processed list and returns list of corresponding events.
- //@GuardedBy("this")
- private List<DeviceEvent> pruneOldPorts(Device device,
- Map<PortNumber, VersionedValue<Port>> ports,
- Set<PortNumber> processed) {
- List<DeviceEvent> events = new ArrayList<>();
- Iterator<PortNumber> iterator = ports.keySet().iterator();
- while (iterator.hasNext()) {
- PortNumber portNumber = iterator.next();
- if (!processed.contains(portNumber)) {
- events.add(new DeviceEvent(PORT_REMOVED, device,
- ports.get(portNumber).entity()));
- iterator.remove();
- }
- }
- if (!events.isEmpty()) {
- updatePortMap(device.id(), ports);
- }
- return events;
- }
-
- // Gets the map of ports for the specified device; if one does not already
- // exist, it creates and registers a new one.
- // WARN: returned value is a copy, changes made to the Map
- // needs to be written back using updatePortMap
- //@GuardedBy("this")
- private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
- Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
- if (ports == null) {
- ports = new HashMap<>();
- // this probably is waste of time in most cases.
- updatePortMap(deviceId, ports);
- }
- return ports;
- }
-
- //@GuardedBy("this")
- private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
- devicePorts.put(deviceId, ports);
- }
-
- @Override
- public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
- PortDescription portDescription) {
- VersionedValue<Device> device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
- VersionedValue<Port> port = ports.get(portDescription.portNumber());
- Timestamp timestamp = clockService.getTimestamp(deviceId);
- return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
- }
-
- @Override
- public List<Port> getPorts(DeviceId deviceId) {
- Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
- if (versionedPorts == null) {
- return Collections.emptyList();
- }
- List<Port> ports = new ArrayList<>();
- for (VersionedValue<Port> port : versionedPorts.values()) {
- ports.add(port.entity());
- }
- return ports;
- }
-
- @Override
- public Port getPort(DeviceId deviceId, PortNumber portNumber) {
- Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
- return ports == null ? null : ports.get(portNumber).entity();
- }
-
- @Override
- public boolean isAvailable(DeviceId deviceId) {
- return devices.get(deviceId).isUp();
- }
-
- @Override
- public DeviceEvent removeDevice(DeviceId deviceId) {
- VersionedValue<Device> previousDevice = devices.remove(deviceId);
- return previousDevice == null ? null :
- new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
- }
-}