blob: da0a29281ab36eadef12c4ea52d5e5b662b062e3 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070016import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017import org.onlab.onos.net.DefaultAnnotations;
18import org.onlab.onos.net.DefaultDevice;
19import org.onlab.onos.net.DefaultPort;
20import org.onlab.onos.net.Device;
21import org.onlab.onos.net.Device.Type;
22import org.onlab.onos.net.DeviceId;
23import org.onlab.onos.net.Port;
24import org.onlab.onos.net.PortNumber;
25import org.onlab.onos.net.SparseAnnotations;
26import org.onlab.onos.net.device.DefaultDeviceDescription;
27import org.onlab.onos.net.device.DefaultPortDescription;
28import org.onlab.onos.net.device.DeviceDescription;
29import org.onlab.onos.net.device.DeviceEvent;
30import org.onlab.onos.net.device.DeviceStore;
31import org.onlab.onos.net.device.DeviceStoreDelegate;
32import org.onlab.onos.net.device.PortDescription;
33import org.onlab.onos.net.provider.ProviderId;
34import org.onlab.onos.store.AbstractStore;
35import org.onlab.onos.store.ClockService;
36import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
40import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070041import org.onlab.onos.store.common.impl.Timestamped;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070042import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070043import org.slf4j.Logger;
44
Madan Jampani47c93732014-10-06 20:46:08 -070045import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070046import java.util.ArrayList;
47import java.util.Collections;
48import java.util.HashSet;
49import java.util.Iterator;
50import java.util.List;
51import java.util.Map;
52import java.util.Map.Entry;
53import java.util.Objects;
54import java.util.Set;
55import java.util.concurrent.ConcurrentHashMap;
56import java.util.concurrent.ConcurrentMap;
57import java.util.concurrent.atomic.AtomicReference;
58
59import static com.google.common.base.Preconditions.checkArgument;
60import static com.google.common.base.Preconditions.checkNotNull;
61import static com.google.common.base.Predicates.notNull;
62import static org.onlab.onos.net.device.DeviceEvent.Type.*;
63import static org.slf4j.LoggerFactory.getLogger;
64import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
65import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070066import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070067import static com.google.common.base.Verify.verify;
68
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070069// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070070/**
71 * Manages inventory of infrastructure devices using gossip protocol to distribute
72 * information.
73 */
74@Component(immediate = true)
75@Service
76public class GossipDeviceStore
77 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
78 implements DeviceStore {
79
80 private final Logger log = getLogger(getClass());
81
82 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
83
84 // TODO: Check if inner Map can be replaced with plain Map
85 // innerMap is used to lock a Device, thus instance should never be replaced.
86 // collection of Description given from various providers
87 private final ConcurrentMap<DeviceId,
88 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070089 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070090
91 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070092 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
93 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
94
95 // to be updated under Device lock
96 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
97 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070098
99 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700100 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClockService clockService;
104
Madan Jampani47c93732014-10-06 20:46:08 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ClusterCommunicationService clusterCommunicator;
107
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700108 @Activate
109 public void activate() {
110 log.info("Started");
111 }
112
113 @Deactivate
114 public void deactivate() {
115 deviceDescs.clear();
116 devices.clear();
117 devicePorts.clear();
118 availableDevices.clear();
119 log.info("Stopped");
120 }
121
122 @Override
123 public int getDeviceCount() {
124 return devices.size();
125 }
126
127 @Override
128 public Iterable<Device> getDevices() {
129 return Collections.unmodifiableCollection(devices.values());
130 }
131
132 @Override
133 public Device getDevice(DeviceId deviceId) {
134 return devices.get(deviceId);
135 }
136
137 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700138 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
139 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700140 DeviceDescription deviceDescription) {
141 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
142 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
143 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
144 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700145 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
146 providerId, deviceId);
147 try {
148 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
149 } catch (IOException e) {
150 log.error("Failed to notify peers of a device update topology event or providerId: "
151 + providerId + " and deviceId: " + deviceId, e);
152 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700153 }
154 return event;
155 }
156
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700157 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
158 DeviceId deviceId,
159 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700160
161 // Collection of DeviceDescriptions for a Device
162 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700163 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700164
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700165 synchronized (providerDescs) {
166 // locking per device
167
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700168 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
169 log.debug("Ignoring outdated event: {}", deltaDesc);
170 return null;
171 }
172
173 DeviceDescriptions descs
174 = createIfAbsentUnchecked(providerDescs, providerId,
175 new InitDeviceDescs(deltaDesc));
176
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700177 final Device oldDevice = devices.get(deviceId);
178 final Device newDevice;
179
180 if (deltaDesc == descs.getDeviceDesc() ||
181 deltaDesc.isNewer(descs.getDeviceDesc())) {
182 // on new device or valid update
183 descs.putDeviceDesc(deltaDesc);
184 newDevice = composeDevice(deviceId, providerDescs);
185 } else {
186 // outdated event, ignored.
187 return null;
188 }
189 if (oldDevice == null) {
190 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700191 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700192 } else {
193 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700194 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700195 }
196 }
197 }
198
199 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700200 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700201 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700202 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700203
204 // update composed device cache
205 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
206 verify(oldDevice == null,
207 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
208 providerId, oldDevice, newDevice);
209
210 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700211 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700212 }
213
214 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
215 }
216
217 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700218 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700219 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700220 Device oldDevice,
221 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700222
223 // We allow only certain attributes to trigger update
224 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
225 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700226 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700227
228 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
229 if (!replaced) {
230 verify(replaced,
231 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
232 providerId, oldDevice, devices.get(newDevice.id())
233 , newDevice);
234 }
235 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700236 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700237 }
238 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
239 }
240
241 // Otherwise merely attempt to change availability if primary provider
242 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700243 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700244 return !added ? null :
245 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
246 }
247 return null;
248 }
249
250 @Override
251 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700252 Timestamp timestamp = clockService.getTimestamp(deviceId);
253 return markOfflineInternal(deviceId, timestamp);
254 }
255
256 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
257
258 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700259 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700260
261 // locking device
262 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700263
264 // accept off-line if given timestamp is newer than
265 // the latest Timestamp from Primary provider
266 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
267 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
268 if (timestamp.compareTo(lastTimestamp) <= 0) {
269 // outdated event ignore
270 return null;
271 }
272
273 offline.put(deviceId, timestamp);
274
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700275 Device device = devices.get(deviceId);
276 if (device == null) {
277 return null;
278 }
279 boolean removed = availableDevices.remove(deviceId);
280 if (removed) {
281 // TODO: broadcast ... DOWN only?
282 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700283 }
284 return null;
285 }
286 }
287
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700288 /**
289 * Marks the device as available if the given timestamp is not outdated,
290 * compared to the time the device has been marked offline.
291 *
292 * @param deviceId identifier of the device
293 * @param timestamp of the event triggering this change.
294 * @return true if availability change request was accepted and changed the state
295 */
296 // Guarded by deviceDescs value (=Device lock)
297 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
298 // accept on-line if given timestamp is newer than
299 // the latest offline request Timestamp
300 Timestamp offlineTimestamp = offline.get(deviceId);
301 if (offlineTimestamp == null ||
302 offlineTimestamp.compareTo(timestamp) < 0) {
303
304 offline.remove(deviceId);
305 return availableDevices.add(deviceId);
306 }
307 return false;
308 }
309
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700310 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700311 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
312 DeviceId deviceId,
313 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700314 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
315
Madan Jampani47c93732014-10-06 20:46:08 -0700316 Timestamped<List<PortDescription>> timestampedPortDescriptions =
317 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700318
Madan Jampani47c93732014-10-06 20:46:08 -0700319 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700320 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700321 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
322 providerId, deviceId);
323 try {
324 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
325 } catch (IOException e) {
326 log.error("Failed to notify peers of a port update topology event or providerId: "
327 + providerId + " and deviceId: " + deviceId, e);
328 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700329 }
330 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700331 }
332
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700333 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
334 DeviceId deviceId,
335 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700336
337 Device device = devices.get(deviceId);
338 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
339
340 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
341 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
342
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700343 List<DeviceEvent> events = new ArrayList<>();
344 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700345
346 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
347 log.debug("Ignoring outdated events: {}", portDescriptions);
348 return null;
349 }
350
351 DeviceDescriptions descs = descsMap.get(providerId);
352 // every provider must provide DeviceDescription.
353 checkArgument(descs != null,
354 "Device description for Device ID %s from Provider %s was not found",
355 deviceId, providerId);
356
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 Map<PortNumber, Port> ports = getPortMap(deviceId);
358
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700359 final Timestamp newTimestamp = portDescriptions.timestamp();
360
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 // Add new ports
362 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700363 for (PortDescription portDescription : portDescriptions.value()) {
364 final PortNumber number = portDescription.portNumber();
365 processed.add(number);
366
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700367 final Port oldPort = ports.get(number);
368 final Port newPort;
369
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700370
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700371 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
372 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700373 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700374 // on new port or valid update
375 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700376 descs.putPortDesc(new Timestamped<>(portDescription,
377 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700378 newPort = composePort(device, number, descsMap);
379 } else {
380 // outdated event, ignored.
381 continue;
382 }
383
384 events.add(oldPort == null ?
385 createPort(device, newPort, ports) :
386 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700387 }
388
389 events.addAll(pruneOldPorts(device, ports, processed));
390 }
391 return FluentIterable.from(events).filter(notNull()).toList();
392 }
393
394 // Creates a new port based on the port description adds it to the map and
395 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700396 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700397 private DeviceEvent createPort(Device device, Port newPort,
398 Map<PortNumber, Port> ports) {
399 ports.put(newPort.number(), newPort);
400 return new DeviceEvent(PORT_ADDED, device, newPort);
401 }
402
403 // Checks if the specified port requires update and if so, it replaces the
404 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700405 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700406 private DeviceEvent updatePort(Device device, Port oldPort,
407 Port newPort,
408 Map<PortNumber, Port> ports) {
409 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700410 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700411
412 ports.put(oldPort.number(), newPort);
413 return new DeviceEvent(PORT_UPDATED, device, newPort);
414 }
415 return null;
416 }
417
418 // Prunes the specified list of ports based on which ports are in the
419 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700420 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700421 private List<DeviceEvent> pruneOldPorts(Device device,
422 Map<PortNumber, Port> ports,
423 Set<PortNumber> processed) {
424 List<DeviceEvent> events = new ArrayList<>();
425 Iterator<PortNumber> iterator = ports.keySet().iterator();
426 while (iterator.hasNext()) {
427 PortNumber portNumber = iterator.next();
428 if (!processed.contains(portNumber)) {
429 events.add(new DeviceEvent(PORT_REMOVED, device,
430 ports.get(portNumber)));
431 iterator.remove();
432 }
433 }
434 return events;
435 }
436
437 // Gets the map of ports for the specified device; if one does not already
438 // exist, it creates and registers a new one.
439 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
440 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700441 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
442 }
443
444 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
445 DeviceId deviceId) {
446 return createIfAbsentUnchecked(deviceDescs, deviceId,
447 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700448 }
449
450 @Override
451 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
452 PortDescription portDescription) {
453 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
454 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
455 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
456 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700457 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
458 providerId, deviceId);
459 try {
460 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
461 } catch (IOException e) {
462 log.error("Failed to notify peers of a port status update topology event or providerId: "
463 + providerId + " and deviceId: " + deviceId, e);
464 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700465 }
466 return event;
467 }
468
469 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
470 Timestamped<PortDescription> deltaDesc) {
471
472 Device device = devices.get(deviceId);
473 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
474
475 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
476 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
477
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700478 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700479
480 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
481 log.debug("Ignoring outdated event: {}", deltaDesc);
482 return null;
483 }
484
485 DeviceDescriptions descs = descsMap.get(providerId);
486 // assuming all providers must to give DeviceDescription
487 checkArgument(descs != null,
488 "Device description for Device ID %s from Provider %s was not found",
489 deviceId, providerId);
490
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700491 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
492 final PortNumber number = deltaDesc.value().portNumber();
493 final Port oldPort = ports.get(number);
494 final Port newPort;
495
496 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
497 if (existingPortDesc == null ||
498 deltaDesc == existingPortDesc ||
499 deltaDesc.isNewer(existingPortDesc)) {
500 // on new port or valid update
501 // update description
502 descs.putPortDesc(deltaDesc);
503 newPort = composePort(device, number, descsMap);
504 } else {
505 // outdated event, ignored.
506 return null;
507 }
508
509 if (oldPort == null) {
510 return createPort(device, newPort, ports);
511 } else {
512 return updatePort(device, oldPort, newPort, ports);
513 }
514 }
515 }
516
517 @Override
518 public List<Port> getPorts(DeviceId deviceId) {
519 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
520 if (ports == null) {
521 return Collections.emptyList();
522 }
523 return ImmutableList.copyOf(ports.values());
524 }
525
526 @Override
527 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
528 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
529 return ports == null ? null : ports.get(portNumber);
530 }
531
532 @Override
533 public boolean isAvailable(DeviceId deviceId) {
534 return availableDevices.contains(deviceId);
535 }
536
537 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700538 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
539 Timestamp timestamp = clockService.getTimestamp(deviceId);
540 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
541 // TODO: broadcast removal event
542 return event;
543 }
544
545 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
546 Timestamp timestamp) {
547
548 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700549 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700550 // accept removal request if given timestamp is newer than
551 // the latest Timestamp from Primary provider
552 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
553 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
554 if (timestamp.compareTo(lastTimestamp) <= 0) {
555 // outdated event ignore
556 return null;
557 }
558 removalRequest.put(deviceId, timestamp);
559
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700560 Device device = devices.remove(deviceId);
561 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700562 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
563 if (ports != null) {
564 ports.clear();
565 }
566 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700567 descs.clear();
568 return device == null ? null :
569 new DeviceEvent(DEVICE_REMOVED, device, null);
570 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700571 }
572
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700573 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
574 Timestamp removalTimestamp = removalRequest.get(deviceId);
575 if (removalTimestamp != null &&
576 removalTimestamp.compareTo(timestampToCheck) >= 0) {
577 // removalRequest is more recent
578 return true;
579 }
580 return false;
581 }
582
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700583 /**
584 * Returns a Device, merging description given from multiple Providers.
585 *
586 * @param deviceId device identifier
587 * @param providerDescs Collection of Descriptions from multiple providers
588 * @return Device instance
589 */
590 private Device composeDevice(DeviceId deviceId,
591 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
592
593 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
594
595 ProviderId primary = pickPrimaryPID(providerDescs);
596
597 DeviceDescriptions desc = providerDescs.get(primary);
598
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700599 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700600 Type type = base.type();
601 String manufacturer = base.manufacturer();
602 String hwVersion = base.hwVersion();
603 String swVersion = base.swVersion();
604 String serialNumber = base.serialNumber();
605 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
606 annotations = merge(annotations, base.annotations());
607
608 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
609 if (e.getKey().equals(primary)) {
610 continue;
611 }
612 // TODO: should keep track of Description timestamp
613 // and only merge conflicting keys when timestamp is newer
614 // Currently assuming there will never be a key conflict between
615 // providers
616
617 // annotation merging. not so efficient, should revisit later
618 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
619 }
620
621 return new DefaultDevice(primary, deviceId , type, manufacturer,
622 hwVersion, swVersion, serialNumber, annotations);
623 }
624
625 /**
626 * Returns a Port, merging description given from multiple Providers.
627 *
628 * @param device device the port is on
629 * @param number port number
630 * @param providerDescs Collection of Descriptions from multiple providers
631 * @return Port instance
632 */
633 private Port composePort(Device device, PortNumber number,
634 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
635
636 ProviderId primary = pickPrimaryPID(providerDescs);
637 DeviceDescriptions primDescs = providerDescs.get(primary);
638 // if no primary, assume not enabled
639 // TODO: revisit this default port enabled/disabled behavior
640 boolean isEnabled = false;
641 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
642
643 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
644 if (portDesc != null) {
645 isEnabled = portDesc.value().isEnabled();
646 annotations = merge(annotations, portDesc.value().annotations());
647 }
648
649 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
650 if (e.getKey().equals(primary)) {
651 continue;
652 }
653 // TODO: should keep track of Description timestamp
654 // and only merge conflicting keys when timestamp is newer
655 // Currently assuming there will never be a key conflict between
656 // providers
657
658 // annotation merging. not so efficient, should revisit later
659 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
660 if (otherPortDesc != null) {
661 annotations = merge(annotations, otherPortDesc.value().annotations());
662 }
663 }
664
665 return new DefaultPort(device, number, isEnabled, annotations);
666 }
667
668 /**
669 * @return primary ProviderID, or randomly chosen one if none exists
670 */
671 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700672 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700673 ProviderId fallBackPrimary = null;
674 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
675 if (!e.getKey().isAncillary()) {
676 return e.getKey();
677 } else if (fallBackPrimary == null) {
678 // pick randomly as a fallback in case there is no primary
679 fallBackPrimary = e.getKey();
680 }
681 }
682 return fallBackPrimary;
683 }
684
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700685 private DeviceDescriptions getPrimaryDescriptions(
686 Map<ProviderId, DeviceDescriptions> providerDescs) {
687 ProviderId pid = pickPrimaryPID(providerDescs);
688 return providerDescs.get(pid);
689 }
690
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700691 public static final class InitDeviceDescs
692 implements ConcurrentInitializer<DeviceDescriptions> {
693
694 private final Timestamped<DeviceDescription> deviceDesc;
695
696 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
697 this.deviceDesc = checkNotNull(deviceDesc);
698 }
699 @Override
700 public DeviceDescriptions get() throws ConcurrentException {
701 return new DeviceDescriptions(deviceDesc);
702 }
703 }
704
705
706 /**
707 * Collection of Description of a Device and it's Ports given from a Provider.
708 */
709 public static class DeviceDescriptions {
710
711 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
712 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
713
714 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
715 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
716 this.portDescs = new ConcurrentHashMap<>();
717 }
718
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700719 Timestamp getLatestTimestamp() {
720 Timestamp latest = deviceDesc.get().timestamp();
721 for (Timestamped<PortDescription> desc : portDescs.values()) {
722 if (desc.timestamp().compareTo(latest) > 0) {
723 latest = desc.timestamp();
724 }
725 }
726 return latest;
727 }
728
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700729 public Timestamped<DeviceDescription> getDeviceDesc() {
730 return deviceDesc.get();
731 }
732
733 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
734 return portDescs.get(number);
735 }
736
737 /**
738 * Puts DeviceDescription, merging annotations as necessary.
739 *
740 * @param newDesc new DeviceDescription
741 * @return previous DeviceDescription
742 */
743 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
744 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
745 Timestamped<DeviceDescription> newOne = newDesc;
746 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700747 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700748 newDesc.value().annotations());
749 newOne = new Timestamped<DeviceDescription>(
750 new DefaultDeviceDescription(newDesc.value(), merged),
751 newDesc.timestamp());
752 }
753 return deviceDesc.getAndSet(newOne);
754 }
755
756 /**
757 * Puts PortDescription, merging annotations as necessary.
758 *
759 * @param newDesc new PortDescription
760 * @return previous PortDescription
761 */
762 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
763 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
764 Timestamped<PortDescription> newOne = newDesc;
765 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700766 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700767 newDesc.value().annotations());
768 newOne = new Timestamped<PortDescription>(
769 new DefaultPortDescription(newDesc.value(), merged),
770 newDesc.timestamp());
771 }
772 return portDescs.put(newOne.value().portNumber(), newOne);
773 }
774 }
Madan Jampani47c93732014-10-06 20:46:08 -0700775
776 private void notifyPeers(InternalDeviceEvent event) throws IOException {
777 ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-device-updates"), event);
778 clusterCommunicator.broadcast(message);
779 }
780
781 private void notifyPeers(InternalPortEvent event) throws IOException {
782 ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-updates"), event);
783 clusterCommunicator.broadcast(message);
784 }
785
786 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
787 ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event);
788 clusterCommunicator.broadcast(message);
789 }
790
791 private class InternalDeviceEventListener implements ClusterMessageHandler {
792 @Override
793 public void handle(ClusterMessage message) {
794 log.info("Received device update event from peer: {}", message.sender());
795 InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
796 ProviderId providerId = event.providerId();
797 DeviceId deviceId = event.deviceId();
798 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
799 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
800 }
801 }
802
803 private class InternalPortEventListener implements ClusterMessageHandler {
804 @Override
805 public void handle(ClusterMessage message) {
806
807 log.info("Received port update event from peer: {}", message.sender());
808 InternalPortEvent event = (InternalPortEvent) message.payload();
809
810 ProviderId providerId = event.providerId();
811 DeviceId deviceId = event.deviceId();
812 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
813
814 updatePortsInternal(providerId, deviceId, portDescriptions);
815 }
816 }
817
818 private class InternalPortStatusEventListener implements ClusterMessageHandler {
819 @Override
820 public void handle(ClusterMessage message) {
821
822 log.info("Received port status update event from peer: {}", message.sender());
823 InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
824
825 ProviderId providerId = event.providerId();
826 DeviceId deviceId = event.deviceId();
827 Timestamped<PortDescription> portDescription = event.portDescription();
828
829 updatePortStatusInternal(providerId, deviceId, portDescription);
830 }
831 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700832}