blob: 8316769c866dc6cb1a5c677e0fe5aa6f5f054661 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07007import org.apache.commons.lang3.concurrent.ConcurrentException;
8import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070015import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070016import org.onlab.onos.net.DefaultAnnotations;
17import org.onlab.onos.net.DefaultDevice;
18import org.onlab.onos.net.DefaultPort;
19import org.onlab.onos.net.Device;
20import org.onlab.onos.net.Device.Type;
21import org.onlab.onos.net.DeviceId;
22import org.onlab.onos.net.Port;
23import org.onlab.onos.net.PortNumber;
24import org.onlab.onos.net.SparseAnnotations;
25import org.onlab.onos.net.device.DefaultDeviceDescription;
26import org.onlab.onos.net.device.DefaultPortDescription;
27import org.onlab.onos.net.device.DeviceDescription;
28import org.onlab.onos.net.device.DeviceEvent;
29import org.onlab.onos.net.device.DeviceStore;
30import org.onlab.onos.net.device.DeviceStoreDelegate;
31import org.onlab.onos.net.device.PortDescription;
32import org.onlab.onos.net.provider.ProviderId;
33import org.onlab.onos.store.AbstractStore;
34import org.onlab.onos.store.ClockService;
35import org.onlab.onos.store.Timestamp;
36import org.onlab.onos.store.common.impl.Timestamped;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070037import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070038import org.slf4j.Logger;
39
40import java.util.ArrayList;
41import java.util.Collections;
42import java.util.HashSet;
43import java.util.Iterator;
44import java.util.List;
45import java.util.Map;
46import java.util.Map.Entry;
47import java.util.Objects;
48import java.util.Set;
49import java.util.concurrent.ConcurrentHashMap;
50import java.util.concurrent.ConcurrentMap;
51import java.util.concurrent.atomic.AtomicReference;
52
53import static com.google.common.base.Preconditions.checkArgument;
54import static com.google.common.base.Preconditions.checkNotNull;
55import static com.google.common.base.Predicates.notNull;
56import static org.onlab.onos.net.device.DeviceEvent.Type.*;
57import static org.slf4j.LoggerFactory.getLogger;
58import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
59import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070060import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070061import static com.google.common.base.Verify.verify;
62
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070063// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070064/**
65 * Manages inventory of infrastructure devices using gossip protocol to distribute
66 * information.
67 */
68@Component(immediate = true)
69@Service
70public class GossipDeviceStore
71 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
72 implements DeviceStore {
73
74 private final Logger log = getLogger(getClass());
75
76 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
77
78 // TODO: Check if inner Map can be replaced with plain Map
79 // innerMap is used to lock a Device, thus instance should never be replaced.
80 // collection of Description given from various providers
81 private final ConcurrentMap<DeviceId,
82 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070083 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070084
85 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070086 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
87 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
88
89 // to be updated under Device lock
90 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
91 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070092
93 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070094 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070095
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected ClockService clockService;
98
99 @Activate
100 public void activate() {
101 log.info("Started");
102 }
103
104 @Deactivate
105 public void deactivate() {
106 deviceDescs.clear();
107 devices.clear();
108 devicePorts.clear();
109 availableDevices.clear();
110 log.info("Stopped");
111 }
112
113 @Override
114 public int getDeviceCount() {
115 return devices.size();
116 }
117
118 @Override
119 public Iterable<Device> getDevices() {
120 return Collections.unmodifiableCollection(devices.values());
121 }
122
123 @Override
124 public Device getDevice(DeviceId deviceId) {
125 return devices.get(deviceId);
126 }
127
128 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700129 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
130 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700131 DeviceDescription deviceDescription) {
132 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
133 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
134 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
135 if (event != null) {
136 // FIXME: broadcast deltaDesc, UP
137 log.debug("broadcast deltaDesc");
138 }
139 return event;
140 }
141
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700142 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
143 DeviceId deviceId,
144 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700145
146 // Collection of DeviceDescriptions for a Device
147 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700148 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700149
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700150 synchronized (providerDescs) {
151 // locking per device
152
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700153 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
154 log.debug("Ignoring outdated event: {}", deltaDesc);
155 return null;
156 }
157
158 DeviceDescriptions descs
159 = createIfAbsentUnchecked(providerDescs, providerId,
160 new InitDeviceDescs(deltaDesc));
161
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700162 final Device oldDevice = devices.get(deviceId);
163 final Device newDevice;
164
165 if (deltaDesc == descs.getDeviceDesc() ||
166 deltaDesc.isNewer(descs.getDeviceDesc())) {
167 // on new device or valid update
168 descs.putDeviceDesc(deltaDesc);
169 newDevice = composeDevice(deviceId, providerDescs);
170 } else {
171 // outdated event, ignored.
172 return null;
173 }
174 if (oldDevice == null) {
175 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700176 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700177 } else {
178 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700179 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700180 }
181 }
182 }
183
184 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700185 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700186 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700187 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700188
189 // update composed device cache
190 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
191 verify(oldDevice == null,
192 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
193 providerId, oldDevice, newDevice);
194
195 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700196 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700197 }
198
199 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
200 }
201
202 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700203 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700204 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700205 Device oldDevice,
206 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700207
208 // We allow only certain attributes to trigger update
209 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
210 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700211 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700212
213 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
214 if (!replaced) {
215 verify(replaced,
216 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
217 providerId, oldDevice, devices.get(newDevice.id())
218 , newDevice);
219 }
220 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700221 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700222 }
223 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
224 }
225
226 // Otherwise merely attempt to change availability if primary provider
227 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700228 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700229 return !added ? null :
230 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
231 }
232 return null;
233 }
234
235 @Override
236 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700237 Timestamp timestamp = clockService.getTimestamp(deviceId);
238 return markOfflineInternal(deviceId, timestamp);
239 }
240
241 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
242
243 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700244 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700245
246 // locking device
247 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700248
249 // accept off-line if given timestamp is newer than
250 // the latest Timestamp from Primary provider
251 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
252 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
253 if (timestamp.compareTo(lastTimestamp) <= 0) {
254 // outdated event ignore
255 return null;
256 }
257
258 offline.put(deviceId, timestamp);
259
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700260 Device device = devices.get(deviceId);
261 if (device == null) {
262 return null;
263 }
264 boolean removed = availableDevices.remove(deviceId);
265 if (removed) {
266 // TODO: broadcast ... DOWN only?
267 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700268 }
269 return null;
270 }
271 }
272
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 /**
274 * Marks the device as available if the given timestamp is not outdated,
275 * compared to the time the device has been marked offline.
276 *
277 * @param deviceId identifier of the device
278 * @param timestamp of the event triggering this change.
279 * @return true if availability change request was accepted and changed the state
280 */
281 // Guarded by deviceDescs value (=Device lock)
282 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
283 // accept on-line if given timestamp is newer than
284 // the latest offline request Timestamp
285 Timestamp offlineTimestamp = offline.get(deviceId);
286 if (offlineTimestamp == null ||
287 offlineTimestamp.compareTo(timestamp) < 0) {
288
289 offline.remove(deviceId);
290 return availableDevices.add(deviceId);
291 }
292 return false;
293 }
294
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700295 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700296 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
297 DeviceId deviceId,
298 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700299 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
300
301 List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
302 for (PortDescription e : portDescriptions) {
303 deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
304 }
305
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700306 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
307 new Timestamped<>(portDescriptions, newTimestamp));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700308 if (!events.isEmpty()) {
309 // FIXME: broadcast deltaDesc, UP
310 log.debug("broadcast deltaDesc");
311 }
312 return events;
313
314 }
315
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700316 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
317 DeviceId deviceId,
318 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700319
320 Device device = devices.get(deviceId);
321 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
322
323 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
324 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
325
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700326 List<DeviceEvent> events = new ArrayList<>();
327 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700328
329 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
330 log.debug("Ignoring outdated events: {}", portDescriptions);
331 return null;
332 }
333
334 DeviceDescriptions descs = descsMap.get(providerId);
335 // every provider must provide DeviceDescription.
336 checkArgument(descs != null,
337 "Device description for Device ID %s from Provider %s was not found",
338 deviceId, providerId);
339
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700340 Map<PortNumber, Port> ports = getPortMap(deviceId);
341
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700342 final Timestamp newTimestamp = portDescriptions.timestamp();
343
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700344 // Add new ports
345 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700346 for (PortDescription portDescription : portDescriptions.value()) {
347 final PortNumber number = portDescription.portNumber();
348 processed.add(number);
349
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700350 final Port oldPort = ports.get(number);
351 final Port newPort;
352
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700353
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700354 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
355 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700356 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 // on new port or valid update
358 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700359 descs.putPortDesc(new Timestamped<>(portDescription,
360 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 newPort = composePort(device, number, descsMap);
362 } else {
363 // outdated event, ignored.
364 continue;
365 }
366
367 events.add(oldPort == null ?
368 createPort(device, newPort, ports) :
369 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700370 }
371
372 events.addAll(pruneOldPorts(device, ports, processed));
373 }
374 return FluentIterable.from(events).filter(notNull()).toList();
375 }
376
377 // Creates a new port based on the port description adds it to the map and
378 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700379 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700380 private DeviceEvent createPort(Device device, Port newPort,
381 Map<PortNumber, Port> ports) {
382 ports.put(newPort.number(), newPort);
383 return new DeviceEvent(PORT_ADDED, device, newPort);
384 }
385
386 // Checks if the specified port requires update and if so, it replaces the
387 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700388 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700389 private DeviceEvent updatePort(Device device, Port oldPort,
390 Port newPort,
391 Map<PortNumber, Port> ports) {
392 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700393 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700394
395 ports.put(oldPort.number(), newPort);
396 return new DeviceEvent(PORT_UPDATED, device, newPort);
397 }
398 return null;
399 }
400
401 // Prunes the specified list of ports based on which ports are in the
402 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700403 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700404 private List<DeviceEvent> pruneOldPorts(Device device,
405 Map<PortNumber, Port> ports,
406 Set<PortNumber> processed) {
407 List<DeviceEvent> events = new ArrayList<>();
408 Iterator<PortNumber> iterator = ports.keySet().iterator();
409 while (iterator.hasNext()) {
410 PortNumber portNumber = iterator.next();
411 if (!processed.contains(portNumber)) {
412 events.add(new DeviceEvent(PORT_REMOVED, device,
413 ports.get(portNumber)));
414 iterator.remove();
415 }
416 }
417 return events;
418 }
419
420 // Gets the map of ports for the specified device; if one does not already
421 // exist, it creates and registers a new one.
422 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
423 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700424 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
425 }
426
427 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
428 DeviceId deviceId) {
429 return createIfAbsentUnchecked(deviceDescs, deviceId,
430 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700431 }
432
433 @Override
434 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
435 PortDescription portDescription) {
436 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
437 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
438 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
439 if (event != null) {
440 // FIXME: broadcast deltaDesc
441 log.debug("broadcast deltaDesc");
442 }
443 return event;
444 }
445
446 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
447 Timestamped<PortDescription> deltaDesc) {
448
449 Device device = devices.get(deviceId);
450 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
451
452 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
453 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
454
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700455 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700456
457 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
458 log.debug("Ignoring outdated event: {}", deltaDesc);
459 return null;
460 }
461
462 DeviceDescriptions descs = descsMap.get(providerId);
463 // assuming all providers must to give DeviceDescription
464 checkArgument(descs != null,
465 "Device description for Device ID %s from Provider %s was not found",
466 deviceId, providerId);
467
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700468 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
469 final PortNumber number = deltaDesc.value().portNumber();
470 final Port oldPort = ports.get(number);
471 final Port newPort;
472
473 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
474 if (existingPortDesc == null ||
475 deltaDesc == existingPortDesc ||
476 deltaDesc.isNewer(existingPortDesc)) {
477 // on new port or valid update
478 // update description
479 descs.putPortDesc(deltaDesc);
480 newPort = composePort(device, number, descsMap);
481 } else {
482 // outdated event, ignored.
483 return null;
484 }
485
486 if (oldPort == null) {
487 return createPort(device, newPort, ports);
488 } else {
489 return updatePort(device, oldPort, newPort, ports);
490 }
491 }
492 }
493
494 @Override
495 public List<Port> getPorts(DeviceId deviceId) {
496 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
497 if (ports == null) {
498 return Collections.emptyList();
499 }
500 return ImmutableList.copyOf(ports.values());
501 }
502
503 @Override
504 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
505 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
506 return ports == null ? null : ports.get(portNumber);
507 }
508
509 @Override
510 public boolean isAvailable(DeviceId deviceId) {
511 return availableDevices.contains(deviceId);
512 }
513
514 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700515 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
516 Timestamp timestamp = clockService.getTimestamp(deviceId);
517 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
518 // TODO: broadcast removal event
519 return event;
520 }
521
522 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
523 Timestamp timestamp) {
524
525 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700526 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700527 // accept removal request if given timestamp is newer than
528 // the latest Timestamp from Primary provider
529 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
530 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
531 if (timestamp.compareTo(lastTimestamp) <= 0) {
532 // outdated event ignore
533 return null;
534 }
535 removalRequest.put(deviceId, timestamp);
536
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700537 Device device = devices.remove(deviceId);
538 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700539 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
540 if (ports != null) {
541 ports.clear();
542 }
543 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700544 descs.clear();
545 return device == null ? null :
546 new DeviceEvent(DEVICE_REMOVED, device, null);
547 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700548 }
549
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700550 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
551 Timestamp removalTimestamp = removalRequest.get(deviceId);
552 if (removalTimestamp != null &&
553 removalTimestamp.compareTo(timestampToCheck) >= 0) {
554 // removalRequest is more recent
555 return true;
556 }
557 return false;
558 }
559
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700560 /**
561 * Returns a Device, merging description given from multiple Providers.
562 *
563 * @param deviceId device identifier
564 * @param providerDescs Collection of Descriptions from multiple providers
565 * @return Device instance
566 */
567 private Device composeDevice(DeviceId deviceId,
568 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
569
570 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
571
572 ProviderId primary = pickPrimaryPID(providerDescs);
573
574 DeviceDescriptions desc = providerDescs.get(primary);
575
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700576 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700577 Type type = base.type();
578 String manufacturer = base.manufacturer();
579 String hwVersion = base.hwVersion();
580 String swVersion = base.swVersion();
581 String serialNumber = base.serialNumber();
582 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
583 annotations = merge(annotations, base.annotations());
584
585 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
586 if (e.getKey().equals(primary)) {
587 continue;
588 }
589 // TODO: should keep track of Description timestamp
590 // and only merge conflicting keys when timestamp is newer
591 // Currently assuming there will never be a key conflict between
592 // providers
593
594 // annotation merging. not so efficient, should revisit later
595 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
596 }
597
598 return new DefaultDevice(primary, deviceId , type, manufacturer,
599 hwVersion, swVersion, serialNumber, annotations);
600 }
601
602 /**
603 * Returns a Port, merging description given from multiple Providers.
604 *
605 * @param device device the port is on
606 * @param number port number
607 * @param providerDescs Collection of Descriptions from multiple providers
608 * @return Port instance
609 */
610 private Port composePort(Device device, PortNumber number,
611 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
612
613 ProviderId primary = pickPrimaryPID(providerDescs);
614 DeviceDescriptions primDescs = providerDescs.get(primary);
615 // if no primary, assume not enabled
616 // TODO: revisit this default port enabled/disabled behavior
617 boolean isEnabled = false;
618 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
619
620 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
621 if (portDesc != null) {
622 isEnabled = portDesc.value().isEnabled();
623 annotations = merge(annotations, portDesc.value().annotations());
624 }
625
626 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
627 if (e.getKey().equals(primary)) {
628 continue;
629 }
630 // TODO: should keep track of Description timestamp
631 // and only merge conflicting keys when timestamp is newer
632 // Currently assuming there will never be a key conflict between
633 // providers
634
635 // annotation merging. not so efficient, should revisit later
636 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
637 if (otherPortDesc != null) {
638 annotations = merge(annotations, otherPortDesc.value().annotations());
639 }
640 }
641
642 return new DefaultPort(device, number, isEnabled, annotations);
643 }
644
645 /**
646 * @return primary ProviderID, or randomly chosen one if none exists
647 */
648 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700649 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700650 ProviderId fallBackPrimary = null;
651 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
652 if (!e.getKey().isAncillary()) {
653 return e.getKey();
654 } else if (fallBackPrimary == null) {
655 // pick randomly as a fallback in case there is no primary
656 fallBackPrimary = e.getKey();
657 }
658 }
659 return fallBackPrimary;
660 }
661
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700662 private DeviceDescriptions getPrimaryDescriptions(
663 Map<ProviderId, DeviceDescriptions> providerDescs) {
664 ProviderId pid = pickPrimaryPID(providerDescs);
665 return providerDescs.get(pid);
666 }
667
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700668 public static final class InitDeviceDescs
669 implements ConcurrentInitializer<DeviceDescriptions> {
670
671 private final Timestamped<DeviceDescription> deviceDesc;
672
673 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
674 this.deviceDesc = checkNotNull(deviceDesc);
675 }
676 @Override
677 public DeviceDescriptions get() throws ConcurrentException {
678 return new DeviceDescriptions(deviceDesc);
679 }
680 }
681
682
683 /**
684 * Collection of Description of a Device and it's Ports given from a Provider.
685 */
686 public static class DeviceDescriptions {
687
688 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
689 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
690
691 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
692 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
693 this.portDescs = new ConcurrentHashMap<>();
694 }
695
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700696 Timestamp getLatestTimestamp() {
697 Timestamp latest = deviceDesc.get().timestamp();
698 for (Timestamped<PortDescription> desc : portDescs.values()) {
699 if (desc.timestamp().compareTo(latest) > 0) {
700 latest = desc.timestamp();
701 }
702 }
703 return latest;
704 }
705
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700706 public Timestamped<DeviceDescription> getDeviceDesc() {
707 return deviceDesc.get();
708 }
709
710 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
711 return portDescs.get(number);
712 }
713
714 /**
715 * Puts DeviceDescription, merging annotations as necessary.
716 *
717 * @param newDesc new DeviceDescription
718 * @return previous DeviceDescription
719 */
720 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
721 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
722 Timestamped<DeviceDescription> newOne = newDesc;
723 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700724 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700725 newDesc.value().annotations());
726 newOne = new Timestamped<DeviceDescription>(
727 new DefaultDeviceDescription(newDesc.value(), merged),
728 newDesc.timestamp());
729 }
730 return deviceDesc.getAndSet(newOne);
731 }
732
733 /**
734 * Puts PortDescription, merging annotations as necessary.
735 *
736 * @param newDesc new PortDescription
737 * @return previous PortDescription
738 */
739 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
740 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
741 Timestamped<PortDescription> newOne = newDesc;
742 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700743 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700744 newDesc.value().annotations());
745 newOne = new Timestamped<PortDescription>(
746 new DefaultPortDescription(newDesc.value(), merged),
747 newDesc.timestamp());
748 }
749 return portDescs.put(newOne.value().portNumber(), newOne);
750 }
751 }
752}