blob: 2f1e50424260338604ed65ac9b9ec4475f005e27 [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070017import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070018import org.onlab.onos.net.DefaultAnnotations;
19import org.onlab.onos.net.DefaultDevice;
20import org.onlab.onos.net.DefaultPort;
21import org.onlab.onos.net.Device;
22import org.onlab.onos.net.Device.Type;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Port;
25import org.onlab.onos.net.PortNumber;
26import org.onlab.onos.net.SparseAnnotations;
27import org.onlab.onos.net.device.DefaultDeviceDescription;
28import org.onlab.onos.net.device.DefaultPortDescription;
29import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
36import org.onlab.onos.store.ClockService;
37import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani53e44e62014-10-07 12:39:51 -070041import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070042import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoPoolUtil;
44import org.onlab.onos.store.serializers.KryoSerializer;
45import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070046import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070047import org.slf4j.Logger;
48
Madan Jampani47c93732014-10-06 20:46:08 -070049import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070050import java.util.ArrayList;
51import java.util.Collections;
52import java.util.HashSet;
53import java.util.Iterator;
54import java.util.List;
55import java.util.Map;
56import java.util.Map.Entry;
57import java.util.Objects;
58import java.util.Set;
59import java.util.concurrent.ConcurrentHashMap;
60import java.util.concurrent.ConcurrentMap;
61import java.util.concurrent.atomic.AtomicReference;
62
63import static com.google.common.base.Preconditions.checkArgument;
64import static com.google.common.base.Preconditions.checkNotNull;
65import static com.google.common.base.Predicates.notNull;
66import static org.onlab.onos.net.device.DeviceEvent.Type.*;
67import static org.slf4j.LoggerFactory.getLogger;
68import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
69import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070070import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070071import static com.google.common.base.Verify.verify;
72
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070073// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070074/**
75 * Manages inventory of infrastructure devices using gossip protocol to distribute
76 * information.
77 */
78@Component(immediate = true)
79@Service
80public class GossipDeviceStore
81 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
82 implements DeviceStore {
83
84 private final Logger log = getLogger(getClass());
85
86 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
87
88 // TODO: Check if inner Map can be replaced with plain Map
89 // innerMap is used to lock a Device, thus instance should never be replaced.
90 // collection of Description given from various providers
91 private final ConcurrentMap<DeviceId,
92 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070093 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070094
95 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070096 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
97 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
98
99 // to be updated under Device lock
100 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
101 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700102
103 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700104 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClockService clockService;
108
Madan Jampani47c93732014-10-06 20:46:08 -0700109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
111
Madan Jampani53e44e62014-10-07 12:39:51 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
114
115 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
116 protected void setupKryoPool() {
117 serializerPool = KryoPool.newBuilder()
118 .register(KryoPoolUtil.API)
119 .register(InternalDeviceEvent.class)
120 .register(InternalPortEvent.class)
121 .register(InternalPortStatusEvent.class)
122 .register(Timestamped.class)
123 .register(MastershipBasedTimestamp.class)
124 .build()
125 .populate(1);
126 }
127
128 };
129
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700130 @Activate
131 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700132 clusterCommunicator.addSubscriber(
133 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
134 clusterCommunicator.addSubscriber(
135 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
136 clusterCommunicator.addSubscriber(
137 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700138 log.info("Started");
139 }
140
141 @Deactivate
142 public void deactivate() {
143 deviceDescs.clear();
144 devices.clear();
145 devicePorts.clear();
146 availableDevices.clear();
147 log.info("Stopped");
148 }
149
150 @Override
151 public int getDeviceCount() {
152 return devices.size();
153 }
154
155 @Override
156 public Iterable<Device> getDevices() {
157 return Collections.unmodifiableCollection(devices.values());
158 }
159
160 @Override
161 public Device getDevice(DeviceId deviceId) {
162 return devices.get(deviceId);
163 }
164
165 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700166 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
167 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700168 DeviceDescription deviceDescription) {
169 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
170 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
171 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
172 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700173 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
174 providerId, deviceId);
175 try {
176 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
177 } catch (IOException e) {
178 log.error("Failed to notify peers of a device update topology event or providerId: "
179 + providerId + " and deviceId: " + deviceId, e);
180 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700181 }
182 return event;
183 }
184
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700185 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
186 DeviceId deviceId,
187 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700188
189 // Collection of DeviceDescriptions for a Device
190 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700191 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700192
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700193 synchronized (providerDescs) {
194 // locking per device
195
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700196 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
197 log.debug("Ignoring outdated event: {}", deltaDesc);
198 return null;
199 }
200
201 DeviceDescriptions descs
202 = createIfAbsentUnchecked(providerDescs, providerId,
203 new InitDeviceDescs(deltaDesc));
204
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700205 final Device oldDevice = devices.get(deviceId);
206 final Device newDevice;
207
208 if (deltaDesc == descs.getDeviceDesc() ||
209 deltaDesc.isNewer(descs.getDeviceDesc())) {
210 // on new device or valid update
211 descs.putDeviceDesc(deltaDesc);
212 newDevice = composeDevice(deviceId, providerDescs);
213 } else {
214 // outdated event, ignored.
215 return null;
216 }
217 if (oldDevice == null) {
218 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700219 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700220 } else {
221 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700222 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700223 }
224 }
225 }
226
227 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700228 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700229 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700230 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700231
232 // update composed device cache
233 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
234 verify(oldDevice == null,
235 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
236 providerId, oldDevice, newDevice);
237
238 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700239 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700240 }
241
242 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
243 }
244
245 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700246 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700247 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700248 Device oldDevice,
249 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700250
251 // We allow only certain attributes to trigger update
252 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
253 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700254 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700255
256 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
257 if (!replaced) {
258 verify(replaced,
259 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
260 providerId, oldDevice, devices.get(newDevice.id())
261 , newDevice);
262 }
263 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700264 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700265 }
266 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
267 }
268
269 // Otherwise merely attempt to change availability if primary provider
270 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700271 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700272 return !added ? null :
273 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
274 }
275 return null;
276 }
277
278 @Override
279 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700280 Timestamp timestamp = clockService.getTimestamp(deviceId);
281 return markOfflineInternal(deviceId, timestamp);
282 }
283
284 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
285
286 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700287 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700288
289 // locking device
290 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700291
292 // accept off-line if given timestamp is newer than
293 // the latest Timestamp from Primary provider
294 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
295 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
296 if (timestamp.compareTo(lastTimestamp) <= 0) {
297 // outdated event ignore
298 return null;
299 }
300
301 offline.put(deviceId, timestamp);
302
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700303 Device device = devices.get(deviceId);
304 if (device == null) {
305 return null;
306 }
307 boolean removed = availableDevices.remove(deviceId);
308 if (removed) {
309 // TODO: broadcast ... DOWN only?
310 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700311 }
312 return null;
313 }
314 }
315
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700316 /**
317 * Marks the device as available if the given timestamp is not outdated,
318 * compared to the time the device has been marked offline.
319 *
320 * @param deviceId identifier of the device
321 * @param timestamp of the event triggering this change.
322 * @return true if availability change request was accepted and changed the state
323 */
324 // Guarded by deviceDescs value (=Device lock)
325 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
326 // accept on-line if given timestamp is newer than
327 // the latest offline request Timestamp
328 Timestamp offlineTimestamp = offline.get(deviceId);
329 if (offlineTimestamp == null ||
330 offlineTimestamp.compareTo(timestamp) < 0) {
331
332 offline.remove(deviceId);
333 return availableDevices.add(deviceId);
334 }
335 return false;
336 }
337
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700338 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700339 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
340 DeviceId deviceId,
341 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700342 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
343
Madan Jampani47c93732014-10-06 20:46:08 -0700344 Timestamped<List<PortDescription>> timestampedPortDescriptions =
345 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700346
Madan Jampani47c93732014-10-06 20:46:08 -0700347 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700348 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700349 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
350 providerId, deviceId);
351 try {
352 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
353 } catch (IOException e) {
354 log.error("Failed to notify peers of a port update topology event or providerId: "
355 + providerId + " and deviceId: " + deviceId, e);
356 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700357 }
358 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700359 }
360
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700361 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
362 DeviceId deviceId,
363 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700364
365 Device device = devices.get(deviceId);
366 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
367
368 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
369 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
370
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700371 List<DeviceEvent> events = new ArrayList<>();
372 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700373
374 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
375 log.debug("Ignoring outdated events: {}", portDescriptions);
376 return null;
377 }
378
379 DeviceDescriptions descs = descsMap.get(providerId);
380 // every provider must provide DeviceDescription.
381 checkArgument(descs != null,
382 "Device description for Device ID %s from Provider %s was not found",
383 deviceId, providerId);
384
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700385 Map<PortNumber, Port> ports = getPortMap(deviceId);
386
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700387 final Timestamp newTimestamp = portDescriptions.timestamp();
388
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700389 // Add new ports
390 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700391 for (PortDescription portDescription : portDescriptions.value()) {
392 final PortNumber number = portDescription.portNumber();
393 processed.add(number);
394
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700395 final Port oldPort = ports.get(number);
396 final Port newPort;
397
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700398
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700399 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
400 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700401 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700402 // on new port or valid update
403 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700404 descs.putPortDesc(new Timestamped<>(portDescription,
405 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700406 newPort = composePort(device, number, descsMap);
407 } else {
408 // outdated event, ignored.
409 continue;
410 }
411
412 events.add(oldPort == null ?
413 createPort(device, newPort, ports) :
414 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700415 }
416
417 events.addAll(pruneOldPorts(device, ports, processed));
418 }
419 return FluentIterable.from(events).filter(notNull()).toList();
420 }
421
422 // Creates a new port based on the port description adds it to the map and
423 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700424 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700425 private DeviceEvent createPort(Device device, Port newPort,
426 Map<PortNumber, Port> ports) {
427 ports.put(newPort.number(), newPort);
428 return new DeviceEvent(PORT_ADDED, device, newPort);
429 }
430
431 // Checks if the specified port requires update and if so, it replaces the
432 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700433 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700434 private DeviceEvent updatePort(Device device, Port oldPort,
435 Port newPort,
436 Map<PortNumber, Port> ports) {
437 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700438 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700439
440 ports.put(oldPort.number(), newPort);
441 return new DeviceEvent(PORT_UPDATED, device, newPort);
442 }
443 return null;
444 }
445
446 // Prunes the specified list of ports based on which ports are in the
447 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700448 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700449 private List<DeviceEvent> pruneOldPorts(Device device,
450 Map<PortNumber, Port> ports,
451 Set<PortNumber> processed) {
452 List<DeviceEvent> events = new ArrayList<>();
453 Iterator<PortNumber> iterator = ports.keySet().iterator();
454 while (iterator.hasNext()) {
455 PortNumber portNumber = iterator.next();
456 if (!processed.contains(portNumber)) {
457 events.add(new DeviceEvent(PORT_REMOVED, device,
458 ports.get(portNumber)));
459 iterator.remove();
460 }
461 }
462 return events;
463 }
464
465 // Gets the map of ports for the specified device; if one does not already
466 // exist, it creates and registers a new one.
467 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
468 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700469 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
470 }
471
472 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
473 DeviceId deviceId) {
474 return createIfAbsentUnchecked(deviceDescs, deviceId,
475 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700476 }
477
478 @Override
479 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
480 PortDescription portDescription) {
481 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
482 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
483 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
484 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700485 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
486 providerId, deviceId);
487 try {
488 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
489 } catch (IOException e) {
490 log.error("Failed to notify peers of a port status update topology event or providerId: "
491 + providerId + " and deviceId: " + deviceId, e);
492 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700493 }
494 return event;
495 }
496
497 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
498 Timestamped<PortDescription> deltaDesc) {
499
500 Device device = devices.get(deviceId);
501 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
502
503 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
504 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
505
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700506 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700507
508 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
509 log.debug("Ignoring outdated event: {}", deltaDesc);
510 return null;
511 }
512
513 DeviceDescriptions descs = descsMap.get(providerId);
514 // assuming all providers must to give DeviceDescription
515 checkArgument(descs != null,
516 "Device description for Device ID %s from Provider %s was not found",
517 deviceId, providerId);
518
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700519 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
520 final PortNumber number = deltaDesc.value().portNumber();
521 final Port oldPort = ports.get(number);
522 final Port newPort;
523
524 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
525 if (existingPortDesc == null ||
526 deltaDesc == existingPortDesc ||
527 deltaDesc.isNewer(existingPortDesc)) {
528 // on new port or valid update
529 // update description
530 descs.putPortDesc(deltaDesc);
531 newPort = composePort(device, number, descsMap);
532 } else {
533 // outdated event, ignored.
534 return null;
535 }
536
537 if (oldPort == null) {
538 return createPort(device, newPort, ports);
539 } else {
540 return updatePort(device, oldPort, newPort, ports);
541 }
542 }
543 }
544
545 @Override
546 public List<Port> getPorts(DeviceId deviceId) {
547 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
548 if (ports == null) {
549 return Collections.emptyList();
550 }
551 return ImmutableList.copyOf(ports.values());
552 }
553
554 @Override
555 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
556 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
557 return ports == null ? null : ports.get(portNumber);
558 }
559
560 @Override
561 public boolean isAvailable(DeviceId deviceId) {
562 return availableDevices.contains(deviceId);
563 }
564
565 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700566 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
567 Timestamp timestamp = clockService.getTimestamp(deviceId);
568 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
569 // TODO: broadcast removal event
570 return event;
571 }
572
573 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
574 Timestamp timestamp) {
575
576 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700577 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700578 // accept removal request if given timestamp is newer than
579 // the latest Timestamp from Primary provider
580 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
581 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
582 if (timestamp.compareTo(lastTimestamp) <= 0) {
583 // outdated event ignore
584 return null;
585 }
586 removalRequest.put(deviceId, timestamp);
587
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700588 Device device = devices.remove(deviceId);
589 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700590 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
591 if (ports != null) {
592 ports.clear();
593 }
594 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700595 descs.clear();
596 return device == null ? null :
597 new DeviceEvent(DEVICE_REMOVED, device, null);
598 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700599 }
600
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700601 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
602 Timestamp removalTimestamp = removalRequest.get(deviceId);
603 if (removalTimestamp != null &&
604 removalTimestamp.compareTo(timestampToCheck) >= 0) {
605 // removalRequest is more recent
606 return true;
607 }
608 return false;
609 }
610
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700611 /**
612 * Returns a Device, merging description given from multiple Providers.
613 *
614 * @param deviceId device identifier
615 * @param providerDescs Collection of Descriptions from multiple providers
616 * @return Device instance
617 */
618 private Device composeDevice(DeviceId deviceId,
619 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
620
621 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
622
623 ProviderId primary = pickPrimaryPID(providerDescs);
624
625 DeviceDescriptions desc = providerDescs.get(primary);
626
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700627 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700628 Type type = base.type();
629 String manufacturer = base.manufacturer();
630 String hwVersion = base.hwVersion();
631 String swVersion = base.swVersion();
632 String serialNumber = base.serialNumber();
633 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
634 annotations = merge(annotations, base.annotations());
635
636 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
637 if (e.getKey().equals(primary)) {
638 continue;
639 }
640 // TODO: should keep track of Description timestamp
641 // and only merge conflicting keys when timestamp is newer
642 // Currently assuming there will never be a key conflict between
643 // providers
644
645 // annotation merging. not so efficient, should revisit later
646 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
647 }
648
649 return new DefaultDevice(primary, deviceId , type, manufacturer,
650 hwVersion, swVersion, serialNumber, annotations);
651 }
652
653 /**
654 * Returns a Port, merging description given from multiple Providers.
655 *
656 * @param device device the port is on
657 * @param number port number
658 * @param providerDescs Collection of Descriptions from multiple providers
659 * @return Port instance
660 */
661 private Port composePort(Device device, PortNumber number,
662 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
663
664 ProviderId primary = pickPrimaryPID(providerDescs);
665 DeviceDescriptions primDescs = providerDescs.get(primary);
666 // if no primary, assume not enabled
667 // TODO: revisit this default port enabled/disabled behavior
668 boolean isEnabled = false;
669 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
670
671 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
672 if (portDesc != null) {
673 isEnabled = portDesc.value().isEnabled();
674 annotations = merge(annotations, portDesc.value().annotations());
675 }
676
677 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
678 if (e.getKey().equals(primary)) {
679 continue;
680 }
681 // TODO: should keep track of Description timestamp
682 // and only merge conflicting keys when timestamp is newer
683 // Currently assuming there will never be a key conflict between
684 // providers
685
686 // annotation merging. not so efficient, should revisit later
687 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
688 if (otherPortDesc != null) {
689 annotations = merge(annotations, otherPortDesc.value().annotations());
690 }
691 }
692
693 return new DefaultPort(device, number, isEnabled, annotations);
694 }
695
696 /**
697 * @return primary ProviderID, or randomly chosen one if none exists
698 */
699 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700700 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700701 ProviderId fallBackPrimary = null;
702 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
703 if (!e.getKey().isAncillary()) {
704 return e.getKey();
705 } else if (fallBackPrimary == null) {
706 // pick randomly as a fallback in case there is no primary
707 fallBackPrimary = e.getKey();
708 }
709 }
710 return fallBackPrimary;
711 }
712
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700713 private DeviceDescriptions getPrimaryDescriptions(
714 Map<ProviderId, DeviceDescriptions> providerDescs) {
715 ProviderId pid = pickPrimaryPID(providerDescs);
716 return providerDescs.get(pid);
717 }
718
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700719 public static final class InitDeviceDescs
720 implements ConcurrentInitializer<DeviceDescriptions> {
721
722 private final Timestamped<DeviceDescription> deviceDesc;
723
724 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
725 this.deviceDesc = checkNotNull(deviceDesc);
726 }
727 @Override
728 public DeviceDescriptions get() throws ConcurrentException {
729 return new DeviceDescriptions(deviceDesc);
730 }
731 }
732
733
734 /**
735 * Collection of Description of a Device and it's Ports given from a Provider.
736 */
737 public static class DeviceDescriptions {
738
739 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
740 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
741
742 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
743 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
744 this.portDescs = new ConcurrentHashMap<>();
745 }
746
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700747 Timestamp getLatestTimestamp() {
748 Timestamp latest = deviceDesc.get().timestamp();
749 for (Timestamped<PortDescription> desc : portDescs.values()) {
750 if (desc.timestamp().compareTo(latest) > 0) {
751 latest = desc.timestamp();
752 }
753 }
754 return latest;
755 }
756
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700757 public Timestamped<DeviceDescription> getDeviceDesc() {
758 return deviceDesc.get();
759 }
760
761 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
762 return portDescs.get(number);
763 }
764
765 /**
766 * Puts DeviceDescription, merging annotations as necessary.
767 *
768 * @param newDesc new DeviceDescription
769 * @return previous DeviceDescription
770 */
771 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
772 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
773 Timestamped<DeviceDescription> newOne = newDesc;
774 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700775 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700776 newDesc.value().annotations());
777 newOne = new Timestamped<DeviceDescription>(
778 new DefaultDeviceDescription(newDesc.value(), merged),
779 newDesc.timestamp());
780 }
781 return deviceDesc.getAndSet(newOne);
782 }
783
784 /**
785 * Puts PortDescription, merging annotations as necessary.
786 *
787 * @param newDesc new PortDescription
788 * @return previous PortDescription
789 */
790 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
791 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
792 Timestamped<PortDescription> newOne = newDesc;
793 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700794 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700795 newDesc.value().annotations());
796 newOne = new Timestamped<PortDescription>(
797 new DefaultPortDescription(newDesc.value(), merged),
798 newDesc.timestamp());
799 }
800 return portDescs.put(newOne.value().portNumber(), newOne);
801 }
802 }
Madan Jampani47c93732014-10-06 20:46:08 -0700803
804 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700805 ClusterMessage message = new ClusterMessage(
806 clusterService.getLocalNode().id(),
807 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
808 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700809 clusterCommunicator.broadcast(message);
810 }
811
812 private void notifyPeers(InternalPortEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700813 ClusterMessage message = new ClusterMessage(
814 clusterService.getLocalNode().id(),
815 GossipDeviceStoreMessageSubjects.PORT_UPDATE,
816 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700817 clusterCommunicator.broadcast(message);
818 }
819
820 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700821 ClusterMessage message = new ClusterMessage(
822 clusterService.getLocalNode().id(),
823 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
824 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700825 clusterCommunicator.broadcast(message);
826 }
827
828 private class InternalDeviceEventListener implements ClusterMessageHandler {
829 @Override
830 public void handle(ClusterMessage message) {
831 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700832 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700833 ProviderId providerId = event.providerId();
834 DeviceId deviceId = event.deviceId();
835 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
836 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
837 }
838 }
839
840 private class InternalPortEventListener implements ClusterMessageHandler {
841 @Override
842 public void handle(ClusterMessage message) {
843
844 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700845 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700846
847 ProviderId providerId = event.providerId();
848 DeviceId deviceId = event.deviceId();
849 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
850
851 updatePortsInternal(providerId, deviceId, portDescriptions);
852 }
853 }
854
855 private class InternalPortStatusEventListener implements ClusterMessageHandler {
856 @Override
857 public void handle(ClusterMessage message) {
858
859 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700860 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700861
862 ProviderId providerId = event.providerId();
863 DeviceId deviceId = event.deviceId();
864 Timestamped<PortDescription> portDescription = event.portDescription();
865
866 updatePortStatusInternal(providerId, deviceId, portDescription);
867 }
868 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700869}