blob: ab9ae3c6e997abe693c735f449f799d9123792bf [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070016import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070017import org.onlab.onos.net.DefaultAnnotations;
18import org.onlab.onos.net.DefaultDevice;
19import org.onlab.onos.net.DefaultPort;
20import org.onlab.onos.net.Device;
21import org.onlab.onos.net.Device.Type;
22import org.onlab.onos.net.DeviceId;
23import org.onlab.onos.net.Port;
24import org.onlab.onos.net.PortNumber;
25import org.onlab.onos.net.SparseAnnotations;
26import org.onlab.onos.net.device.DefaultDeviceDescription;
27import org.onlab.onos.net.device.DefaultPortDescription;
28import org.onlab.onos.net.device.DeviceDescription;
29import org.onlab.onos.net.device.DeviceEvent;
30import org.onlab.onos.net.device.DeviceStore;
31import org.onlab.onos.net.device.DeviceStoreDelegate;
32import org.onlab.onos.net.device.PortDescription;
33import org.onlab.onos.net.provider.ProviderId;
34import org.onlab.onos.store.AbstractStore;
35import org.onlab.onos.store.ClockService;
36import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070040import org.onlab.onos.store.common.impl.Timestamped;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070041import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070042import org.slf4j.Logger;
43
Madan Jampani47c93732014-10-06 20:46:08 -070044import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070045import java.util.ArrayList;
46import java.util.Collections;
47import java.util.HashSet;
48import java.util.Iterator;
49import java.util.List;
50import java.util.Map;
51import java.util.Map.Entry;
52import java.util.Objects;
53import java.util.Set;
54import java.util.concurrent.ConcurrentHashMap;
55import java.util.concurrent.ConcurrentMap;
56import java.util.concurrent.atomic.AtomicReference;
57
58import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Preconditions.checkNotNull;
60import static com.google.common.base.Predicates.notNull;
61import static org.onlab.onos.net.device.DeviceEvent.Type.*;
62import static org.slf4j.LoggerFactory.getLogger;
63import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
64import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070065import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070066import static com.google.common.base.Verify.verify;
67
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070068// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070069/**
70 * Manages inventory of infrastructure devices using gossip protocol to distribute
71 * information.
72 */
73@Component(immediate = true)
74@Service
75public class GossipDeviceStore
76 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
77 implements DeviceStore {
78
79 private final Logger log = getLogger(getClass());
80
81 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
82
83 // TODO: Check if inner Map can be replaced with plain Map
84 // innerMap is used to lock a Device, thus instance should never be replaced.
85 // collection of Description given from various providers
86 private final ConcurrentMap<DeviceId,
87 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070088 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070089
90 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070091 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
92 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
93
94 // to be updated under Device lock
95 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
96 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070097
98 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070099 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected ClockService clockService;
103
Madan Jampani47c93732014-10-06 20:46:08 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator;
106
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700107 @Activate
108 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700109 clusterCommunicator.addSubscriber(
110 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
111 clusterCommunicator.addSubscriber(
112 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
113 clusterCommunicator.addSubscriber(
114 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700115 log.info("Started");
116 }
117
118 @Deactivate
119 public void deactivate() {
120 deviceDescs.clear();
121 devices.clear();
122 devicePorts.clear();
123 availableDevices.clear();
124 log.info("Stopped");
125 }
126
127 @Override
128 public int getDeviceCount() {
129 return devices.size();
130 }
131
132 @Override
133 public Iterable<Device> getDevices() {
134 return Collections.unmodifiableCollection(devices.values());
135 }
136
137 @Override
138 public Device getDevice(DeviceId deviceId) {
139 return devices.get(deviceId);
140 }
141
142 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700143 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
144 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700145 DeviceDescription deviceDescription) {
146 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
147 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
148 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
149 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700150 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
151 providerId, deviceId);
152 try {
153 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
154 } catch (IOException e) {
155 log.error("Failed to notify peers of a device update topology event or providerId: "
156 + providerId + " and deviceId: " + deviceId, e);
157 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700158 }
159 return event;
160 }
161
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700162 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
163 DeviceId deviceId,
164 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700165
166 // Collection of DeviceDescriptions for a Device
167 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700168 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700169
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700170 synchronized (providerDescs) {
171 // locking per device
172
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700173 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
174 log.debug("Ignoring outdated event: {}", deltaDesc);
175 return null;
176 }
177
178 DeviceDescriptions descs
179 = createIfAbsentUnchecked(providerDescs, providerId,
180 new InitDeviceDescs(deltaDesc));
181
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700182 final Device oldDevice = devices.get(deviceId);
183 final Device newDevice;
184
185 if (deltaDesc == descs.getDeviceDesc() ||
186 deltaDesc.isNewer(descs.getDeviceDesc())) {
187 // on new device or valid update
188 descs.putDeviceDesc(deltaDesc);
189 newDevice = composeDevice(deviceId, providerDescs);
190 } else {
191 // outdated event, ignored.
192 return null;
193 }
194 if (oldDevice == null) {
195 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700196 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700197 } else {
198 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700199 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700200 }
201 }
202 }
203
204 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700205 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700206 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700207 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700208
209 // update composed device cache
210 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
211 verify(oldDevice == null,
212 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
213 providerId, oldDevice, newDevice);
214
215 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700216 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700217 }
218
219 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
220 }
221
222 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700223 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700224 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700225 Device oldDevice,
226 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700227
228 // We allow only certain attributes to trigger update
229 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
230 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700231 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700232
233 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
234 if (!replaced) {
235 verify(replaced,
236 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
237 providerId, oldDevice, devices.get(newDevice.id())
238 , newDevice);
239 }
240 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700241 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700242 }
243 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
244 }
245
246 // Otherwise merely attempt to change availability if primary provider
247 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700248 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700249 return !added ? null :
250 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
251 }
252 return null;
253 }
254
255 @Override
256 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700257 Timestamp timestamp = clockService.getTimestamp(deviceId);
258 return markOfflineInternal(deviceId, timestamp);
259 }
260
261 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
262
263 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700264 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700265
266 // locking device
267 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700268
269 // accept off-line if given timestamp is newer than
270 // the latest Timestamp from Primary provider
271 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
272 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
273 if (timestamp.compareTo(lastTimestamp) <= 0) {
274 // outdated event ignore
275 return null;
276 }
277
278 offline.put(deviceId, timestamp);
279
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700280 Device device = devices.get(deviceId);
281 if (device == null) {
282 return null;
283 }
284 boolean removed = availableDevices.remove(deviceId);
285 if (removed) {
286 // TODO: broadcast ... DOWN only?
287 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700288 }
289 return null;
290 }
291 }
292
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700293 /**
294 * Marks the device as available if the given timestamp is not outdated,
295 * compared to the time the device has been marked offline.
296 *
297 * @param deviceId identifier of the device
298 * @param timestamp of the event triggering this change.
299 * @return true if availability change request was accepted and changed the state
300 */
301 // Guarded by deviceDescs value (=Device lock)
302 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
303 // accept on-line if given timestamp is newer than
304 // the latest offline request Timestamp
305 Timestamp offlineTimestamp = offline.get(deviceId);
306 if (offlineTimestamp == null ||
307 offlineTimestamp.compareTo(timestamp) < 0) {
308
309 offline.remove(deviceId);
310 return availableDevices.add(deviceId);
311 }
312 return false;
313 }
314
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700315 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700316 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
317 DeviceId deviceId,
318 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700319 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
320
Madan Jampani47c93732014-10-06 20:46:08 -0700321 Timestamped<List<PortDescription>> timestampedPortDescriptions =
322 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700323
Madan Jampani47c93732014-10-06 20:46:08 -0700324 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700325 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700326 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
327 providerId, deviceId);
328 try {
329 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
330 } catch (IOException e) {
331 log.error("Failed to notify peers of a port update topology event or providerId: "
332 + providerId + " and deviceId: " + deviceId, e);
333 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700334 }
335 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700336 }
337
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700338 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
339 DeviceId deviceId,
340 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700341
342 Device device = devices.get(deviceId);
343 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
344
345 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
346 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
347
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700348 List<DeviceEvent> events = new ArrayList<>();
349 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700350
351 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
352 log.debug("Ignoring outdated events: {}", portDescriptions);
353 return null;
354 }
355
356 DeviceDescriptions descs = descsMap.get(providerId);
357 // every provider must provide DeviceDescription.
358 checkArgument(descs != null,
359 "Device description for Device ID %s from Provider %s was not found",
360 deviceId, providerId);
361
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700362 Map<PortNumber, Port> ports = getPortMap(deviceId);
363
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700364 final Timestamp newTimestamp = portDescriptions.timestamp();
365
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700366 // Add new ports
367 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700368 for (PortDescription portDescription : portDescriptions.value()) {
369 final PortNumber number = portDescription.portNumber();
370 processed.add(number);
371
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700372 final Port oldPort = ports.get(number);
373 final Port newPort;
374
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700375
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700376 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
377 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700378 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700379 // on new port or valid update
380 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700381 descs.putPortDesc(new Timestamped<>(portDescription,
382 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700383 newPort = composePort(device, number, descsMap);
384 } else {
385 // outdated event, ignored.
386 continue;
387 }
388
389 events.add(oldPort == null ?
390 createPort(device, newPort, ports) :
391 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700392 }
393
394 events.addAll(pruneOldPorts(device, ports, processed));
395 }
396 return FluentIterable.from(events).filter(notNull()).toList();
397 }
398
399 // Creates a new port based on the port description adds it to the map and
400 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700401 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700402 private DeviceEvent createPort(Device device, Port newPort,
403 Map<PortNumber, Port> ports) {
404 ports.put(newPort.number(), newPort);
405 return new DeviceEvent(PORT_ADDED, device, newPort);
406 }
407
408 // Checks if the specified port requires update and if so, it replaces the
409 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700410 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700411 private DeviceEvent updatePort(Device device, Port oldPort,
412 Port newPort,
413 Map<PortNumber, Port> ports) {
414 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700415 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700416
417 ports.put(oldPort.number(), newPort);
418 return new DeviceEvent(PORT_UPDATED, device, newPort);
419 }
420 return null;
421 }
422
423 // Prunes the specified list of ports based on which ports are in the
424 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700425 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700426 private List<DeviceEvent> pruneOldPorts(Device device,
427 Map<PortNumber, Port> ports,
428 Set<PortNumber> processed) {
429 List<DeviceEvent> events = new ArrayList<>();
430 Iterator<PortNumber> iterator = ports.keySet().iterator();
431 while (iterator.hasNext()) {
432 PortNumber portNumber = iterator.next();
433 if (!processed.contains(portNumber)) {
434 events.add(new DeviceEvent(PORT_REMOVED, device,
435 ports.get(portNumber)));
436 iterator.remove();
437 }
438 }
439 return events;
440 }
441
442 // Gets the map of ports for the specified device; if one does not already
443 // exist, it creates and registers a new one.
444 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
445 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700446 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
447 }
448
449 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
450 DeviceId deviceId) {
451 return createIfAbsentUnchecked(deviceDescs, deviceId,
452 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700453 }
454
455 @Override
456 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
457 PortDescription portDescription) {
458 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
459 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
460 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
461 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700462 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
463 providerId, deviceId);
464 try {
465 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
466 } catch (IOException e) {
467 log.error("Failed to notify peers of a port status update topology event or providerId: "
468 + providerId + " and deviceId: " + deviceId, e);
469 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700470 }
471 return event;
472 }
473
474 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
475 Timestamped<PortDescription> deltaDesc) {
476
477 Device device = devices.get(deviceId);
478 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
479
480 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
481 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
482
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700483 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700484
485 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
486 log.debug("Ignoring outdated event: {}", deltaDesc);
487 return null;
488 }
489
490 DeviceDescriptions descs = descsMap.get(providerId);
491 // assuming all providers must to give DeviceDescription
492 checkArgument(descs != null,
493 "Device description for Device ID %s from Provider %s was not found",
494 deviceId, providerId);
495
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700496 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
497 final PortNumber number = deltaDesc.value().portNumber();
498 final Port oldPort = ports.get(number);
499 final Port newPort;
500
501 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
502 if (existingPortDesc == null ||
503 deltaDesc == existingPortDesc ||
504 deltaDesc.isNewer(existingPortDesc)) {
505 // on new port or valid update
506 // update description
507 descs.putPortDesc(deltaDesc);
508 newPort = composePort(device, number, descsMap);
509 } else {
510 // outdated event, ignored.
511 return null;
512 }
513
514 if (oldPort == null) {
515 return createPort(device, newPort, ports);
516 } else {
517 return updatePort(device, oldPort, newPort, ports);
518 }
519 }
520 }
521
522 @Override
523 public List<Port> getPorts(DeviceId deviceId) {
524 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
525 if (ports == null) {
526 return Collections.emptyList();
527 }
528 return ImmutableList.copyOf(ports.values());
529 }
530
531 @Override
532 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
533 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
534 return ports == null ? null : ports.get(portNumber);
535 }
536
537 @Override
538 public boolean isAvailable(DeviceId deviceId) {
539 return availableDevices.contains(deviceId);
540 }
541
542 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700543 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
544 Timestamp timestamp = clockService.getTimestamp(deviceId);
545 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
546 // TODO: broadcast removal event
547 return event;
548 }
549
550 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
551 Timestamp timestamp) {
552
553 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700554 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700555 // accept removal request if given timestamp is newer than
556 // the latest Timestamp from Primary provider
557 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
558 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
559 if (timestamp.compareTo(lastTimestamp) <= 0) {
560 // outdated event ignore
561 return null;
562 }
563 removalRequest.put(deviceId, timestamp);
564
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700565 Device device = devices.remove(deviceId);
566 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700567 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
568 if (ports != null) {
569 ports.clear();
570 }
571 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700572 descs.clear();
573 return device == null ? null :
574 new DeviceEvent(DEVICE_REMOVED, device, null);
575 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700576 }
577
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700578 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
579 Timestamp removalTimestamp = removalRequest.get(deviceId);
580 if (removalTimestamp != null &&
581 removalTimestamp.compareTo(timestampToCheck) >= 0) {
582 // removalRequest is more recent
583 return true;
584 }
585 return false;
586 }
587
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700588 /**
589 * Returns a Device, merging description given from multiple Providers.
590 *
591 * @param deviceId device identifier
592 * @param providerDescs Collection of Descriptions from multiple providers
593 * @return Device instance
594 */
595 private Device composeDevice(DeviceId deviceId,
596 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
597
598 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
599
600 ProviderId primary = pickPrimaryPID(providerDescs);
601
602 DeviceDescriptions desc = providerDescs.get(primary);
603
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700604 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700605 Type type = base.type();
606 String manufacturer = base.manufacturer();
607 String hwVersion = base.hwVersion();
608 String swVersion = base.swVersion();
609 String serialNumber = base.serialNumber();
610 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
611 annotations = merge(annotations, base.annotations());
612
613 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
614 if (e.getKey().equals(primary)) {
615 continue;
616 }
617 // TODO: should keep track of Description timestamp
618 // and only merge conflicting keys when timestamp is newer
619 // Currently assuming there will never be a key conflict between
620 // providers
621
622 // annotation merging. not so efficient, should revisit later
623 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
624 }
625
626 return new DefaultDevice(primary, deviceId , type, manufacturer,
627 hwVersion, swVersion, serialNumber, annotations);
628 }
629
630 /**
631 * Returns a Port, merging description given from multiple Providers.
632 *
633 * @param device device the port is on
634 * @param number port number
635 * @param providerDescs Collection of Descriptions from multiple providers
636 * @return Port instance
637 */
638 private Port composePort(Device device, PortNumber number,
639 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
640
641 ProviderId primary = pickPrimaryPID(providerDescs);
642 DeviceDescriptions primDescs = providerDescs.get(primary);
643 // if no primary, assume not enabled
644 // TODO: revisit this default port enabled/disabled behavior
645 boolean isEnabled = false;
646 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
647
648 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
649 if (portDesc != null) {
650 isEnabled = portDesc.value().isEnabled();
651 annotations = merge(annotations, portDesc.value().annotations());
652 }
653
654 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
655 if (e.getKey().equals(primary)) {
656 continue;
657 }
658 // TODO: should keep track of Description timestamp
659 // and only merge conflicting keys when timestamp is newer
660 // Currently assuming there will never be a key conflict between
661 // providers
662
663 // annotation merging. not so efficient, should revisit later
664 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
665 if (otherPortDesc != null) {
666 annotations = merge(annotations, otherPortDesc.value().annotations());
667 }
668 }
669
670 return new DefaultPort(device, number, isEnabled, annotations);
671 }
672
673 /**
674 * @return primary ProviderID, or randomly chosen one if none exists
675 */
676 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700677 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700678 ProviderId fallBackPrimary = null;
679 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
680 if (!e.getKey().isAncillary()) {
681 return e.getKey();
682 } else if (fallBackPrimary == null) {
683 // pick randomly as a fallback in case there is no primary
684 fallBackPrimary = e.getKey();
685 }
686 }
687 return fallBackPrimary;
688 }
689
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700690 private DeviceDescriptions getPrimaryDescriptions(
691 Map<ProviderId, DeviceDescriptions> providerDescs) {
692 ProviderId pid = pickPrimaryPID(providerDescs);
693 return providerDescs.get(pid);
694 }
695
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700696 public static final class InitDeviceDescs
697 implements ConcurrentInitializer<DeviceDescriptions> {
698
699 private final Timestamped<DeviceDescription> deviceDesc;
700
701 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
702 this.deviceDesc = checkNotNull(deviceDesc);
703 }
704 @Override
705 public DeviceDescriptions get() throws ConcurrentException {
706 return new DeviceDescriptions(deviceDesc);
707 }
708 }
709
710
711 /**
 * Collection of Descriptions of a Device and its Ports given from a Provider.
713 */
714 public static class DeviceDescriptions {
715
// current description of the device itself
private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
// current description of each port, keyed by port number
private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;

/**
 * Creates a holder starting with the given device description and no
 * port descriptions.
 *
 * @param desc initial device description, must not be null
 */
public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
    final Timestamped<DeviceDescription> initial = checkNotNull(desc);
    this.deviceDesc = new AtomicReference<>(initial);
    this.portDescs = new ConcurrentHashMap<>();
}
723
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700724 Timestamp getLatestTimestamp() {
725 Timestamp latest = deviceDesc.get().timestamp();
726 for (Timestamped<PortDescription> desc : portDescs.values()) {
727 if (desc.timestamp().compareTo(latest) > 0) {
728 latest = desc.timestamp();
729 }
730 }
731 return latest;
732 }
733
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700734 public Timestamped<DeviceDescription> getDeviceDesc() {
735 return deviceDesc.get();
736 }
737
738 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
739 return portDescs.get(number);
740 }
741
742 /**
743 * Puts DeviceDescription, merging annotations as necessary.
744 *
745 * @param newDesc new DeviceDescription
746 * @return previous DeviceDescription
747 */
748 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
749 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
750 Timestamped<DeviceDescription> newOne = newDesc;
751 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700752 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700753 newDesc.value().annotations());
754 newOne = new Timestamped<DeviceDescription>(
755 new DefaultDeviceDescription(newDesc.value(), merged),
756 newDesc.timestamp());
757 }
758 return deviceDesc.getAndSet(newOne);
759 }
760
761 /**
762 * Puts PortDescription, merging annotations as necessary.
763 *
764 * @param newDesc new PortDescription
765 * @return previous PortDescription
766 */
767 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
768 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
769 Timestamped<PortDescription> newOne = newDesc;
770 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700771 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700772 newDesc.value().annotations());
773 newOne = new Timestamped<PortDescription>(
774 new DefaultPortDescription(newDesc.value(), merged),
775 newDesc.timestamp());
776 }
777 return portDescs.put(newOne.value().portNumber(), newOne);
778 }
779 }
Madan Jampani47c93732014-10-06 20:46:08 -0700780
781 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Madan Jampani2206e012014-10-06 21:04:20 -0700782 ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700783 clusterCommunicator.broadcast(message);
784 }
785
786 private void notifyPeers(InternalPortEvent event) throws IOException {
Madan Jampani2206e012014-10-06 21:04:20 -0700787 ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700788 clusterCommunicator.broadcast(message);
789 }
790
791 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Madan Jampani2206e012014-10-06 21:04:20 -0700792 ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
Madan Jampani47c93732014-10-06 20:46:08 -0700793 clusterCommunicator.broadcast(message);
794 }
795
796 private class InternalDeviceEventListener implements ClusterMessageHandler {
797 @Override
798 public void handle(ClusterMessage message) {
799 log.info("Received device update event from peer: {}", message.sender());
800 InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
801 ProviderId providerId = event.providerId();
802 DeviceId deviceId = event.deviceId();
803 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
804 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
805 }
806 }
807
808 private class InternalPortEventListener implements ClusterMessageHandler {
809 @Override
810 public void handle(ClusterMessage message) {
811
812 log.info("Received port update event from peer: {}", message.sender());
813 InternalPortEvent event = (InternalPortEvent) message.payload();
814
815 ProviderId providerId = event.providerId();
816 DeviceId deviceId = event.deviceId();
817 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
818
819 updatePortsInternal(providerId, deviceId, portDescriptions);
820 }
821 }
822
823 private class InternalPortStatusEventListener implements ClusterMessageHandler {
824 @Override
825 public void handle(ClusterMessage message) {
826
827 log.info("Received port status update event from peer: {}", message.sender());
828 InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
829
830 ProviderId providerId = event.providerId();
831 DeviceId deviceId = event.deviceId();
832 Timestamped<PortDescription> portDescription = event.portDescription();
833
834 updatePortStatusInternal(providerId, deviceId, portDescription);
835 }
836 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700837}