blob: ac726c2d126b3a68db992fee0981c9d99e13595d [file] [log] [blame]
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07001package org.onlab.onos.store.device.impl;
2
3import com.google.common.collect.FluentIterable;
4import com.google.common.collect.ImmutableList;
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -07005import com.google.common.collect.Maps;
6import com.google.common.collect.Sets;
Madan Jampani47c93732014-10-06 20:46:08 -07007
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -07008import org.apache.commons.lang3.concurrent.ConcurrentException;
9import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
10import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
15import org.apache.felix.scr.annotations.Service;
Madan Jampani53e44e62014-10-07 12:39:51 -070016import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -070017import org.onlab.onos.net.AnnotationsUtil;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070018import org.onlab.onos.net.DefaultAnnotations;
19import org.onlab.onos.net.DefaultDevice;
20import org.onlab.onos.net.DefaultPort;
21import org.onlab.onos.net.Device;
22import org.onlab.onos.net.Device.Type;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Port;
25import org.onlab.onos.net.PortNumber;
26import org.onlab.onos.net.SparseAnnotations;
27import org.onlab.onos.net.device.DefaultDeviceDescription;
28import org.onlab.onos.net.device.DefaultPortDescription;
29import org.onlab.onos.net.device.DeviceDescription;
30import org.onlab.onos.net.device.DeviceEvent;
31import org.onlab.onos.net.device.DeviceStore;
32import org.onlab.onos.net.device.DeviceStoreDelegate;
33import org.onlab.onos.net.device.PortDescription;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
36import org.onlab.onos.store.ClockService;
37import org.onlab.onos.store.Timestamp;
Madan Jampani47c93732014-10-06 20:46:08 -070038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani53e44e62014-10-07 12:39:51 -070041import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070042import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani53e44e62014-10-07 12:39:51 -070043import org.onlab.onos.store.serializers.KryoPoolUtil;
44import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampanifef9b3a2014-10-07 18:38:17 -070045import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070046import org.onlab.util.KryoPool;
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -070047import org.onlab.util.NewConcurrentHashMap;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070048import org.slf4j.Logger;
49
Madan Jampani47c93732014-10-06 20:46:08 -070050import java.io.IOException;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070051import java.util.ArrayList;
52import java.util.Collections;
53import java.util.HashSet;
54import java.util.Iterator;
55import java.util.List;
56import java.util.Map;
57import java.util.Map.Entry;
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ConcurrentHashMap;
61import java.util.concurrent.ConcurrentMap;
62import java.util.concurrent.atomic.AtomicReference;
63
64import static com.google.common.base.Preconditions.checkArgument;
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Predicates.notNull;
67import static org.onlab.onos.net.device.DeviceEvent.Type.*;
68import static org.slf4j.LoggerFactory.getLogger;
69import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
70import static org.onlab.onos.net.DefaultAnnotations.merge;
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -070071import static org.onlab.onos.net.DefaultAnnotations.union;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070072import static com.google.common.base.Verify.verify;
73
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070074// TODO: give me a better name
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070075/**
76 * Manages inventory of infrastructure devices using gossip protocol to distribute
77 * information.
78 */
79@Component(immediate = true)
80@Service
81public class GossipDeviceStore
82 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
83 implements DeviceStore {
84
85 private final Logger log = getLogger(getClass());
86
87 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
88
89 // TODO: Check if inner Map can be replaced with plain Map
90 // innerMap is used to lock a Device, thus instance should never be replaced.
91 // collection of Description given from various providers
92 private final ConcurrentMap<DeviceId,
93 ConcurrentMap<ProviderId, DeviceDescriptions>>
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070094 deviceDescs = Maps.newConcurrentMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -070095
96 // cache of Device and Ports generated by compositing descriptions from providers
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -070097 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
98 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
99
100 // to be updated under Device lock
101 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
102 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700103
104 // available(=UP) devices
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700105 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected ClockService clockService;
109
Madan Jampani47c93732014-10-06 20:46:08 -0700110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
Madan Jampani53e44e62014-10-07 12:39:51 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
116 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI24b2e2a2014-10-07 15:53:57 -0700117 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -0700118 protected void setupKryoPool() {
119 serializerPool = KryoPool.newBuilder()
120 .register(KryoPoolUtil.API)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
122 .register(InternalPortEvent.class, new InternalPortEventSerializer())
123 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -0700124 .register(Timestamped.class)
Madan Jampanifef9b3a2014-10-07 18:38:17 -0700125 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -0700126 .build()
127 .populate(1);
128 }
129
130 };
131
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700132 @Activate
133 public void activate() {
Madan Jampani2206e012014-10-06 21:04:20 -0700134 clusterCommunicator.addSubscriber(
135 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
136 clusterCommunicator.addSubscriber(
137 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
138 clusterCommunicator.addSubscriber(
139 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700140 log.info("Started");
141 }
142
143 @Deactivate
144 public void deactivate() {
145 deviceDescs.clear();
146 devices.clear();
147 devicePorts.clear();
148 availableDevices.clear();
149 log.info("Stopped");
150 }
151
152 @Override
153 public int getDeviceCount() {
154 return devices.size();
155 }
156
157 @Override
158 public Iterable<Device> getDevices() {
159 return Collections.unmodifiableCollection(devices.values());
160 }
161
162 @Override
163 public Device getDevice(DeviceId deviceId) {
164 return devices.get(deviceId);
165 }
166
167 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700168 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
169 DeviceId deviceId,
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700170 DeviceDescription deviceDescription) {
171 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
172 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
173 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
174 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700175 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
176 providerId, deviceId);
177 try {
178 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
179 } catch (IOException e) {
180 log.error("Failed to notify peers of a device update topology event or providerId: "
181 + providerId + " and deviceId: " + deviceId, e);
182 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700183 }
184 return event;
185 }
186
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700187 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
188 DeviceId deviceId,
189 Timestamped<DeviceDescription> deltaDesc) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700190
191 // Collection of DeviceDescriptions for a Device
192 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700193 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700194
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700195 synchronized (providerDescs) {
196 // locking per device
197
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700198 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
199 log.debug("Ignoring outdated event: {}", deltaDesc);
200 return null;
201 }
202
203 DeviceDescriptions descs
204 = createIfAbsentUnchecked(providerDescs, providerId,
205 new InitDeviceDescs(deltaDesc));
206
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700207 final Device oldDevice = devices.get(deviceId);
208 final Device newDevice;
209
210 if (deltaDesc == descs.getDeviceDesc() ||
211 deltaDesc.isNewer(descs.getDeviceDesc())) {
212 // on new device or valid update
213 descs.putDeviceDesc(deltaDesc);
214 newDevice = composeDevice(deviceId, providerDescs);
215 } else {
216 // outdated event, ignored.
217 return null;
218 }
219 if (oldDevice == null) {
220 // ADD
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700221 return createDevice(providerId, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700222 } else {
223 // UPDATE or ignore (no change or stale)
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700224 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700225 }
226 }
227 }
228
229 // Creates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700230 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700231 private DeviceEvent createDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700232 Device newDevice, Timestamp timestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700233
234 // update composed device cache
235 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
236 verify(oldDevice == null,
237 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
238 providerId, oldDevice, newDevice);
239
240 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700241 markOnline(newDevice.id(), timestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700242 }
243
244 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
245 }
246
247 // Updates the device and returns the appropriate event if necessary.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700248 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700249 private DeviceEvent updateDevice(ProviderId providerId,
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700250 Device oldDevice,
251 Device newDevice, Timestamp newTimestamp) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700252
253 // We allow only certain attributes to trigger update
254 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
255 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700256 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700257
258 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
259 if (!replaced) {
260 verify(replaced,
261 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
262 providerId, oldDevice, devices.get(newDevice.id())
263 , newDevice);
264 }
265 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700266 markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700267 }
268 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
269 }
270
271 // Otherwise merely attempt to change availability if primary provider
272 if (!providerId.isAncillary()) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700273 boolean added = markOnline(newDevice.id(), newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700274 return !added ? null :
275 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
276 }
277 return null;
278 }
279
280 @Override
281 public DeviceEvent markOffline(DeviceId deviceId) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700282 Timestamp timestamp = clockService.getTimestamp(deviceId);
283 return markOfflineInternal(deviceId, timestamp);
284 }
285
286 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
287
288 Map<ProviderId, DeviceDescriptions> providerDescs
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700289 = getDeviceDescriptions(deviceId);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700290
291 // locking device
292 synchronized (providerDescs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700293
294 // accept off-line if given timestamp is newer than
295 // the latest Timestamp from Primary provider
296 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
297 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
298 if (timestamp.compareTo(lastTimestamp) <= 0) {
299 // outdated event ignore
300 return null;
301 }
302
303 offline.put(deviceId, timestamp);
304
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700305 Device device = devices.get(deviceId);
306 if (device == null) {
307 return null;
308 }
309 boolean removed = availableDevices.remove(deviceId);
310 if (removed) {
311 // TODO: broadcast ... DOWN only?
312 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700313 }
314 return null;
315 }
316 }
317
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700318 /**
319 * Marks the device as available if the given timestamp is not outdated,
320 * compared to the time the device has been marked offline.
321 *
322 * @param deviceId identifier of the device
323 * @param timestamp of the event triggering this change.
324 * @return true if availability change request was accepted and changed the state
325 */
326 // Guarded by deviceDescs value (=Device lock)
327 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
328 // accept on-line if given timestamp is newer than
329 // the latest offline request Timestamp
330 Timestamp offlineTimestamp = offline.get(deviceId);
331 if (offlineTimestamp == null ||
332 offlineTimestamp.compareTo(timestamp) < 0) {
333
334 offline.remove(deviceId);
335 return availableDevices.add(deviceId);
336 }
337 return false;
338 }
339
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700340 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700341 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
342 DeviceId deviceId,
343 List<PortDescription> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700344 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
345
Madan Jampani47c93732014-10-06 20:46:08 -0700346 Timestamped<List<PortDescription>> timestampedPortDescriptions =
347 new Timestamped<>(portDescriptions, newTimestamp);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700348
Madan Jampani47c93732014-10-06 20:46:08 -0700349 List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700350 if (!events.isEmpty()) {
Madan Jampani47c93732014-10-06 20:46:08 -0700351 log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
352 providerId, deviceId);
353 try {
354 notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
355 } catch (IOException e) {
356 log.error("Failed to notify peers of a port update topology event or providerId: "
357 + providerId + " and deviceId: " + deviceId, e);
358 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700359 }
360 return events;
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700361 }
362
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700363 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
364 DeviceId deviceId,
365 Timestamped<List<PortDescription>> portDescriptions) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700366
367 Device device = devices.get(deviceId);
368 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
369
370 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
371 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
372
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700373 List<DeviceEvent> events = new ArrayList<>();
374 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700375
376 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
377 log.debug("Ignoring outdated events: {}", portDescriptions);
378 return null;
379 }
380
381 DeviceDescriptions descs = descsMap.get(providerId);
382 // every provider must provide DeviceDescription.
383 checkArgument(descs != null,
384 "Device description for Device ID %s from Provider %s was not found",
385 deviceId, providerId);
386
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700387 Map<PortNumber, Port> ports = getPortMap(deviceId);
388
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700389 final Timestamp newTimestamp = portDescriptions.timestamp();
390
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700391 // Add new ports
392 Set<PortNumber> processed = new HashSet<>();
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700393 for (PortDescription portDescription : portDescriptions.value()) {
394 final PortNumber number = portDescription.portNumber();
395 processed.add(number);
396
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700397 final Port oldPort = ports.get(number);
398 final Port newPort;
399
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700400
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700401 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
402 if (existingPortDesc == null ||
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700403 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700404 // on new port or valid update
405 // update description
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700406 descs.putPortDesc(new Timestamped<>(portDescription,
407 portDescriptions.timestamp()));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700408 newPort = composePort(device, number, descsMap);
409 } else {
410 // outdated event, ignored.
411 continue;
412 }
413
414 events.add(oldPort == null ?
415 createPort(device, newPort, ports) :
416 updatePort(device, oldPort, newPort, ports));
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700417 }
418
419 events.addAll(pruneOldPorts(device, ports, processed));
420 }
421 return FluentIterable.from(events).filter(notNull()).toList();
422 }
423
424 // Creates a new port based on the port description adds it to the map and
425 // Returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700426 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700427 private DeviceEvent createPort(Device device, Port newPort,
428 Map<PortNumber, Port> ports) {
429 ports.put(newPort.number(), newPort);
430 return new DeviceEvent(PORT_ADDED, device, newPort);
431 }
432
433 // Checks if the specified port requires update and if so, it replaces the
434 // existing entry in the map and returns corresponding event.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700435 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700436 private DeviceEvent updatePort(Device device, Port oldPort,
437 Port newPort,
438 Map<PortNumber, Port> ports) {
439 if (oldPort.isEnabled() != newPort.isEnabled() ||
Yuta HIGUCHI39ede6a2014-10-03 15:23:33 -0700440 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700441
442 ports.put(oldPort.number(), newPort);
443 return new DeviceEvent(PORT_UPDATED, device, newPort);
444 }
445 return null;
446 }
447
448 // Prunes the specified list of ports based on which ports are in the
449 // processed list and returns list of corresponding events.
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700450 // Guarded by deviceDescs value (=Device lock)
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700451 private List<DeviceEvent> pruneOldPorts(Device device,
452 Map<PortNumber, Port> ports,
453 Set<PortNumber> processed) {
454 List<DeviceEvent> events = new ArrayList<>();
455 Iterator<PortNumber> iterator = ports.keySet().iterator();
456 while (iterator.hasNext()) {
457 PortNumber portNumber = iterator.next();
458 if (!processed.contains(portNumber)) {
459 events.add(new DeviceEvent(PORT_REMOVED, device,
460 ports.get(portNumber)));
461 iterator.remove();
462 }
463 }
464 return events;
465 }
466
467 // Gets the map of ports for the specified device; if one does not already
468 // exist, it creates and registers a new one.
469 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
470 return createIfAbsentUnchecked(devicePorts, deviceId,
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700471 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
472 }
473
474 private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
475 DeviceId deviceId) {
476 return createIfAbsentUnchecked(deviceDescs, deviceId,
477 NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700478 }
479
480 @Override
481 public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
482 PortDescription portDescription) {
483 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
484 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
485 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
486 if (event != null) {
Madan Jampani47c93732014-10-06 20:46:08 -0700487 log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
488 providerId, deviceId);
489 try {
490 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
491 } catch (IOException e) {
492 log.error("Failed to notify peers of a port status update topology event or providerId: "
493 + providerId + " and deviceId: " + deviceId, e);
494 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700495 }
496 return event;
497 }
498
499 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
500 Timestamped<PortDescription> deltaDesc) {
501
502 Device device = devices.get(deviceId);
503 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
504
505 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
506 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
507
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700508 synchronized (descsMap) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700509
510 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
511 log.debug("Ignoring outdated event: {}", deltaDesc);
512 return null;
513 }
514
515 DeviceDescriptions descs = descsMap.get(providerId);
516 // assuming all providers must to give DeviceDescription
517 checkArgument(descs != null,
518 "Device description for Device ID %s from Provider %s was not found",
519 deviceId, providerId);
520
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700521 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
522 final PortNumber number = deltaDesc.value().portNumber();
523 final Port oldPort = ports.get(number);
524 final Port newPort;
525
526 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
527 if (existingPortDesc == null ||
528 deltaDesc == existingPortDesc ||
529 deltaDesc.isNewer(existingPortDesc)) {
530 // on new port or valid update
531 // update description
532 descs.putPortDesc(deltaDesc);
533 newPort = composePort(device, number, descsMap);
534 } else {
535 // outdated event, ignored.
536 return null;
537 }
538
539 if (oldPort == null) {
540 return createPort(device, newPort, ports);
541 } else {
542 return updatePort(device, oldPort, newPort, ports);
543 }
544 }
545 }
546
547 @Override
548 public List<Port> getPorts(DeviceId deviceId) {
549 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
550 if (ports == null) {
551 return Collections.emptyList();
552 }
553 return ImmutableList.copyOf(ports.values());
554 }
555
556 @Override
557 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
558 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
559 return ports == null ? null : ports.get(portNumber);
560 }
561
562 @Override
563 public boolean isAvailable(DeviceId deviceId) {
564 return availableDevices.contains(deviceId);
565 }
566
567 @Override
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700568 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
569 Timestamp timestamp = clockService.getTimestamp(deviceId);
570 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
571 // TODO: broadcast removal event
572 return event;
573 }
574
575 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
576 Timestamp timestamp) {
577
578 Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700579 synchronized (descs) {
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700580 // accept removal request if given timestamp is newer than
581 // the latest Timestamp from Primary provider
582 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
583 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
584 if (timestamp.compareTo(lastTimestamp) <= 0) {
585 // outdated event ignore
586 return null;
587 }
588 removalRequest.put(deviceId, timestamp);
589
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700590 Device device = devices.remove(deviceId);
591 // should DEVICE_REMOVED carry removed ports?
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700592 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
593 if (ports != null) {
594 ports.clear();
595 }
596 markOfflineInternal(deviceId, timestamp);
Yuta HIGUCHI0d6a5e62014-10-03 15:54:09 -0700597 descs.clear();
598 return device == null ? null :
599 new DeviceEvent(DEVICE_REMOVED, device, null);
600 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700601 }
602
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700603 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
604 Timestamp removalTimestamp = removalRequest.get(deviceId);
605 if (removalTimestamp != null &&
606 removalTimestamp.compareTo(timestampToCheck) >= 0) {
607 // removalRequest is more recent
608 return true;
609 }
610 return false;
611 }
612
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700613 /**
614 * Returns a Device, merging description given from multiple Providers.
615 *
616 * @param deviceId device identifier
617 * @param providerDescs Collection of Descriptions from multiple providers
618 * @return Device instance
619 */
620 private Device composeDevice(DeviceId deviceId,
621 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
622
623 checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
624
625 ProviderId primary = pickPrimaryPID(providerDescs);
626
627 DeviceDescriptions desc = providerDescs.get(primary);
628
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700629 final DeviceDescription base = desc.getDeviceDesc().value();
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700630 Type type = base.type();
631 String manufacturer = base.manufacturer();
632 String hwVersion = base.hwVersion();
633 String swVersion = base.swVersion();
634 String serialNumber = base.serialNumber();
635 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
636 annotations = merge(annotations, base.annotations());
637
638 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
639 if (e.getKey().equals(primary)) {
640 continue;
641 }
642 // TODO: should keep track of Description timestamp
643 // and only merge conflicting keys when timestamp is newer
644 // Currently assuming there will never be a key conflict between
645 // providers
646
647 // annotation merging. not so efficient, should revisit later
648 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
649 }
650
651 return new DefaultDevice(primary, deviceId , type, manufacturer,
652 hwVersion, swVersion, serialNumber, annotations);
653 }
654
655 /**
656 * Returns a Port, merging description given from multiple Providers.
657 *
658 * @param device device the port is on
659 * @param number port number
660 * @param providerDescs Collection of Descriptions from multiple providers
661 * @return Port instance
662 */
663 private Port composePort(Device device, PortNumber number,
664 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
665
666 ProviderId primary = pickPrimaryPID(providerDescs);
667 DeviceDescriptions primDescs = providerDescs.get(primary);
668 // if no primary, assume not enabled
669 // TODO: revisit this default port enabled/disabled behavior
670 boolean isEnabled = false;
671 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
672
673 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
674 if (portDesc != null) {
675 isEnabled = portDesc.value().isEnabled();
676 annotations = merge(annotations, portDesc.value().annotations());
677 }
678
679 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
680 if (e.getKey().equals(primary)) {
681 continue;
682 }
683 // TODO: should keep track of Description timestamp
684 // and only merge conflicting keys when timestamp is newer
685 // Currently assuming there will never be a key conflict between
686 // providers
687
688 // annotation merging. not so efficient, should revisit later
689 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
690 if (otherPortDesc != null) {
691 annotations = merge(annotations, otherPortDesc.value().annotations());
692 }
693 }
694
695 return new DefaultPort(device, number, isEnabled, annotations);
696 }
697
698 /**
699 * @return primary ProviderID, or randomly chosen one if none exists
700 */
701 private ProviderId pickPrimaryPID(
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700702 Map<ProviderId, DeviceDescriptions> providerDescs) {
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700703 ProviderId fallBackPrimary = null;
704 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
705 if (!e.getKey().isAncillary()) {
706 return e.getKey();
707 } else if (fallBackPrimary == null) {
708 // pick randomly as a fallback in case there is no primary
709 fallBackPrimary = e.getKey();
710 }
711 }
712 return fallBackPrimary;
713 }
714
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700715 private DeviceDescriptions getPrimaryDescriptions(
716 Map<ProviderId, DeviceDescriptions> providerDescs) {
717 ProviderId pid = pickPrimaryPID(providerDescs);
718 return providerDescs.get(pid);
719 }
720
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700721 public static final class InitDeviceDescs
722 implements ConcurrentInitializer<DeviceDescriptions> {
723
724 private final Timestamped<DeviceDescription> deviceDesc;
725
726 public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
727 this.deviceDesc = checkNotNull(deviceDesc);
728 }
729 @Override
730 public DeviceDescriptions get() throws ConcurrentException {
731 return new DeviceDescriptions(deviceDesc);
732 }
733 }
734
735
736 /**
737 * Collection of Description of a Device and it's Ports given from a Provider.
738 */
739 public static class DeviceDescriptions {
740
741 private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
742 private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
743
744 public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
745 this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
746 this.portDescs = new ConcurrentHashMap<>();
747 }
748
Yuta HIGUCHIc35efac2014-10-06 14:43:53 -0700749 Timestamp getLatestTimestamp() {
750 Timestamp latest = deviceDesc.get().timestamp();
751 for (Timestamped<PortDescription> desc : portDescs.values()) {
752 if (desc.timestamp().compareTo(latest) > 0) {
753 latest = desc.timestamp();
754 }
755 }
756 return latest;
757 }
758
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700759 public Timestamped<DeviceDescription> getDeviceDesc() {
760 return deviceDesc.get();
761 }
762
763 public Timestamped<PortDescription> getPortDesc(PortNumber number) {
764 return portDescs.get(number);
765 }
766
767 /**
768 * Puts DeviceDescription, merging annotations as necessary.
769 *
770 * @param newDesc new DeviceDescription
771 * @return previous DeviceDescription
772 */
773 public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
774 Timestamped<DeviceDescription> oldOne = deviceDesc.get();
775 Timestamped<DeviceDescription> newOne = newDesc;
776 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700777 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700778 newDesc.value().annotations());
779 newOne = new Timestamped<DeviceDescription>(
780 new DefaultDeviceDescription(newDesc.value(), merged),
781 newDesc.timestamp());
782 }
783 return deviceDesc.getAndSet(newOne);
784 }
785
786 /**
787 * Puts PortDescription, merging annotations as necessary.
788 *
789 * @param newDesc new PortDescription
790 * @return previous PortDescription
791 */
792 public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
793 Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
794 Timestamped<PortDescription> newOne = newDesc;
795 if (oldOne != null) {
Yuta HIGUCHI885be1d2014-10-04 21:47:26 -0700796 SparseAnnotations merged = union(oldOne.value().annotations(),
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700797 newDesc.value().annotations());
798 newOne = new Timestamped<PortDescription>(
799 new DefaultPortDescription(newDesc.value(), merged),
800 newDesc.timestamp());
801 }
802 return portDescs.put(newOne.value().portNumber(), newOne);
803 }
804 }
Madan Jampani47c93732014-10-06 20:46:08 -0700805
806 private void notifyPeers(InternalDeviceEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700807 ClusterMessage message = new ClusterMessage(
808 clusterService.getLocalNode().id(),
809 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
810 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700811 clusterCommunicator.broadcast(message);
812 }
813
814 private void notifyPeers(InternalPortEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700815 ClusterMessage message = new ClusterMessage(
816 clusterService.getLocalNode().id(),
817 GossipDeviceStoreMessageSubjects.PORT_UPDATE,
818 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700819 clusterCommunicator.broadcast(message);
820 }
821
822 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
Madan Jampani53e44e62014-10-07 12:39:51 -0700823 ClusterMessage message = new ClusterMessage(
824 clusterService.getLocalNode().id(),
825 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
826 SERIALIZER.encode(event));
Madan Jampani47c93732014-10-06 20:46:08 -0700827 clusterCommunicator.broadcast(message);
828 }
829
830 private class InternalDeviceEventListener implements ClusterMessageHandler {
831 @Override
832 public void handle(ClusterMessage message) {
833 log.info("Received device update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700834 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700835 ProviderId providerId = event.providerId();
836 DeviceId deviceId = event.deviceId();
837 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
838 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
839 }
840 }
841
842 private class InternalPortEventListener implements ClusterMessageHandler {
843 @Override
844 public void handle(ClusterMessage message) {
845
846 log.info("Received port update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700847 InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700848
849 ProviderId providerId = event.providerId();
850 DeviceId deviceId = event.deviceId();
851 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
852
853 updatePortsInternal(providerId, deviceId, portDescriptions);
854 }
855 }
856
857 private class InternalPortStatusEventListener implements ClusterMessageHandler {
858 @Override
859 public void handle(ClusterMessage message) {
860
861 log.info("Received port status update event from peer: {}", message.sender());
Madan Jampani53e44e62014-10-07 12:39:51 -0700862 InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
Madan Jampani47c93732014-10-06 20:46:08 -0700863
864 ProviderId providerId = event.providerId();
865 DeviceId deviceId = event.deviceId();
866 Timestamped<PortDescription> portDescription = event.portDescription();
867
868 updatePortStatusInternal(providerId, deviceId, portDescription);
869 }
870 }
Yuta HIGUCHI67a527f2014-10-02 22:23:54 -0700871}