blob: 31df710f445adc238c7710dae2c525c83bdb4dd4 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070017import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070018import org.onlab.onos.net.DefaultAnnotations;
19import org.onlab.onos.net.DefaultDevice;
20import org.onlab.onos.net.DefaultPort;
21import org.onlab.onos.net.Device;
22import org.onlab.onos.net.Device.Type;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Port;
25import org.onlab.onos.net.PortNumber;
26import org.onlab.onos.net.SparseAnnotations;
27import org.onlab.onos.net.device.DefaultDeviceDescription;
28import org.onlab.onos.net.device.DefaultPortDescription;
29import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
36import org.onlab.onos.store.ClockService;
37import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani53e44e62014-10-07 12:39:51 -070041import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070042import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoPoolUtil;
44import org.onlab.onos.store.serializers.KryoSerializer;
45import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070046import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070047import org.slf4j.Logger;
48
Madan Jampani47c93732014-10-06 20:46:08 -070049import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070050import java.util.ArrayList;
51import java.util.Collections;
52import java.util.HashSet;
53import java.util.Iterator;
54import java.util.List;
55import java.util.Map;
56import java.util.Map.Entry;
57import java.util.Objects;
58import java.util.Set;
59import java.util.concurrent.ConcurrentHashMap;
60import java.util.concurrent.ConcurrentMap;
61import java.util.concurrent.atomic.AtomicReference;
62
63import static com.google.common.base.Preconditions.checkArgument;
64import static com.google.common.base.Preconditions.checkNotNull;
65import static com.google.common.base.Predicates.notNull;
66import static org.onlab.onos.net.device.DeviceEvent.Type.*;
67import static org.slf4j.LoggerFactory.getLogger;
68import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
69import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070070import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070071import static com.google.common.base.Verify.verify;
72
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070073// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070074/**
75 * Manages inventory of infrastructure devices using gossip protocol to distribute
76 * information.
77 */
78@Component(immediate = true)
79@Service
80public class GossipDeviceStore
81 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
82 implements DeviceStore {
83
84 private final Logger log = getLogger(getClass());
85
86 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
87
88 // TODO: Check if inner Map can be replaced with plain Map
89 // innerMap is used to lock a Device, thus instance should never be replaced.
90 // collection of Description given from various providers
91 private final ConcurrentMap<DeviceId,
92 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070093 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094
95 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070096 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
97 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
98
99 // to be updated under Device lock
100 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
101 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700102
103 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700104 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClockService clockService;
108
Madan Jampani47c93732014-10-06 20:46:08 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
111
Madan Jampani53e44e62014-10-07 12:39:51 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
114
115 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700116 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700117 protected void setupKryoPool() {
118 serializerPool = KryoPool.newBuilder()
119 .register(KryoPoolUtil.API)
120 .register(InternalDeviceEvent.class)
121 .register(InternalPortEvent.class)
122 .register(InternalPortStatusEvent.class)
123 .register(Timestamped.class)
124 .register(MastershipBasedTimestamp.class)
125 .build()
126 .populate(1);
127 }
128
129 };
130
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700131 @Activate
132 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700133 clusterCommunicator.addSubscriber(
134 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
135 clusterCommunicator.addSubscriber(
136 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
137 clusterCommunicator.addSubscriber(
138 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700139 log.info("Started");
140 }
141
142 @Deactivate
143 public void deactivate() {
144 deviceDescs.clear();
145 devices.clear();
146 devicePorts.clear();
147 availableDevices.clear();
148 log.info("Stopped");
149 }
150
151 @Override
152 public int getDeviceCount() {
153 return devices.size();
154 }
155
156 @Override
157 public Iterable<Device> getDevices() {
158 return Collections.unmodifiableCollection(devices.values());
159 }
160
161 @Override
162 public Device getDevice(DeviceId deviceId) {
163 return devices.get(deviceId);
164 }
165
166 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700167 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
168 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700169 DeviceDescription deviceDescription) {
170 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
171 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
172 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
173 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700174 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
175 providerId, deviceId);
176 try {
177 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
178 } catch (IOException e) {
179 log.error("Failed to notify peers of a device update topology event or providerId: "
180 + providerId + " and deviceId: " + deviceId, e);
181 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700182 }
183 return event;
184 }
185
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700186 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
187 DeviceId deviceId,
188 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700189
190 // Collection of DeviceDescriptions for a Device
191 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700192 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700193
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700194 synchronized (providerDescs) {
195 // locking per device
196
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700197 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
198 log.debug("Ignoring outdated event: {}", deltaDesc);
199 return null;
200 }
201
202 DeviceDescriptions descs
203 = createIfAbsentUnchecked(providerDescs, providerId,
204 new InitDeviceDescs(deltaDesc));
205
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700206 final Device oldDevice = devices.get(deviceId);
207 final Device newDevice;
208
209 if (deltaDesc == descs.getDeviceDesc() ||
210 deltaDesc.isNewer(descs.getDeviceDesc())) {
211 // on new device or valid update
212 descs.putDeviceDesc(deltaDesc);
213 newDevice = composeDevice(deviceId, providerDescs);
214 } else {
215 // outdated event, ignored.
216 return null;
217 }
218 if (oldDevice == null) {
219 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700220 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700221 } else {
222 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700223 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700224 }
225 }
226 }
227
228 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700229 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700230 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700231 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700232
233 // update composed device cache
234 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
235 verify(oldDevice == null,
236 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
237 providerId, oldDevice, newDevice);
238
239 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700240 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700241 }
242
243 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
244 }
245
246 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700247 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700248 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700249 Device oldDevice,
250 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700251
252 // We allow only certain attributes to trigger update
253 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
254 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700255 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700256
257 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
258 if (!replaced) {
259 verify(replaced,
260 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
261 providerId, oldDevice, devices.get(newDevice.id())
262 , newDevice);
263 }
264 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700265 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700266 }
267 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
268 }
269
270 // Otherwise merely attempt to change availability if primary provider
271 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700272 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700273 return !added ? null :
274 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
275 }
276 return null;
277 }
278
279 @Override
280 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700281 Timestamp timestamp = clockService.getTimestamp(deviceId);
282 return markOfflineInternal(deviceId, timestamp);
283 }
284
285 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
286
287 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700288 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700289
290 // locking device
291 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700292
293 // accept off-line if given timestamp is newer than
294 // the latest Timestamp from Primary provider
295 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
296 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
297 if (timestamp.compareTo(lastTimestamp) <= 0) {
298 // outdated event ignore
299 return null;
300 }
301
302 offline.put(deviceId, timestamp);
303
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700304 Device device = devices.get(deviceId);
305 if (device == null) {
306 return null;
307 }
308 boolean removed = availableDevices.remove(deviceId);
309 if (removed) {
310 // TODO: broadcast ... DOWN only?
311 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700312 }
313 return null;
314 }
315 }
316
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700317 /**
318 * Marks the device as available if the given timestamp is not outdated,
319 * compared to the time the device has been marked offline.
320 *
321 * @param deviceId identifier of the device
322 * @param timestamp of the event triggering this change.
323 * @return true if availability change request was accepted and changed the state
324 */
325 // Guarded by deviceDescs value (=Device lock)
326 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
327 // accept on-line if given timestamp is newer than
328 // the latest offline request Timestamp
329 Timestamp offlineTimestamp = offline.get(deviceId);
330 if (offlineTimestamp == null ||
331 offlineTimestamp.compareTo(timestamp) < 0) {
332
333 offline.remove(deviceId);
334 return availableDevices.add(deviceId);
335 }
336 return false;
337 }
338
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700339 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700340 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
341 DeviceId deviceId,
342 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700343 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
344
Madan Jampani47c93732014-10-06 20:46:08 -0700345 Timestamped<List<PortDescription>> timestampedPortDescriptions =
346 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700347
Madan Jampani47c93732014-10-06 20:46:08 -0700348 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700349 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700350 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
351 providerId, deviceId);
352 try {
353 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
354 } catch (IOException e) {
355 log.error("Failed to notify peers of a port update topology event or providerId: "
356 + providerId + " and deviceId: " + deviceId, e);
357 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700358 }
359 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700360 }
361
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700362 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
363 DeviceId deviceId,
364 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700365
366 Device device = devices.get(deviceId);
367 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
368
369 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
370 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
371
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700372 List<DeviceEvent> events = new ArrayList<>();
373 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700374
375 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
376 log.debug("Ignoring outdated events: {}", portDescriptions);
377 return null;
378 }
379
380 DeviceDescriptions descs = descsMap.get(providerId);
381 // every provider must provide DeviceDescription.
382 checkArgument(descs != null,
383 "Device description for Device ID %s from Provider %s was not found",
384 deviceId, providerId);
385
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700386 Map<PortNumber, Port> ports = getPortMap(deviceId);
387
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700388 final Timestamp newTimestamp = portDescriptions.timestamp();
389
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700390 // Add new ports
391 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700392 for (PortDescription portDescription : portDescriptions.value()) {
393 final PortNumber number = portDescription.portNumber();
394 processed.add(number);
395
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700396 final Port oldPort = ports.get(number);
397 final Port newPort;
398
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700399
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700400 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
401 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700402 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700403 // on new port or valid update
404 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700405 descs.putPortDesc(new Timestamped<>(portDescription,
406 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700407 newPort = composePort(device, number, descsMap);
408 } else {
409 // outdated event, ignored.
410 continue;
411 }
412
413 events.add(oldPort == null ?
414 createPort(device, newPort, ports) :
415 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700416 }
417
418 events.addAll(pruneOldPorts(device, ports, processed));
419 }
420 return FluentIterable.from(events).filter(notNull()).toList();
421 }
422
423 // Creates a new port based on the port description adds it to the map and
424 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700425 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700426 private DeviceEvent createPort(Device device, Port newPort,
427 Map<PortNumber, Port> ports) {
428 ports.put(newPort.number(), newPort);
429 return new DeviceEvent(PORT_ADDED, device, newPort);
430 }
431
432 // Checks if the specified port requires update and if so, it replaces the
433 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700434 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700435 private DeviceEvent updatePort(Device device, Port oldPort,
436 Port newPort,
437 Map<PortNumber, Port> ports) {
438 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700439 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700440
441 ports.put(oldPort.number(), newPort);
442 return new DeviceEvent(PORT_UPDATED, device, newPort);
443 }
444 return null;
445 }
446
447 // Prunes the specified list of ports based on which ports are in the
448 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700449 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700450 private List<DeviceEvent> pruneOldPorts(Device device,
451 Map<PortNumber, Port> ports,
452 Set<PortNumber> processed) {
453 List<DeviceEvent> events = new ArrayList<>();
454 Iterator<PortNumber> iterator = ports.keySet().iterator();
455 while (iterator.hasNext()) {
456 PortNumber portNumber = iterator.next();
457 if (!processed.contains(portNumber)) {
458 events.add(new DeviceEvent(PORT_REMOVED, device,
459 ports.get(portNumber)));
460 iterator.remove();
461 }
462 }
463 return events;
464 }
465
466 // Gets the map of ports for the specified device; if one does not already
467 // exist, it creates and registers a new one.
468 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
469 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700470 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
471 }
472
473 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
474 DeviceId deviceId) {
475 return createIfAbsentUnchecked(deviceDescs, deviceId,
476 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700477 }
478
479 @Override
480 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
481 PortDescription portDescription) {
482 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
483 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
484 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
485 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700486 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
487 providerId, deviceId);
488 try {
489 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
490 } catch (IOException e) {
491 log.error("Failed to notify peers of a port status update topology event or providerId: "
492 + providerId + " and deviceId: " + deviceId, e);
493 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700494 }
495 return event;
496 }
497
498 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
499 Timestamped<PortDescription> deltaDesc) {
500
501 Device device = devices.get(deviceId);
502 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
503
504 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
505 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
506
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700507 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700508
509 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
510 log.debug("Ignoring outdated event: {}", deltaDesc);
511 return null;
512 }
513
514 DeviceDescriptions descs = descsMap.get(providerId);
515 // assuming all providers must to give DeviceDescription
516 checkArgument(descs != null,
517 "Device description for Device ID %s from Provider %s was not found",
518 deviceId, providerId);
519
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700520 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
521 final PortNumber number = deltaDesc.value().portNumber();
522 final Port oldPort = ports.get(number);
523 final Port newPort;
524
525 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
526 if (existingPortDesc == null ||
527 deltaDesc == existingPortDesc ||
528 deltaDesc.isNewer(existingPortDesc)) {
529 // on new port or valid update
530 // update description
531 descs.putPortDesc(deltaDesc);
532 newPort = composePort(device, number, descsMap);
533 } else {
534 // outdated event, ignored.
535 return null;
536 }
537
538 if (oldPort == null) {
539 return createPort(device, newPort, ports);
540 } else {
541 return updatePort(device, oldPort, newPort, ports);
542 }
543 }
544 }
545
546 @Override
547 public List<Port> getPorts(DeviceId deviceId) {
548 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
549 if (ports == null) {
550 return Collections.emptyList();
551 }
552 return ImmutableList.copyOf(ports.values());
553 }
554
555 @Override
556 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
557 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
558 return ports == null ? null : ports.get(portNumber);
559 }
560
561 @Override
562 public boolean isAvailable(DeviceId deviceId) {
563 return availableDevices.contains(deviceId);
564 }
565
566 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700567 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
568 Timestamp timestamp = clockService.getTimestamp(deviceId);
569 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
570 // TODO: broadcast removal event
571 return event;
572 }
573
574 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
575 Timestamp timestamp) {
576
577 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700578 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700579 // accept removal request if given timestamp is newer than
580 // the latest Timestamp from Primary provider
581 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
582 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
583 if (timestamp.compareTo(lastTimestamp) <= 0) {
584 // outdated event ignore
585 return null;
586 }
587 removalRequest.put(deviceId, timestamp);
588
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700589 Device device = devices.remove(deviceId);
590 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700591 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
592 if (ports != null) {
593 ports.clear();
594 }
595 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700596 descs.clear();
597 return device == null ? null :
598 new DeviceEvent(DEVICE_REMOVED, device, null);
599 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700600 }
601
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700602 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
603 Timestamp removalTimestamp = removalRequest.get(deviceId);
604 if (removalTimestamp != null &&
605 removalTimestamp.compareTo(timestampToCheck) >= 0) {
606 // removalRequest is more recent
607 return true;
608 }
609 return false;
610 }
611
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700612 /**
613 * Returns a Device, merging description given from multiple Providers.
614 *
615 * @param deviceId device identifier
616 * @param providerDescs Collection of Descriptions from multiple providers
617 * @return Device instance
618 */
619 private Device composeDevice(DeviceId deviceId,
620 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
621
622 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
623
624 ProviderId primary = pickPrimaryPID(providerDescs);
625
626 DeviceDescriptions desc = providerDescs.get(primary);
627
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700628 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700629 Type type = base.type();
630 String manufacturer = base.manufacturer();
631 String hwVersion = base.hwVersion();
632 String swVersion = base.swVersion();
633 String serialNumber = base.serialNumber();
634 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
635 annotations = merge(annotations, base.annotations());
636
637 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
638 if (e.getKey().equals(primary)) {
639 continue;
640 }
641 // TODO: should keep track of Description timestamp
642 // and only merge conflicting keys when timestamp is newer
643 // Currently assuming there will never be a key conflict between
644 // providers
645
646 // annotation merging. not so efficient, should revisit later
647 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
648 }
649
650 return new DefaultDevice(primary, deviceId , type, manufacturer,
651 hwVersion, swVersion, serialNumber, annotations);
652 }
653
654 /**
655 * Returns a Port, merging description given from multiple Providers.
656 *
657 * @param device device the port is on
658 * @param number port number
659 * @param providerDescs Collection of Descriptions from multiple providers
660 * @return Port instance
661 */
662 private Port composePort(Device device, PortNumber number,
663 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
664
665 ProviderId primary = pickPrimaryPID(providerDescs);
666 DeviceDescriptions primDescs = providerDescs.get(primary);
667 // if no primary, assume not enabled
668 // TODO: revisit this default port enabled/disabled behavior
669 boolean isEnabled = false;
670 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
671
672 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
673 if (portDesc != null) {
674 isEnabled = portDesc.value().isEnabled();
675 annotations = merge(annotations, portDesc.value().annotations());
676 }
677
678 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
679 if (e.getKey().equals(primary)) {
680 continue;
681 }
682 // TODO: should keep track of Description timestamp
683 // and only merge conflicting keys when timestamp is newer
684 // Currently assuming there will never be a key conflict between
685 // providers
686
687 // annotation merging. not so efficient, should revisit later
688 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
689 if (otherPortDesc != null) {
690 annotations = merge(annotations, otherPortDesc.value().annotations());
691 }
692 }
693
694 return new DefaultPort(device, number, isEnabled, annotations);
695 }
696
697 /**
698 * @return primary ProviderID, or randomly chosen one if none exists
699 */
700 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700701 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700702 ProviderId fallBackPrimary = null;
703 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
704 if (!e.getKey().isAncillary()) {
705 return e.getKey();
706 } else if (fallBackPrimary == null) {
707 // pick randomly as a fallback in case there is no primary
708 fallBackPrimary = e.getKey();
709 }
710 }
711 return fallBackPrimary;
712 }
713
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700714 private DeviceDescriptions getPrimaryDescriptions(
715 Map<ProviderId, DeviceDescriptions> providerDescs) {
716 ProviderId pid = pickPrimaryPID(providerDescs);
717 return providerDescs.get(pid);
718 }
719
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700720 public static final class InitDeviceDescs
721 implements ConcurrentInitializer<DeviceDescriptions> {
722
723 private final Timestamped<DeviceDescription> deviceDesc;
724
725 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
726 this.deviceDesc = checkNotNull(deviceDesc);
727 }
728 @Override
729 public DeviceDescriptions get() throws ConcurrentException {
730 return new DeviceDescriptions(deviceDesc);
731 }
732 }
733
734
735 /**
736 * Collection of Description of a Device and it's Ports given from a Provider.
737 */
738 public static class DeviceDescriptions {
739
740 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
741 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
742
743 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
744 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
745 this.portDescs = new ConcurrentHashMap<>();
746 }
747
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700748 Timestamp getLatestTimestamp() {
749 Timestamp latest = deviceDesc.get().timestamp();
750 for (Timestamped<PortDescription> desc : portDescs.values()) {
751 if (desc.timestamp().compareTo(latest) > 0) {
752 latest = desc.timestamp();
753 }
754 }
755 return latest;
756 }
757
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700758 public Timestamped<DeviceDescription> getDeviceDesc() {
759 return deviceDesc.get();
760 }
761
762 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
763 return portDescs.get(number);
764 }
765
766 /**
767 * Puts DeviceDescription, merging annotations as necessary.
768 *
769 * @param newDesc new DeviceDescription
770 * @return previous DeviceDescription
771 */
772 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
773 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
774 Timestamped<DeviceDescription> newOne = newDesc;
775 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700776 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700777 newDesc.value().annotations());
778 newOne = new Timestamped<DeviceDescription>(
779 new DefaultDeviceDescription(newDesc.value(), merged),
780 newDesc.timestamp());
781 }
782 return deviceDesc.getAndSet(newOne);
783 }
784
785 /**
786 * Puts PortDescription, merging annotations as necessary.
787 *
788 * @param newDesc new PortDescription
789 * @return previous PortDescription
790 */
791 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
792 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
793 Timestamped<PortDescription> newOne = newDesc;
794 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700795 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700796 newDesc.value().annotations());
797 newOne = new Timestamped<PortDescription>(
798 new DefaultPortDescription(newDesc.value(), merged),
799 newDesc.timestamp());
800 }
801 return portDescs.put(newOne.value().portNumber(), newOne);
802 }
803 }
Madan Jampani47c93732014-10-06 20:46:08 -0700804
805 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700806 ClusterMessage message = new ClusterMessage(
807 clusterService.getLocalNode().id(),
808 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
809 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700810 clusterCommunicator.broadcast(message);
811 }
812
813 private void notifyPeers(InternalPortEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700814 ClusterMessage message = new ClusterMessage(
815 clusterService.getLocalNode().id(),
816 GossipDeviceStoreMessageSubjects.PORT_UPDATE,
817 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700818 clusterCommunicator.broadcast(message);
819 }
820
821 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700822 ClusterMessage message = new ClusterMessage(
823 clusterService.getLocalNode().id(),
824 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
825 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700826 clusterCommunicator.broadcast(message);
827 }
828
829 private class InternalDeviceEventListener implements ClusterMessageHandler {
830 @Override
831 public void handle(ClusterMessage message) {
832 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700833 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700834 ProviderId providerId = event.providerId();
835 DeviceId deviceId = event.deviceId();
836 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
837 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
838 }
839 }
840
841 private class InternalPortEventListener implements ClusterMessageHandler {
842 @Override
843 public void handle(ClusterMessage message) {
844
845 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700846 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700847
848 ProviderId providerId = event.providerId();
849 DeviceId deviceId = event.deviceId();
850 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
851
852 updatePortsInternal(providerId, deviceId, portDescriptions);
853 }
854 }
855
856 private class InternalPortStatusEventListener implements ClusterMessageHandler {
857 @Override
858 public void handle(ClusterMessage message) {
859
860 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700861 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700862
863 ProviderId providerId = event.providerId();
864 DeviceId deviceId = event.deviceId();
865 Timestamped<PortDescription> portDescription = event.portDescription();
866
867 updatePortStatusInternal(providerId, deviceId, portDescription);
868 }
869 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700870}