blob: 177c41edb878d57abebec7c99f29c82f83d662ea [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Madan Jampanifc8aced2015-08-27 11:06:12 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Ray Milkey2bf5ea72017-06-01 09:03:34 -070018import java.util.Collection;
19import java.util.Collections;
20import java.util.List;
21import java.util.Map;
22import java.util.Map.Entry;
23import java.util.Objects;
24import java.util.Optional;
25import java.util.Set;
26import java.util.concurrent.TimeUnit;
27import java.util.concurrent.atomic.AtomicReference;
28import java.util.stream.Collectors;
29import java.util.stream.Stream;
30
Madan Jampanifc8aced2015-08-27 11:06:12 -070031import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070032import org.apache.felix.scr.annotations.Deactivate;
33import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
35import org.apache.felix.scr.annotations.Service;
36import org.onlab.packet.ChassisId;
37import org.onlab.util.KryoNamespace;
Madan Jampanifc8aced2015-08-27 11:06:12 -070038import org.onosproject.cluster.ClusterService;
39import org.onosproject.cluster.NodeId;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.mastership.MastershipTermService;
42import org.onosproject.net.Annotations;
43import org.onosproject.net.AnnotationsUtil;
44import org.onosproject.net.DefaultAnnotations;
45import org.onosproject.net.DefaultDevice;
46import org.onosproject.net.DefaultPort;
47import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080048import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070049import org.onosproject.net.DeviceId;
50import org.onosproject.net.MastershipRole;
Madan Jampanifc8aced2015-08-27 11:06:12 -070051import org.onosproject.net.Port;
52import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070053import org.onosproject.net.device.DefaultPortStatistics;
54import org.onosproject.net.device.DeviceClockService;
55import org.onosproject.net.device.DeviceDescription;
56import org.onosproject.net.device.DeviceEvent;
57import org.onosproject.net.device.DeviceStore;
58import org.onosproject.net.device.DeviceStoreDelegate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070059import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.impl.MastershipBasedTimestamp;
65import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanifc8aced2015-08-27 11:06:12 -070066import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
67import org.onosproject.store.service.DistributedSet;
68import org.onosproject.store.service.EventuallyConsistentMap;
69import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080070import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070071import org.onosproject.store.service.Serializer;
72import org.onosproject.store.service.SetEvent;
73import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070074import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080075import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070076import org.slf4j.Logger;
77
Ray Milkey2bf5ea72017-06-01 09:03:34 -070078import com.google.common.collect.ImmutableList;
79import com.google.common.collect.Iterables;
80import com.google.common.collect.Lists;
81import com.google.common.collect.Maps;
82import com.google.common.collect.Sets;
83import com.google.common.util.concurrent.Futures;
Jian Li11599162016-01-15 15:46:16 -080084
85import static com.google.common.base.Preconditions.checkArgument;
86import static com.google.common.base.Verify.verify;
87import static org.onosproject.net.DefaultAnnotations.merge;
88import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
89import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
90import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
91import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
92import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
93import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
94import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -070095import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Jian Li11599162016-01-15 15:46:16 -080096import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
97import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
98import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -070099
100/**
101 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
102 */
Jian Li11599162016-01-15 15:46:16 -0800103//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700104@Service
105public class ECDeviceStore
106 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
107 implements DeviceStore {
108
109 private final Logger log = getLogger(getClass());
110
111 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
112
113 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
114 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
115 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
116
117 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
118 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
119 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
120 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
121
122 private DistributedSet<DeviceId> availableDevices;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected StorageService storageService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected MastershipService mastershipService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected MastershipTermService mastershipTermService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected DeviceClockService deviceClockService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected ClusterCommunicationService clusterCommunicator;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected ClusterService clusterService;
141
142 private NodeId localNodeId;
143 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
144 new InternalDeviceChangeEventListener();
145 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
146 new InternalPortChangeEventListener();
147 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
148 new InternalPortStatsListener();
149 private final SetEventListener<DeviceId> deviceStatusTracker =
150 new InternalDeviceStatusTracker();
151
Jordan Halterman2c83a102017-08-20 17:11:41 -0700152 protected static final Serializer SERIALIZER = Serializer.using(
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700153 KryoNamespace.newBuilder()
Madan Jampanifc8aced2015-08-27 11:06:12 -0700154 .register(DistributedStoreSerializers.STORE_COMMON)
155 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700156 .build("ECDevice"));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700157
158 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
159 .register(KryoNamespaces.API)
160 .register(DeviceKey.class)
161 .register(PortKey.class)
162 .register(DeviceKey.class)
163 .register(PortKey.class)
164 .register(MastershipBasedTimestamp.class);
165
166 @Activate
167 public void activate() {
168 localNodeId = clusterService.getLocalNode().id();
169
170 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
171 .withName("onos-device-descriptions")
172 .withSerializer(SERIALIZER_BUILDER)
173 .withTimestampProvider((k, v) -> {
174 try {
175 return deviceClockService.getTimestamp(k.deviceId());
176 } catch (IllegalStateException e) {
177 return null;
178 }
179 }).build();
180
181 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
182 .withName("onos-port-descriptions")
183 .withSerializer(SERIALIZER_BUILDER)
184 .withTimestampProvider((k, v) -> {
185 try {
186 return deviceClockService.getTimestamp(k.deviceId());
187 } catch (IllegalStateException e) {
188 return null;
189 }
190 }).build();
191
192 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
193 .withName("onos-port-stats")
194 .withSerializer(SERIALIZER_BUILDER)
195 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
196 .withTimestampProvider((k, v) -> new WallClockTimestamp())
197 .withTombstonesDisabled()
198 .build();
199
200 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
201 eventuallyConsistentMapBuilder()
202 .withName("onos-port-stats-delta")
203 .withSerializer(SERIALIZER_BUILDER)
204 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
205 .withTimestampProvider((k, v) -> new WallClockTimestamp())
206 .withTombstonesDisabled()
207 .build();
208
Madan Jampanifc8aced2015-08-27 11:06:12 -0700209 availableDevices = storageService.<DeviceId>setBuilder()
210 .withName("onos-online-devices")
211 .withSerializer(Serializer.using(KryoNamespaces.API))
Madan Jampanifc8aced2015-08-27 11:06:12 -0700212 .withRelaxedReadConsistency()
Madan Jampani538be742016-02-10 14:55:38 -0800213 .build()
214 .asDistributedSet();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700215
216 deviceDescriptions.addListener(deviceUpdateListener);
217 portDescriptions.addListener(portUpdateListener);
218 devicePortStats.addListener(portStatsListener);
219 availableDevices.addListener(deviceStatusTracker);
220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
225 devicePortStats.removeListener(portStatsListener);
226 deviceDescriptions.removeListener(deviceUpdateListener);
227 portDescriptions.removeListener(portUpdateListener);
228 availableDevices.removeListener(deviceStatusTracker);
229 devicePortStats.destroy();
230 devicePortDeltaStats.destroy();
231 deviceDescriptions.destroy();
232 portDescriptions.destroy();
233 devices.clear();
234 devicePorts.clear();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700235 log.info("Stopped");
236 }
237
238 @Override
239 public Iterable<Device> getDevices() {
240 return devices.values();
241 }
242
243 @Override
244 public int getDeviceCount() {
245 return devices.size();
246 }
247
248 @Override
mskala0d0c6832017-07-12 11:21:23 +0200249 public int getAvailableDeviceCount() {
250 return availableDevices.size();
251 }
252
253 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700254 public Device getDevice(DeviceId deviceId) {
255 return devices.get(deviceId);
256 }
257
helenyrwufd296b62016-06-22 17:43:02 -0700258 // FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700259 @Override
260 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
261 DeviceId deviceId,
262 DeviceDescription deviceDescription) {
263 NodeId master = mastershipService.getMasterFor(deviceId);
264 if (localNodeId.equals(master)) {
265 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
266 return refreshDeviceCache(providerId, deviceId);
267 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800268 return null;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700269 }
270 }
271
272 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
273 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
274 Device device = devices.compute(deviceId, (k, existingDevice) -> {
275 Device newDevice = composeDevice(deviceId);
276 if (existingDevice == null) {
277 eventType.set(DEVICE_ADDED);
278 } else {
279 // We allow only certain attributes to trigger update
280 boolean propertiesChanged =
281 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
282 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
283 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
284 boolean annotationsChanged =
285 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
286
287 // Primary providers can respond to all changes, but ancillary ones
288 // should respond only to annotation changes.
289 if ((providerId.isAncillary() && annotationsChanged) ||
290 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
291 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
292 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
293 providerId, existingDevice, devices.get(deviceId), newDevice);
294 eventType.set(DEVICE_UPDATED);
295 }
296 }
297 return newDevice;
298 });
299 if (eventType.get() != null && !providerId.isAncillary()) {
300 markOnline(deviceId);
301 }
302 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
303 }
304
305 /**
306 * Returns the primary providerId for a device.
307 * @param deviceId device identifier
308 * @return primary providerId
309 */
310 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
311 return deviceDescriptions.keySet()
312 .stream()
313 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
314 .map(deviceKey -> deviceKey.providerId())
315 .collect(Collectors.toSet());
316 }
317
318 /**
319 * Returns the identifier for all providers for a device.
320 * @param deviceId device identifier
321 * @return set of provider identifiers
322 */
323 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
324 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
325 return allProviderIds.stream()
326 .filter(p -> !p.isAncillary())
327 .findFirst()
328 .orElse(Iterables.getFirst(allProviderIds, null));
329 }
330
331 /**
332 * Returns a Device, merging descriptions from multiple Providers.
333 *
334 * @param deviceId device identifier
335 * @return Device instance
336 */
337 private Device composeDevice(DeviceId deviceId) {
338
339 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
340 DeviceDescription primaryDeviceDescription =
341 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
342
343 Type type = primaryDeviceDescription.type();
344 String manufacturer = primaryDeviceDescription.manufacturer();
345 String hwVersion = primaryDeviceDescription.hwVersion();
346 String swVersion = primaryDeviceDescription.swVersion();
347 String serialNumber = primaryDeviceDescription.serialNumber();
348 ChassisId chassisId = primaryDeviceDescription.chassisId();
349 DefaultAnnotations annotations = mergeAnnotations(deviceId);
350
351 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
352 hwVersion, swVersion, serialNumber,
353 chassisId, annotations);
354 }
355
356 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
357 Device removedDevice = devices.remove(deviceId);
358 if (removedDevice != null) {
359 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
360 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
361 }
362 return null;
363 }
364
helenyrwufd296b62016-06-22 17:43:02 -0700365 // FIXME publicization of markOnline -- trigger some action independently?
Palash Kala6c526062018-04-03 18:25:11 +0900366 public DeviceEvent markOnline(DeviceId deviceId) {
helenyrwufd296b62016-06-22 17:43:02 -0700367 if (devices.containsKey(deviceId)) {
Palash Kala6c526062018-04-03 18:25:11 +0900368 if (availableDevices.add(deviceId)) {
369 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId), null);
370 }
helenyrwufd296b62016-06-22 17:43:02 -0700371 }
372 log.warn("Device {} does not exist in store", deviceId);
Palash Kala6c526062018-04-03 18:25:11 +0900373 return null;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700374 }
375
376 @Override
377 public DeviceEvent markOffline(DeviceId deviceId) {
378 availableDevices.remove(deviceId);
379 // set update listener will raise the event.
380 return null;
381 }
382
383 @Override
384 public List<DeviceEvent> updatePorts(ProviderId providerId,
385 DeviceId deviceId,
386 List<PortDescription> descriptions) {
387 NodeId master = mastershipService.getMasterFor(deviceId);
388 List<DeviceEvent> deviceEvents = null;
389 if (localNodeId.equals(master)) {
390 descriptions.forEach(description -> {
391 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
392 portDescriptions.put(portKey, description);
393 });
394 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
395 } else {
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700396 return Collections.emptyList();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700397 }
398 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
399 }
400
401 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
402 DeviceId deviceId,
403 Optional<PortNumber> portNumber) {
404 Device device = devices.get(deviceId);
405 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
406 List<DeviceEvent> events = Lists.newArrayList();
407
408 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
409 List<PortDescription> descriptions = Lists.newArrayList();
410 portDescriptions.entrySet().forEach(e -> {
411 PortKey key = e.getKey();
412 PortDescription value = e.getValue();
413 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
414 if (portNumber.isPresent()) {
415 if (portNumber.get().equals(key.portNumber())) {
416 descriptions.add(value);
417 }
418 } else {
419 descriptions.add(value);
420 }
421 }
422 });
423
424 for (PortDescription description : descriptions) {
425 final PortNumber number = description.portNumber();
426 ports.compute(number, (k, existingPort) -> {
427 Port newPort = composePort(device, number);
428 if (existingPort == null) {
429 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
430 } else {
431 if (existingPort.isEnabled() != newPort.isEnabled() ||
432 existingPort.type() != newPort.type() ||
433 existingPort.portSpeed() != newPort.portSpeed() ||
434 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
435 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
436 }
437 }
438 return newPort;
439 });
440 }
441
442 return events;
443 }
444
445 /**
446 * Returns a Port, merging descriptions from multiple Providers.
447 *
448 * @param device device the port is on
449 * @param number port number
450 * @return Port instance
451 */
452 private Port composePort(Device device, PortNumber number) {
453
454 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
455 portDescriptions.entrySet().forEach(entry -> {
456 PortKey portKey = entry.getKey();
457 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
458 descriptions.put(portKey.providerId(), entry.getValue());
459 }
460 });
461 ProviderId primary = getPrimaryProviderId(device.id());
462 PortDescription primaryDescription = descriptions.get(primary);
463
464 // if no primary, assume not enabled
465 boolean isEnabled = false;
466 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
467 if (primaryDescription != null) {
468 isEnabled = primaryDescription.isEnabled();
469 annotations = merge(annotations, primaryDescription.annotations());
470 }
471 Port updated = null;
472 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
473 if (e.getKey().equals(primary)) {
474 continue;
475 }
476 annotations = merge(annotations, e.getValue().annotations());
477 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
478 }
479 if (primaryDescription == null) {
480 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
481 }
482 return updated == null
483 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
484 : updated;
485 }
486
487 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
488 PortDescription description, Annotations annotations) {
Madan Jampanifc8aced2015-08-27 11:06:12 -0700489 return new DefaultPort(device, number, isEnabled, description.type(),
490 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700491 }
492
493 @Override
494 public DeviceEvent updatePortStatus(ProviderId providerId,
495 DeviceId deviceId,
496 PortDescription portDescription) {
497 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
498 List<DeviceEvent> events =
499 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
500 return Iterables.getFirst(events, null);
501 }
502
503 @Override
504 public List<Port> getPorts(DeviceId deviceId) {
505 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
506 }
507
508 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700509 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
510 DeviceId deviceId) {
511
512 return portDescriptions.entrySet().stream()
513 .filter(e -> e.getKey().providerId().equals(pid))
514 .map(Map.Entry::getValue);
515 }
516
517 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700518 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
519 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
520 }
521
522 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700523 public PortDescription getPortDescription(ProviderId pid,
524 DeviceId deviceId,
525 PortNumber portNumber) {
526 return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
527 }
528
529 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700530 public DeviceEvent updatePortStatistics(ProviderId providerId,
531 DeviceId deviceId,
532 Collection<PortStatistics> newStatsCollection) {
533
534 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
535 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
536 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
537
538 if (prvStatsMap != null) {
539 for (PortStatistics newStats : newStatsCollection) {
540 PortNumber port = PortNumber.portNumber(newStats.port());
541 PortStatistics prvStats = prvStatsMap.get(port);
542 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
543 PortStatistics deltaStats = builder.build();
544 if (prvStats != null) {
545 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
546 }
547 deltaStatsMap.put(port, deltaStats);
548 newStatsMap.put(port, newStats);
549 }
550 } else {
551 for (PortStatistics newStats : newStatsCollection) {
552 PortNumber port = PortNumber.portNumber(newStats.port());
553 newStatsMap.put(port, newStats);
554 }
555 }
556 devicePortDeltaStats.put(deviceId, deltaStatsMap);
557 devicePortStats.put(deviceId, newStatsMap);
558 // DeviceEvent returns null because of InternalPortStatsListener usage
559 return null;
560 }
561
562 /**
563 * Calculate delta statistics by subtracting previous from new statistics.
564 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700565 * @param deviceId device indentifier
566 * @param prvStats previous port statistics
567 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700568 * @return PortStatistics
569 */
570 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
571 // calculate time difference
572 long deltaStatsSec, deltaStatsNano;
573 if (newStats.durationNano() < prvStats.durationNano()) {
574 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
575 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
576 } else {
577 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
578 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
579 }
580 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
581 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
582 .setPort(newStats.port())
583 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
584 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
585 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
586 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
587 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
588 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
589 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
590 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
591 .setDurationSec(deltaStatsSec)
592 .setDurationNano(deltaStatsNano)
593 .build();
594 return deltaStats;
595 }
596
597 @Override
598 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
599 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
600 if (portStats == null) {
601 return Collections.emptyList();
602 }
603 return ImmutableList.copyOf(portStats.values());
604 }
605
606 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530607 public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
608 Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
609 if (portStatsMap == null) {
610 return null;
611 }
612 PortStatistics portStats = portStatsMap.get(portNumber);
613 return portStats;
614 }
615
616 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700617 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
618 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
619 if (portStats == null) {
620 return Collections.emptyList();
621 }
622 return ImmutableList.copyOf(portStats.values());
623 }
624
625 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530626 public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
627 Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
628 if (portStatsMap == null) {
629 return null;
630 }
631 PortStatistics portStats = portStatsMap.get(portNumber);
632 return portStats;
633 }
634
635 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700636 public boolean isAvailable(DeviceId deviceId) {
637 return availableDevices.contains(deviceId);
638 }
639
640 @Override
641 public Iterable<Device> getAvailableDevices() {
642 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
643 }
644
645 @Override
646 public DeviceEvent removeDevice(DeviceId deviceId) {
647 NodeId master = mastershipService.getMasterFor(deviceId);
648 // if there exist a master, forward
649 // if there is no master, try to become one and process
650 boolean relinquishAtEnd = false;
651 if (master == null) {
652 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
653 if (myRole != MastershipRole.NONE) {
654 relinquishAtEnd = true;
655 }
656 log.debug("Temporarily requesting role for {} to remove", deviceId);
657 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
658 if (role == MastershipRole.MASTER) {
659 master = localNodeId;
660 }
661 }
662
663 if (!localNodeId.equals(master)) {
664 log.debug("{} has control of {}, forwarding remove request",
665 master, deviceId);
666
667 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
668 .whenComplete((r, e) -> {
669 if (e != null) {
670 log.error("Failed to forward {} remove request to its master", deviceId, e);
671 }
672 });
673 return null;
674 }
675
676 // I have control..
677 DeviceEvent event = null;
678 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
679 DeviceDescription removedDeviceDescription =
680 deviceDescriptions.remove(deviceKey);
681 if (removedDeviceDescription != null) {
682 event = purgeDeviceCache(deviceId);
683 }
684
685 if (relinquishAtEnd) {
686 log.debug("Relinquishing temporary role acquired for {}", deviceId);
687 mastershipService.relinquishMastership(deviceId);
688 }
689 return event;
690 }
691
Madan Jampanifc8aced2015-08-27 11:06:12 -0700692 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
693 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
694 DeviceDescription primaryDeviceDescription =
695 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
696 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
697 annotations = merge(annotations, primaryDeviceDescription.annotations());
698 for (ProviderId providerId : getAllProviders(deviceId)) {
699 if (!providerId.equals(primaryProviderId)) {
700 annotations = merge(annotations,
701 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
702 }
703 }
704 return annotations;
705 }
706
707 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
708 @Override
709 public void event(SetEvent<DeviceId> event) {
710 final DeviceId deviceId = event.entry();
711 final Device device = devices.get(deviceId);
712 if (device != null) {
713 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
714 } else {
715 pendingAvailableChangeUpdates.add(deviceId);
716 }
717 }
718 }
719
720 private class InternalDeviceChangeEventListener
721 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
722 @Override
723 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
724 DeviceId deviceId = event.key().deviceId();
725 ProviderId providerId = event.key().providerId();
726 if (event.type() == PUT) {
727 notifyDelegate(refreshDeviceCache(providerId, deviceId));
728 if (pendingAvailableChangeUpdates.remove(deviceId)) {
729 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
730 }
731 } else if (event.type() == REMOVE) {
732 notifyDelegate(purgeDeviceCache(deviceId));
733 }
734 }
735 }
736
737 private class InternalPortChangeEventListener
738 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
739 @Override
740 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
741 DeviceId deviceId = event.key().deviceId();
742 ProviderId providerId = event.key().providerId();
743 PortNumber portNumber = event.key().portNumber();
744 if (event.type() == PUT) {
745 if (devices.containsKey(deviceId)) {
746 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
747 for (DeviceEvent deviceEvent : events) {
748 notifyDelegate(deviceEvent);
749 }
750 }
751 } else if (event.type() == REMOVE) {
752 log.warn("Unexpected port removed event");
753 }
754 }
755 }
756
757 private class InternalPortStatsListener
758 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
759 @Override
760 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
761 if (event.type() == PUT) {
762 Device device = devices.get(event.key());
763 if (device != null) {
Thomas Vachuskad4955ae2016-08-23 14:56:37 -0700764 notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700765 }
766 }
767 }
768 }
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700769}