blob: 15a28fc4b9189e0b320610caea5c111cce86be14 [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Madan Jampanifc8aced2015-08-27 11:06:12 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Ray Milkey2bf5ea72017-06-01 09:03:34 -070018import java.util.Collection;
19import java.util.Collections;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Objects;
24import java.util.Optional;
25import java.util.Set;
26import java.util.concurrent.TimeUnit;
27import java.util.concurrent.atomic.AtomicReference;
28import java.util.stream.Collectors;
29import java.util.stream.Stream;
30
Madan Jampanifc8aced2015-08-27 11:06:12 -070031import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070032import org.apache.felix.scr.annotations.Deactivate;
33import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
35import org.apache.felix.scr.annotations.Service;
36import org.onlab.packet.ChassisId;
37import org.onlab.util.KryoNamespace;
Madan Jampanifc8aced2015-08-27 11:06:12 -070038import org.onosproject.cluster.ClusterService;
39import org.onosproject.cluster.NodeId;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.mastership.MastershipTermService;
42import org.onosproject.net.Annotations;
43import org.onosproject.net.AnnotationsUtil;
44import org.onosproject.net.DefaultAnnotations;
45import org.onosproject.net.DefaultDevice;
46import org.onosproject.net.DefaultPort;
47import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080048import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070049import org.onosproject.net.DeviceId;
50import org.onosproject.net.MastershipRole;
Madan Jampanifc8aced2015-08-27 11:06:12 -070051import org.onosproject.net.Port;
52import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070053import org.onosproject.net.device.DefaultPortStatistics;
54import org.onosproject.net.device.DeviceClockService;
55import org.onosproject.net.device.DeviceDescription;
56import org.onosproject.net.device.DeviceEvent;
57import org.onosproject.net.device.DeviceStore;
58import org.onosproject.net.device.DeviceStoreDelegate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070059import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.impl.MastershipBasedTimestamp;
65import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070066import org.onosproject.store.serializers.StoreSerializer;
Madan Jampanifc8aced2015-08-27 11:06:12 -070067import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
68import org.onosproject.store.service.DistributedSet;
69import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080071import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070072import org.onosproject.store.service.Serializer;
73import org.onosproject.store.service.SetEvent;
74import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070075import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080076import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070077import org.slf4j.Logger;
78
Ray Milkey2bf5ea72017-06-01 09:03:34 -070079import com.google.common.collect.ImmutableList;
80import com.google.common.collect.Iterables;
81import com.google.common.collect.Lists;
82import com.google.common.collect.Maps;
83import com.google.common.collect.Sets;
84import com.google.common.util.concurrent.Futures;
Jian Li11599162016-01-15 15:46:16 -080085
86import static com.google.common.base.Preconditions.checkArgument;
87import static com.google.common.base.Verify.verify;
88import static org.onosproject.net.DefaultAnnotations.merge;
89import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
90import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
91import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
92import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
93import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
94import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
95import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -070096import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Jian Li11599162016-01-15 15:46:16 -080097import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
98import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
99import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700100
101/**
102 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
103 */
Jian Li11599162016-01-15 15:46:16 -0800104//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700105@Service
106public class ECDeviceStore
107 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
108 implements DeviceStore {
109
110 private final Logger log = getLogger(getClass());
111
112 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
113
114 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
115 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
116 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
117
118 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
119 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
120 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
121 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
122
123 private DistributedSet<DeviceId> availableDevices;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected StorageService storageService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected MastershipService mastershipService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected MastershipTermService mastershipTermService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected DeviceClockService deviceClockService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected ClusterCommunicationService clusterCommunicator;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected ClusterService clusterService;
142
143 private NodeId localNodeId;
144 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
145 new InternalDeviceChangeEventListener();
146 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
147 new InternalPortChangeEventListener();
148 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
149 new InternalPortStatsListener();
150 private final SetEventListener<DeviceId> deviceStatusTracker =
151 new InternalDeviceStatusTracker();
152
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700153 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
154 KryoNamespace.newBuilder()
Madan Jampanifc8aced2015-08-27 11:06:12 -0700155 .register(DistributedStoreSerializers.STORE_COMMON)
156 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700157 .build("ECDevice"));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700158
159 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
160 .register(KryoNamespaces.API)
161 .register(DeviceKey.class)
162 .register(PortKey.class)
163 .register(DeviceKey.class)
164 .register(PortKey.class)
165 .register(MastershipBasedTimestamp.class);
166
167 @Activate
168 public void activate() {
169 localNodeId = clusterService.getLocalNode().id();
170
171 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
172 .withName("onos-device-descriptions")
173 .withSerializer(SERIALIZER_BUILDER)
174 .withTimestampProvider((k, v) -> {
175 try {
176 return deviceClockService.getTimestamp(k.deviceId());
177 } catch (IllegalStateException e) {
178 return null;
179 }
180 }).build();
181
182 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
183 .withName("onos-port-descriptions")
184 .withSerializer(SERIALIZER_BUILDER)
185 .withTimestampProvider((k, v) -> {
186 try {
187 return deviceClockService.getTimestamp(k.deviceId());
188 } catch (IllegalStateException e) {
189 return null;
190 }
191 }).build();
192
193 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
194 .withName("onos-port-stats")
195 .withSerializer(SERIALIZER_BUILDER)
196 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
197 .withTimestampProvider((k, v) -> new WallClockTimestamp())
198 .withTombstonesDisabled()
199 .build();
200
201 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
202 eventuallyConsistentMapBuilder()
203 .withName("onos-port-stats-delta")
204 .withSerializer(SERIALIZER_BUILDER)
205 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
206 .withTimestampProvider((k, v) -> new WallClockTimestamp())
207 .withTombstonesDisabled()
208 .build();
209
Madan Jampanifc8aced2015-08-27 11:06:12 -0700210 availableDevices = storageService.<DeviceId>setBuilder()
211 .withName("onos-online-devices")
212 .withSerializer(Serializer.using(KryoNamespaces.API))
Madan Jampanifc8aced2015-08-27 11:06:12 -0700213 .withRelaxedReadConsistency()
Madan Jampani538be742016-02-10 14:55:38 -0800214 .build()
215 .asDistributedSet();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700216
217 deviceDescriptions.addListener(deviceUpdateListener);
218 portDescriptions.addListener(portUpdateListener);
219 devicePortStats.addListener(portStatsListener);
220 availableDevices.addListener(deviceStatusTracker);
221 log.info("Started");
222 }
223
224 @Deactivate
225 public void deactivate() {
226 devicePortStats.removeListener(portStatsListener);
227 deviceDescriptions.removeListener(deviceUpdateListener);
228 portDescriptions.removeListener(portUpdateListener);
229 availableDevices.removeListener(deviceStatusTracker);
230 devicePortStats.destroy();
231 devicePortDeltaStats.destroy();
232 deviceDescriptions.destroy();
233 portDescriptions.destroy();
234 devices.clear();
235 devicePorts.clear();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700236 log.info("Stopped");
237 }
238
239 @Override
240 public Iterable<Device> getDevices() {
241 return devices.values();
242 }
243
244 @Override
245 public int getDeviceCount() {
246 return devices.size();
247 }
248
249 @Override
mskala0d0c6832017-07-12 11:21:23 +0200250 public int getAvailableDeviceCount() {
251 return availableDevices.size();
252 }
253
254 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700255 public Device getDevice(DeviceId deviceId) {
256 return devices.get(deviceId);
257 }
258
helenyrwufd296b62016-06-22 17:43:02 -0700259 // FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700260 @Override
261 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
262 DeviceId deviceId,
263 DeviceDescription deviceDescription) {
264 NodeId master = mastershipService.getMasterFor(deviceId);
265 if (localNodeId.equals(master)) {
266 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
267 return refreshDeviceCache(providerId, deviceId);
268 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800269 return null;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700270 }
271 }
272
273 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
274 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
275 Device device = devices.compute(deviceId, (k, existingDevice) -> {
276 Device newDevice = composeDevice(deviceId);
277 if (existingDevice == null) {
278 eventType.set(DEVICE_ADDED);
279 } else {
280 // We allow only certain attributes to trigger update
281 boolean propertiesChanged =
282 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
283 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
284 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
285 boolean annotationsChanged =
286 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
287
288 // Primary providers can respond to all changes, but ancillary ones
289 // should respond only to annotation changes.
290 if ((providerId.isAncillary() && annotationsChanged) ||
291 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
292 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
293 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
294 providerId, existingDevice, devices.get(deviceId), newDevice);
295 eventType.set(DEVICE_UPDATED);
296 }
297 }
298 return newDevice;
299 });
300 if (eventType.get() != null && !providerId.isAncillary()) {
301 markOnline(deviceId);
302 }
303 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
304 }
305
306 /**
307 * Returns the primary providerId for a device.
308 * @param deviceId device identifier
309 * @return primary providerId
310 */
311 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
312 return deviceDescriptions.keySet()
313 .stream()
314 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
315 .map(deviceKey -> deviceKey.providerId())
316 .collect(Collectors.toSet());
317 }
318
319 /**
320 * Returns the identifier for all providers for a device.
321 * @param deviceId device identifier
322 * @return set of provider identifiers
323 */
324 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
325 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
326 return allProviderIds.stream()
327 .filter(p -> !p.isAncillary())
328 .findFirst()
329 .orElse(Iterables.getFirst(allProviderIds, null));
330 }
331
332 /**
333 * Returns a Device, merging descriptions from multiple Providers.
334 *
335 * @param deviceId device identifier
336 * @return Device instance
337 */
338 private Device composeDevice(DeviceId deviceId) {
339
340 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
341 DeviceDescription primaryDeviceDescription =
342 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
343
344 Type type = primaryDeviceDescription.type();
345 String manufacturer = primaryDeviceDescription.manufacturer();
346 String hwVersion = primaryDeviceDescription.hwVersion();
347 String swVersion = primaryDeviceDescription.swVersion();
348 String serialNumber = primaryDeviceDescription.serialNumber();
349 ChassisId chassisId = primaryDeviceDescription.chassisId();
350 DefaultAnnotations annotations = mergeAnnotations(deviceId);
351
352 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
353 hwVersion, swVersion, serialNumber,
354 chassisId, annotations);
355 }
356
357 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
358 Device removedDevice = devices.remove(deviceId);
359 if (removedDevice != null) {
360 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
361 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
362 }
363 return null;
364 }
365
helenyrwufd296b62016-06-22 17:43:02 -0700366 // FIXME publicization of markOnline -- trigger some action independently?
367 public boolean markOnline(DeviceId deviceId) {
368 if (devices.containsKey(deviceId)) {
369 return availableDevices.add(deviceId);
370 }
371 log.warn("Device {} does not exist in store", deviceId);
372 return false;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700373 }
374
375 @Override
376 public DeviceEvent markOffline(DeviceId deviceId) {
377 availableDevices.remove(deviceId);
378 // set update listener will raise the event.
379 return null;
380 }
381
382 @Override
383 public List<DeviceEvent> updatePorts(ProviderId providerId,
384 DeviceId deviceId,
385 List<PortDescription> descriptions) {
386 NodeId master = mastershipService.getMasterFor(deviceId);
387 List<DeviceEvent> deviceEvents = null;
388 if (localNodeId.equals(master)) {
389 descriptions.forEach(description -> {
390 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
391 portDescriptions.put(portKey, description);
392 });
393 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
394 } else {
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700395 return Collections.emptyList();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700396 }
397 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
398 }
399
400 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
401 DeviceId deviceId,
402 Optional<PortNumber> portNumber) {
403 Device device = devices.get(deviceId);
404 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
405 List<DeviceEvent> events = Lists.newArrayList();
406
407 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
408 List<PortDescription> descriptions = Lists.newArrayList();
409 portDescriptions.entrySet().forEach(e -> {
410 PortKey key = e.getKey();
411 PortDescription value = e.getValue();
412 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
413 if (portNumber.isPresent()) {
414 if (portNumber.get().equals(key.portNumber())) {
415 descriptions.add(value);
416 }
417 } else {
418 descriptions.add(value);
419 }
420 }
421 });
422
423 for (PortDescription description : descriptions) {
424 final PortNumber number = description.portNumber();
425 ports.compute(number, (k, existingPort) -> {
426 Port newPort = composePort(device, number);
427 if (existingPort == null) {
428 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
429 } else {
430 if (existingPort.isEnabled() != newPort.isEnabled() ||
431 existingPort.type() != newPort.type() ||
432 existingPort.portSpeed() != newPort.portSpeed() ||
433 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
434 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
435 }
436 }
437 return newPort;
438 });
439 }
440
441 return events;
442 }
443
444 /**
445 * Returns a Port, merging descriptions from multiple Providers.
446 *
447 * @param device device the port is on
448 * @param number port number
449 * @return Port instance
450 */
451 private Port composePort(Device device, PortNumber number) {
452
453 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
454 portDescriptions.entrySet().forEach(entry -> {
455 PortKey portKey = entry.getKey();
456 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
457 descriptions.put(portKey.providerId(), entry.getValue());
458 }
459 });
460 ProviderId primary = getPrimaryProviderId(device.id());
461 PortDescription primaryDescription = descriptions.get(primary);
462
463 // if no primary, assume not enabled
464 boolean isEnabled = false;
465 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
466 if (primaryDescription != null) {
467 isEnabled = primaryDescription.isEnabled();
468 annotations = merge(annotations, primaryDescription.annotations());
469 }
470 Port updated = null;
471 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
472 if (e.getKey().equals(primary)) {
473 continue;
474 }
475 annotations = merge(annotations, e.getValue().annotations());
476 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
477 }
478 if (primaryDescription == null) {
479 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
480 }
481 return updated == null
482 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
483 : updated;
484 }
485
486 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
487 PortDescription description, Annotations annotations) {
Madan Jampanifc8aced2015-08-27 11:06:12 -0700488 return new DefaultPort(device, number, isEnabled, description.type(),
489 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700490 }
491
492 @Override
493 public DeviceEvent updatePortStatus(ProviderId providerId,
494 DeviceId deviceId,
495 PortDescription portDescription) {
496 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
497 List<DeviceEvent> events =
498 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
499 return Iterables.getFirst(events, null);
500 }
501
502 @Override
503 public List<Port> getPorts(DeviceId deviceId) {
504 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
505 }
506
507 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700508 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
509 DeviceId deviceId) {
510
511 return portDescriptions.entrySet().stream()
512 .filter(e -> e.getKey().providerId().equals(pid))
513 .map(Map.Entry::getValue);
514 }
515
516 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700517 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
518 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
519 }
520
521 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700522 public PortDescription getPortDescription(ProviderId pid,
523 DeviceId deviceId,
524 PortNumber portNumber) {
525 return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
526 }
527
528 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700529 public DeviceEvent updatePortStatistics(ProviderId providerId,
530 DeviceId deviceId,
531 Collection<PortStatistics> newStatsCollection) {
532
533 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
534 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
535 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
536
537 if (prvStatsMap != null) {
538 for (PortStatistics newStats : newStatsCollection) {
539 PortNumber port = PortNumber.portNumber(newStats.port());
540 PortStatistics prvStats = prvStatsMap.get(port);
541 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
542 PortStatistics deltaStats = builder.build();
543 if (prvStats != null) {
544 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
545 }
546 deltaStatsMap.put(port, deltaStats);
547 newStatsMap.put(port, newStats);
548 }
549 } else {
550 for (PortStatistics newStats : newStatsCollection) {
551 PortNumber port = PortNumber.portNumber(newStats.port());
552 newStatsMap.put(port, newStats);
553 }
554 }
555 devicePortDeltaStats.put(deviceId, deltaStatsMap);
556 devicePortStats.put(deviceId, newStatsMap);
557 // DeviceEvent returns null because of InternalPortStatsListener usage
558 return null;
559 }
560
561 /**
562 * Calculate delta statistics by subtracting previous from new statistics.
563 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700564 * @param deviceId device indentifier
565 * @param prvStats previous port statistics
566 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700567 * @return PortStatistics
568 */
569 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
570 // calculate time difference
571 long deltaStatsSec, deltaStatsNano;
572 if (newStats.durationNano() < prvStats.durationNano()) {
573 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
574 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
575 } else {
576 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
577 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
578 }
579 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
580 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
581 .setPort(newStats.port())
582 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
583 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
584 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
585 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
586 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
587 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
588 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
589 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
590 .setDurationSec(deltaStatsSec)
591 .setDurationNano(deltaStatsNano)
592 .build();
593 return deltaStats;
594 }
595
596 @Override
597 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
598 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
599 if (portStats == null) {
600 return Collections.emptyList();
601 }
602 return ImmutableList.copyOf(portStats.values());
603 }
604
605 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530606 public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
607 Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
608 if (portStatsMap == null) {
609 return null;
610 }
611 PortStatistics portStats = portStatsMap.get(portNumber);
612 return portStats;
613 }
614
615 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700616 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
617 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
618 if (portStats == null) {
619 return Collections.emptyList();
620 }
621 return ImmutableList.copyOf(portStats.values());
622 }
623
624 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530625 public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
626 Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
627 if (portStatsMap == null) {
628 return null;
629 }
630 PortStatistics portStats = portStatsMap.get(portNumber);
631 return portStats;
632 }
633
634 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700635 public boolean isAvailable(DeviceId deviceId) {
636 return availableDevices.contains(deviceId);
637 }
638
639 @Override
640 public Iterable<Device> getAvailableDevices() {
641 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
642 }
643
644 @Override
645 public DeviceEvent removeDevice(DeviceId deviceId) {
646 NodeId master = mastershipService.getMasterFor(deviceId);
647 // if there exist a master, forward
648 // if there is no master, try to become one and process
649 boolean relinquishAtEnd = false;
650 if (master == null) {
651 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
652 if (myRole != MastershipRole.NONE) {
653 relinquishAtEnd = true;
654 }
655 log.debug("Temporarily requesting role for {} to remove", deviceId);
656 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
657 if (role == MastershipRole.MASTER) {
658 master = localNodeId;
659 }
660 }
661
662 if (!localNodeId.equals(master)) {
663 log.debug("{} has control of {}, forwarding remove request",
664 master, deviceId);
665
666 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
667 .whenComplete((r, e) -> {
668 if (e != null) {
669 log.error("Failed to forward {} remove request to its master", deviceId, e);
670 }
671 });
672 return null;
673 }
674
675 // I have control..
676 DeviceEvent event = null;
677 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
678 DeviceDescription removedDeviceDescription =
679 deviceDescriptions.remove(deviceKey);
680 if (removedDeviceDescription != null) {
681 event = purgeDeviceCache(deviceId);
682 }
683
684 if (relinquishAtEnd) {
685 log.debug("Relinquishing temporary role acquired for {}", deviceId);
686 mastershipService.relinquishMastership(deviceId);
687 }
688 return event;
689 }
690
Madan Jampanifc8aced2015-08-27 11:06:12 -0700691 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
692 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
693 DeviceDescription primaryDeviceDescription =
694 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
695 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
696 annotations = merge(annotations, primaryDeviceDescription.annotations());
697 for (ProviderId providerId : getAllProviders(deviceId)) {
698 if (!providerId.equals(primaryProviderId)) {
699 annotations = merge(annotations,
700 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
701 }
702 }
703 return annotations;
704 }
705
706 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
707 @Override
708 public void event(SetEvent<DeviceId> event) {
709 final DeviceId deviceId = event.entry();
710 final Device device = devices.get(deviceId);
711 if (device != null) {
712 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
713 } else {
714 pendingAvailableChangeUpdates.add(deviceId);
715 }
716 }
717 }
718
719 private class InternalDeviceChangeEventListener
720 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
721 @Override
722 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
723 DeviceId deviceId = event.key().deviceId();
724 ProviderId providerId = event.key().providerId();
725 if (event.type() == PUT) {
726 notifyDelegate(refreshDeviceCache(providerId, deviceId));
727 if (pendingAvailableChangeUpdates.remove(deviceId)) {
728 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
729 }
730 } else if (event.type() == REMOVE) {
731 notifyDelegate(purgeDeviceCache(deviceId));
732 }
733 }
734 }
735
736 private class InternalPortChangeEventListener
737 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
738 @Override
739 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
740 DeviceId deviceId = event.key().deviceId();
741 ProviderId providerId = event.key().providerId();
742 PortNumber portNumber = event.key().portNumber();
743 if (event.type() == PUT) {
744 if (devices.containsKey(deviceId)) {
745 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
746 for (DeviceEvent deviceEvent : events) {
747 notifyDelegate(deviceEvent);
748 }
749 }
750 } else if (event.type() == REMOVE) {
751 log.warn("Unexpected port removed event");
752 }
753 }
754 }
755
756 private class InternalPortStatsListener
757 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
758 @Override
759 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
760 if (event.type() == PUT) {
761 Device device = devices.get(event.key());
762 if (device != null) {
Thomas Vachuskad4955ae2016-08-23 14:56:37 -0700763 notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700764 }
765 }
766 }
767 }
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700768}