DeviceStore update
- Add off-line/remove handling to GossipDeviceStore
- Backport lock scope to SimpleDeviceStore
Change-Id: I5b4c8e12738ef78920341fb8699c4b07bde8712a
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 85d9b07..8316769 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -2,7 +2,8 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
-
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
@@ -59,7 +60,7 @@
import static org.onlab.onos.net.DefaultAnnotations.union;
import static com.google.common.base.Verify.verify;
-// TODO: implement remove event handling and call *Internal
+// TODO: give me a better name
/**
* Manages inventory of infrastructure devices using gossip protocol to distribute
* information.
@@ -79,14 +80,18 @@
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
- deviceDescs = new ConcurrentHashMap<>();
+ deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers
- private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
+
+    // updated under a per-Device lock, yet shared by all Devices: must be concurrent
+    private final Map<DeviceId, Timestamp> offline = Maps.newConcurrentMap();
+    private final Map<DeviceId, Timestamp> removalRequest = Maps.newConcurrentMap();
// available(=UP) devices
- private final Set<DeviceId> availableDevices = new HashSet<>();
+ private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@@ -121,7 +126,8 @@
}
@Override
- public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+ public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
+ DeviceId deviceId,
DeviceDescription deviceDescription) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
@@ -133,22 +139,26 @@
return event;
}
- private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId,
- Timestamped<DeviceDescription> deltaDesc) {
+ private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<DeviceDescription> deltaDesc) {
// Collection of DeviceDescriptions for a Device
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId);
-
- DeviceDescriptions descs
- = createIfAbsentUnchecked(providerDescs, providerId,
- new InitDeviceDescs(deltaDesc));
-
- // update description
synchronized (providerDescs) {
// locking per device
+ if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+ log.debug("Ignoring outdated event: {}", deltaDesc);
+ return null;
+ }
+
+ DeviceDescriptions descs
+ = createIfAbsentUnchecked(providerDescs, providerId,
+ new InitDeviceDescs(deltaDesc));
+
final Device oldDevice = devices.get(deviceId);
final Device newDevice;
@@ -163,18 +173,18 @@
}
if (oldDevice == null) {
// ADD
- return createDevice(providerId, newDevice);
+ return createDevice(providerId, newDevice, deltaDesc.timestamp());
} else {
// UPDATE or ignore (no change or stale)
- return updateDevice(providerId, oldDevice, newDevice);
+ return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
}
}
}
// Creates the device and returns the appropriate event if necessary.
- // Guarded by deviceDescs value (=locking Device)
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createDevice(ProviderId providerId,
- Device newDevice) {
+ Device newDevice, Timestamp timestamp) {
// update composed device cache
Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
@@ -183,16 +193,17 @@
providerId, oldDevice, newDevice);
if (!providerId.isAncillary()) {
- availableDevices.add(newDevice.id());
+ markOnline(newDevice.id(), timestamp);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
// Updates the device and returns the appropriate event if necessary.
- // Guarded by deviceDescs value (=locking Device)
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId,
- Device oldDevice, Device newDevice) {
+ Device oldDevice,
+ Device newDevice, Timestamp newTimestamp) {
// We allow only certain attributes to trigger update
if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
@@ -207,14 +218,14 @@
, newDevice);
}
if (!providerId.isAncillary()) {
- availableDevices.add(newDevice.id());
+ markOnline(newDevice.id(), newTimestamp);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
// Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) {
- boolean added = availableDevices.add(newDevice.id());
+ boolean added = markOnline(newDevice.id(), newTimestamp);
return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
}
@@ -223,11 +234,29 @@
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
- ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+        // stamp the request with the local clock, then run the timestamped path
+        return markOfflineInternal(deviceId, clockService.getTimestamp(deviceId));
+    }
+
+ private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
+
+ Map<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId);
// locking device
synchronized (providerDescs) {
+
+            // accept off-line only for a known device, and only if the given
+            // timestamp is newer than the latest one from the Primary provider;
+            // an empty map has no Primary descriptions to compare against
+            if (providerDescs.isEmpty() || timestamp.compareTo(
+                    getPrimaryDescriptions(providerDescs).getLatestTimestamp()) <= 0) {
+                // unknown device or outdated event: ignore
+                return null;
+            }
+
+ offline.put(deviceId, timestamp);
+
Device device = devices.get(deviceId);
if (device == null) {
return null;
@@ -236,15 +265,37 @@
if (removed) {
// TODO: broadcast ... DOWN only?
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
-
}
return null;
}
}
+    /**
+     * Flags the device as available, unless the triggering event is not newer
+     * than the moment the device was last marked offline.
+     *
+     * @param deviceId identifier of the device
+     * @param timestamp timestamp of the event requesting the change
+     * @return true if the request was accepted and the device became available
+     */
+    // Guarded by deviceDescs value (=Device lock)
+    private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
+        final Timestamp lastOffline = offline.get(deviceId);
+        final boolean stale = lastOffline != null
+                && lastOffline.compareTo(timestamp) >= 0;
+        if (stale) {
+            // an equally recent or newer off-line request wins
+            return false;
+        }
+        // request accepted: forget the off-line record, then flip availability
+        offline.remove(deviceId);
+        return availableDevices.add(deviceId);
+    }
+
@Override
- public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
- List<PortDescription> portDescriptions) {
+ public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
+ DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
@@ -252,7 +303,8 @@
deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
}
- List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs);
+ List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
+ new Timestamped<>(portDescriptions, newTimestamp));
if (!events.isEmpty()) {
// FIXME: broadcast deltaDesc, UP
log.debug("broadcast deltaDesc");
@@ -261,8 +313,9 @@
}
- private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId,
- List<Timestamped<PortDescription>> deltaDescs) {
+ private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<List<PortDescription>> portDescriptions) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -270,30 +323,41 @@
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
- DeviceDescriptions descs = descsMap.get(providerId);
- // every provider must provide DeviceDescription.
- checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
-
List<DeviceEvent> events = new ArrayList<>();
synchronized (descsMap) {
+
+            if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
+                log.debug("Ignoring outdated events: {}", portDescriptions);
+                return events; // still empty here; updatePorts() calls isEmpty() on the result
+            }
+
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // every provider must provide DeviceDescription.
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
Map<PortNumber, Port> ports = getPortMap(deviceId);
+ final Timestamp newTimestamp = portDescriptions.timestamp();
+
// Add new ports
Set<PortNumber> processed = new HashSet<>();
- for (Timestamped<PortDescription> deltaDesc : deltaDescs) {
- final PortNumber number = deltaDesc.value().portNumber();
+ for (PortDescription portDescription : portDescriptions.value()) {
+ final PortNumber number = portDescription.portNumber();
+ processed.add(number);
+
final Port oldPort = ports.get(number);
final Port newPort;
+
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
- deltaDesc == existingPortDesc ||
- deltaDesc.isNewer(existingPortDesc)) {
+ newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
// on new port or valid update
// update description
- descs.putPortDesc(deltaDesc);
+ descs.putPortDesc(new Timestamped<>(portDescription,
+ portDescriptions.timestamp()));
newPort = composePort(device, number, descsMap);
} else {
// outdated event, ignored.
@@ -303,7 +367,6 @@
events.add(oldPort == null ?
createPort(device, newPort, ports) :
updatePort(device, oldPort, newPort, ports));
- processed.add(number);
}
events.addAll(pruneOldPorts(device, ports, processed));
@@ -313,7 +376,7 @@
// Creates a new port based on the port description adds it to the map and
// Returns corresponding event.
- // Guarded by deviceDescs value (=locking Device)
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createPort(Device device, Port newPort,
Map<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort);
@@ -322,7 +385,7 @@
// Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
- // Guarded by deviceDescs value (=locking Device)
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort,
Map<PortNumber, Port> ports) {
@@ -337,7 +400,7 @@
// Prunes the specified list of ports based on which ports are in the
// processed list and returns list of corresponding events.
- // Guarded by deviceDescs value (=locking Device)
+ // Guarded by deviceDescs value (=Device lock)
private List<DeviceEvent> pruneOldPorts(Device device,
Map<PortNumber, Port> ports,
Set<PortNumber> processed) {
@@ -389,13 +452,19 @@
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
- DeviceDescriptions descs = descsMap.get(providerId);
- // assuming all providers must to give DeviceDescription
- checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
-
synchronized (descsMap) {
+
+ if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+ log.debug("Ignoring outdated event: {}", deltaDesc);
+ return null;
+ }
+
+ DeviceDescriptions descs = descsMap.get(providerId);
+            // assuming every provider must give a DeviceDescription
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = deltaDesc.value().portNumber();
final Port oldPort = ports.get(number);
@@ -443,19 +512,51 @@
}
@Override
- public DeviceEvent removeDevice(DeviceId deviceId) {
- ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
+    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
+        // stamp the request with the local clock, then run the timestamped path
+        final Timestamp requestedAt = clockService.getTimestamp(deviceId);
+        // TODO: broadcast removal event
+        return removeDeviceInternal(deviceId, requestedAt);
+    }
+
+    // Removes the device unless it is unknown or the request is outdated;
+    // returns DEVICE_REMOVED only if a cached Device was actually dropped.
+    private DeviceEvent removeDeviceInternal(DeviceId deviceId, Timestamp timestamp) {
+        Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
synchronized (descs) {
+            // accept removal only for a known device, and only if the given
+            // timestamp is newer than the latest one from the Primary provider
+            if (descs.isEmpty() || timestamp.compareTo(
+                    getPrimaryDescriptions(descs).getLatestTimestamp()) <= 0) {
+                // unknown device or outdated event: ignore
+                return null;
+            }
+            removalRequest.put(deviceId, timestamp);
+            // before devices.remove(): markOfflineInternal returns early for uncached Devices
+            markOfflineInternal(deviceId, timestamp);
Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports?
- devicePorts.get(deviceId).clear();
- availableDevices.remove(deviceId);
+            Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+            if (ports != null) {
+                // keep the per-device port map, just empty it
+                ports.clear();
+            }
descs.clear();
return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
+    // Tells whether a removal request at least as recent as the given
+    // timestamp has been recorded for the device.
+    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
+        final Timestamp removedAt = removalRequest.get(deviceId);
+        if (removedAt == null) {
+            return false;
+        }
+        return removedAt.compareTo(timestampToCheck) >= 0;
+    }
+
/**
* Returns a Device, merging description given from multiple Providers.
*
@@ -472,7 +573,7 @@
DeviceDescriptions desc = providerDescs.get(primary);
- DeviceDescription base = desc.getDeviceDesc().value();
+ final DeviceDescription base = desc.getDeviceDesc().value();
Type type = base.type();
String manufacturer = base.manufacturer();
String hwVersion = base.hwVersion();
@@ -545,7 +646,7 @@
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryPID(
- ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+ Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (!e.getKey().isAncillary()) {
@@ -558,6 +659,12 @@
return fallBackPrimary;
}
+    // Looks up the descriptions supplied by the provider chosen by pickPrimaryPID.
+    private DeviceDescriptions getPrimaryDescriptions(
+            Map<ProviderId, DeviceDescriptions> providerDescs) {
+        return providerDescs.get(pickPrimaryPID(providerDescs));
+    }
+
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
@@ -586,6 +693,16 @@
this.portDescs = new ConcurrentHashMap<>();
}
+        // Newest timestamp among the device description and all port descriptions.
+        Timestamp getLatestTimestamp() {
+            Timestamp newest = deviceDesc.get().timestamp();
+            for (Timestamped<PortDescription> portDesc : portDescs.values()) {
+                final Timestamp candidate = portDesc.timestamp();
+                newest = candidate.compareTo(newest) > 0 ? candidate : newest;
+            }
+            return newest;
+        }
+
public Timestamped<DeviceDescription> getDeviceDesc() {
return deviceDesc.get();
}