blob: 178bdcf0531f9de92396b14ece7b358828a89b01 [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Madan Jampanifc8aced2015-08-27 11:06:12 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
Madan Jampanifc8aced2015-08-27 11:06:12 -070024import org.onlab.packet.ChassisId;
25import org.onlab.util.KryoNamespace;
Madan Jampanifc8aced2015-08-27 11:06:12 -070026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.mastership.MastershipService;
29import org.onosproject.mastership.MastershipTermService;
30import org.onosproject.net.Annotations;
31import org.onosproject.net.AnnotationsUtil;
32import org.onosproject.net.DefaultAnnotations;
33import org.onosproject.net.DefaultDevice;
34import org.onosproject.net.DefaultPort;
35import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080036import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070037import org.onosproject.net.DeviceId;
38import org.onosproject.net.MastershipRole;
Madan Jampanifc8aced2015-08-27 11:06:12 -070039import org.onosproject.net.Port;
40import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070041import org.onosproject.net.device.DefaultPortStatistics;
42import org.onosproject.net.device.DeviceClockService;
43import org.onosproject.net.device.DeviceDescription;
44import org.onosproject.net.device.DeviceEvent;
45import org.onosproject.net.device.DeviceStore;
46import org.onosproject.net.device.DeviceStoreDelegate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070047import org.onosproject.net.device.PortDescription;
48import org.onosproject.net.device.PortStatistics;
49import org.onosproject.net.provider.ProviderId;
50import org.onosproject.store.AbstractStore;
51import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
52import org.onosproject.store.impl.MastershipBasedTimestamp;
53import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampanifc8aced2015-08-27 11:06:12 -070054import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
55import org.onosproject.store.service.DistributedSet;
56import org.onosproject.store.service.EventuallyConsistentMap;
57import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080058import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070059import org.onosproject.store.service.Serializer;
60import org.onosproject.store.service.SetEvent;
61import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070062import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080063import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070064import org.osgi.service.component.annotations.Activate;
65import org.osgi.service.component.annotations.Deactivate;
66import org.osgi.service.component.annotations.Reference;
67import org.osgi.service.component.annotations.ReferenceCardinality;
Madan Jampanifc8aced2015-08-27 11:06:12 -070068import org.slf4j.Logger;
69
Ray Milkeyd84f89b2018-08-17 14:54:17 -070070import java.util.Collection;
71import java.util.Collections;
72import java.util.List;
73import java.util.Map;
74import java.util.Map.Entry;
75import java.util.Objects;
76import java.util.Optional;
77import java.util.Set;
78import java.util.concurrent.TimeUnit;
79import java.util.concurrent.atomic.AtomicReference;
80import java.util.stream.Collectors;
81import java.util.stream.Stream;
Jian Li11599162016-01-15 15:46:16 -080082
83import static com.google.common.base.Preconditions.checkArgument;
84import static com.google.common.base.Verify.verify;
85import static org.onosproject.net.DefaultAnnotations.merge;
86import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
87import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
88import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
89import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
90import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
91import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
92import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -070093import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
Jian Li11599162016-01-15 15:46:16 -080094import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
95import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
96import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -070097
98/**
99 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
100 */
Jian Li11599162016-01-15 15:46:16 -0800101//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700102public class ECDeviceStore
103 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
104 implements DeviceStore {
105
private final Logger log = getLogger(getClass());

// Message template used with checkArgument when a device is missing from the cache.
private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";

// Local caches of composed Device and Port objects, keyed by device id.
private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// Devices whose availability changed before they were present in the devices cache;
// the pending event is raised once the device description arrives.
Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();

// Maps and set created through storageService in activate().
private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;

private DistributedSet<DeviceId> availableDevices;

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipService mastershipService;

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipTermService mastershipTermService;

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceClockService deviceClockService;

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterCommunicationService clusterCommunicator;

@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;

// Identifier of the local cluster node; assigned in activate().
private NodeId localNodeId;
// Listeners registered in activate() and removed in deactivate().
private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
        new InternalDeviceChangeEventListener();
private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
        new InternalPortChangeEventListener();
private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
        new InternalPortStatsListener();
private final SetEventListener<DeviceId> deviceStatusTracker =
        new InternalDeviceStatusTracker();

// Serializer used to encode the device id of a forwarded remove request (see removeDevice).
protected static final Serializer SERIALIZER = Serializer.using(
        KryoNamespace.newBuilder()
                .register(DistributedStoreSerializers.STORE_COMMON)
                .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
                .build("ECDevice"));

// Namespace builder shared by all eventually consistent maps created in activate().
// NOTE(review): DeviceKey and PortKey are registered twice below. The duplicates look
// redundant, but removing them may shift the type ids assigned to later registrations;
// confirm wire compatibility before cleaning this up.
protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
        .register(KryoNamespaces.API)
        .register(DeviceKey.class)
        .register(PortKey.class)
        .register(DeviceKey.class)
        .register(PortKey.class)
        .register(MastershipBasedTimestamp.class);
162
/**
 * Creates the distributed maps and the available-devices set, then registers
 * the internal listeners on them.
 */
@Activate
public void activate() {
    localNodeId = clusterService.getLocalNode().id();

    // Device descriptions are timestamped by the device clock service; when it
    // throws IllegalStateException the provider yields a null timestamp.
    deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
            .withName("onos-device-descriptions")
            .withSerializer(SERIALIZER_BUILDER)
            .withTimestampProvider((k, v) -> {
                try {
                    return deviceClockService.getTimestamp(k.deviceId());
                } catch (IllegalStateException e) {
                    return null;
                }
            }).build();

    // Port descriptions use the same timestamp policy as device descriptions.
    portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
            .withName("onos-port-descriptions")
            .withSerializer(SERIALIZER_BUILDER)
            .withTimestampProvider((k, v) -> {
                try {
                    return deviceClockService.getTimestamp(k.deviceId());
                } catch (IllegalStateException e) {
                    return null;
                }
            }).build();

    // Statistics maps are wall-clock timestamped, with tombstones disabled.
    devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
            .withName("onos-port-stats")
            .withSerializer(SERIALIZER_BUILDER)
            .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
            .withTimestampProvider((k, v) -> new WallClockTimestamp())
            .withTombstonesDisabled()
            .build();

    devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
            eventuallyConsistentMapBuilder()
            .withName("onos-port-stats-delta")
            .withSerializer(SERIALIZER_BUILDER)
            .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
            .withTimestampProvider((k, v) -> new WallClockTimestamp())
            .withTombstonesDisabled()
            .build();

    availableDevices = storageService.<DeviceId>setBuilder()
            .withName("onos-online-devices")
            .withSerializer(Serializer.using(KryoNamespaces.API))
            .withRelaxedReadConsistency()
            .build()
            .asDistributedSet();

    // devicePortDeltaStats intentionally (or at least currently) gets no listener.
    deviceDescriptions.addListener(deviceUpdateListener);
    portDescriptions.addListener(portUpdateListener);
    devicePortStats.addListener(portStatsListener);
    availableDevices.addListener(deviceStatusTracker);
    log.info("Started");
}
219
/**
 * Unregisters all listeners, destroys the eventually consistent maps and
 * clears the local device and port caches.
 */
@Deactivate
public void deactivate() {
    // Detach listeners first so no further events reach this store.
    devicePortStats.removeListener(portStatsListener);
    deviceDescriptions.removeListener(deviceUpdateListener);
    portDescriptions.removeListener(portUpdateListener);
    availableDevices.removeListener(deviceStatusTracker);
    devicePortStats.destroy();
    devicePortDeltaStats.destroy();
    deviceDescriptions.destroy();
    portDescriptions.destroy();
    // NOTE(review): availableDevices and pendingAvailableChangeUpdates are not
    // destroyed/cleared here - confirm whether that is intended.
    devices.clear();
    devicePorts.clear();
    log.info("Stopped");
}
234
/**
 * Returns the values view of the local device cache.
 */
@Override
public Iterable<Device> getDevices() {
    return devices.values();
}
239
/**
 * Returns the number of devices in the local device cache.
 */
@Override
public int getDeviceCount() {
    return devices.size();
}
244
/**
 * Returns the size of the distributed set of available devices.
 */
@Override
public int getAvailableDeviceCount() {
    return availableDevices.size();
}
249
/**
 * Looks up a device in the local cache.
 *
 * @param deviceId device identifier
 * @return cached device, or null when the cache holds no entry for the id
 */
@Override
public Device getDevice(DeviceId deviceId) {
    return devices.get(deviceId);
}
254
helenyrwufd296b62016-06-22 17:43:02 -0700255 // FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700256 @Override
257 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
258 DeviceId deviceId,
259 DeviceDescription deviceDescription) {
260 NodeId master = mastershipService.getMasterFor(deviceId);
261 if (localNodeId.equals(master)) {
262 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
263 return refreshDeviceCache(providerId, deviceId);
264 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800265 return null;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700266 }
267 }
268
269 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
270 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
271 Device device = devices.compute(deviceId, (k, existingDevice) -> {
272 Device newDevice = composeDevice(deviceId);
273 if (existingDevice == null) {
274 eventType.set(DEVICE_ADDED);
275 } else {
276 // We allow only certain attributes to trigger update
277 boolean propertiesChanged =
278 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
279 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
280 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
281 boolean annotationsChanged =
282 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
283
284 // Primary providers can respond to all changes, but ancillary ones
285 // should respond only to annotation changes.
286 if ((providerId.isAncillary() && annotationsChanged) ||
287 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
288 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
289 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
290 providerId, existingDevice, devices.get(deviceId), newDevice);
291 eventType.set(DEVICE_UPDATED);
292 }
293 }
294 return newDevice;
295 });
296 if (eventType.get() != null && !providerId.isAncillary()) {
297 markOnline(deviceId);
298 }
299 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
300 }
301
302 /**
303 * Returns the primary providerId for a device.
304 * @param deviceId device identifier
305 * @return primary providerId
306 */
307 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
308 return deviceDescriptions.keySet()
309 .stream()
310 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
311 .map(deviceKey -> deviceKey.providerId())
312 .collect(Collectors.toSet());
313 }
314
315 /**
316 * Returns the identifier for all providers for a device.
317 * @param deviceId device identifier
318 * @return set of provider identifiers
319 */
320 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
321 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
322 return allProviderIds.stream()
323 .filter(p -> !p.isAncillary())
324 .findFirst()
325 .orElse(Iterables.getFirst(allProviderIds, null));
326 }
327
328 /**
329 * Returns a Device, merging descriptions from multiple Providers.
330 *
331 * @param deviceId device identifier
332 * @return Device instance
333 */
334 private Device composeDevice(DeviceId deviceId) {
335
336 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
337 DeviceDescription primaryDeviceDescription =
338 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
339
340 Type type = primaryDeviceDescription.type();
341 String manufacturer = primaryDeviceDescription.manufacturer();
342 String hwVersion = primaryDeviceDescription.hwVersion();
343 String swVersion = primaryDeviceDescription.swVersion();
344 String serialNumber = primaryDeviceDescription.serialNumber();
345 ChassisId chassisId = primaryDeviceDescription.chassisId();
346 DefaultAnnotations annotations = mergeAnnotations(deviceId);
347
348 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
349 hwVersion, swVersion, serialNumber,
350 chassisId, annotations);
351 }
352
353 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
354 Device removedDevice = devices.remove(deviceId);
355 if (removedDevice != null) {
356 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
357 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
358 }
359 return null;
360 }
361
helenyrwufd296b62016-06-22 17:43:02 -0700362 // FIXME publicization of markOnline -- trigger some action independently?
Palash Kala6c526062018-04-03 18:25:11 +0900363 public DeviceEvent markOnline(DeviceId deviceId) {
helenyrwufd296b62016-06-22 17:43:02 -0700364 if (devices.containsKey(deviceId)) {
Palash Kala6c526062018-04-03 18:25:11 +0900365 if (availableDevices.add(deviceId)) {
366 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId), null);
367 }
helenyrwufd296b62016-06-22 17:43:02 -0700368 }
369 log.warn("Device {} does not exist in store", deviceId);
Palash Kala6c526062018-04-03 18:25:11 +0900370 return null;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700371 }
372
/**
 * Removes the device from the set of available devices.
 *
 * @param deviceId device identifier
 * @return always null; the availability event is raised by the set listener
 */
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
    availableDevices.remove(deviceId);
    // set update listener will raise the event.
    return null;
}
379
380 @Override
381 public List<DeviceEvent> updatePorts(ProviderId providerId,
382 DeviceId deviceId,
383 List<PortDescription> descriptions) {
384 NodeId master = mastershipService.getMasterFor(deviceId);
385 List<DeviceEvent> deviceEvents = null;
386 if (localNodeId.equals(master)) {
387 descriptions.forEach(description -> {
388 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
389 portDescriptions.put(portKey, description);
390 });
391 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
392 } else {
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700393 return Collections.emptyList();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700394 }
395 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
396 }
397
398 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
399 DeviceId deviceId,
400 Optional<PortNumber> portNumber) {
401 Device device = devices.get(deviceId);
402 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
403 List<DeviceEvent> events = Lists.newArrayList();
404
405 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
406 List<PortDescription> descriptions = Lists.newArrayList();
407 portDescriptions.entrySet().forEach(e -> {
408 PortKey key = e.getKey();
409 PortDescription value = e.getValue();
410 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
411 if (portNumber.isPresent()) {
412 if (portNumber.get().equals(key.portNumber())) {
413 descriptions.add(value);
414 }
415 } else {
416 descriptions.add(value);
417 }
418 }
419 });
420
421 for (PortDescription description : descriptions) {
422 final PortNumber number = description.portNumber();
423 ports.compute(number, (k, existingPort) -> {
424 Port newPort = composePort(device, number);
425 if (existingPort == null) {
426 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
427 } else {
428 if (existingPort.isEnabled() != newPort.isEnabled() ||
429 existingPort.type() != newPort.type() ||
430 existingPort.portSpeed() != newPort.portSpeed() ||
431 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
432 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
433 }
434 }
435 return newPort;
436 });
437 }
438
439 return events;
440 }
441
442 /**
443 * Returns a Port, merging descriptions from multiple Providers.
444 *
445 * @param device device the port is on
446 * @param number port number
447 * @return Port instance
448 */
449 private Port composePort(Device device, PortNumber number) {
450
451 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
452 portDescriptions.entrySet().forEach(entry -> {
453 PortKey portKey = entry.getKey();
454 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
455 descriptions.put(portKey.providerId(), entry.getValue());
456 }
457 });
458 ProviderId primary = getPrimaryProviderId(device.id());
459 PortDescription primaryDescription = descriptions.get(primary);
460
461 // if no primary, assume not enabled
462 boolean isEnabled = false;
463 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
464 if (primaryDescription != null) {
465 isEnabled = primaryDescription.isEnabled();
466 annotations = merge(annotations, primaryDescription.annotations());
467 }
468 Port updated = null;
469 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
470 if (e.getKey().equals(primary)) {
471 continue;
472 }
473 annotations = merge(annotations, e.getValue().annotations());
474 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
475 }
476 if (primaryDescription == null) {
477 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
478 }
479 return updated == null
480 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
481 : updated;
482 }
483
/**
 * Creates a DefaultPort whose type and speed come from the given description.
 *
 * @param device      device the port is on
 * @param number      port number
 * @param isEnabled   enabled state to assign
 * @param description description supplying port type and speed
 * @param annotations annotations to attach
 * @return Port instance
 */
private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
                            PortDescription description, Annotations annotations) {
    return new DefaultPort(device, number, isEnabled, description.type(),
                           description.portSpeed(), annotations);
}
489
490 @Override
491 public DeviceEvent updatePortStatus(ProviderId providerId,
492 DeviceId deviceId,
493 PortDescription portDescription) {
494 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
495 List<DeviceEvent> events =
496 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
497 return Iterables.getFirst(events, null);
498 }
499
500 @Override
501 public List<Port> getPorts(DeviceId deviceId) {
502 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
503 }
504
505 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700506 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
507 DeviceId deviceId) {
508
509 return portDescriptions.entrySet().stream()
510 .filter(e -> e.getKey().providerId().equals(pid))
511 .map(Map.Entry::getValue);
512 }
513
514 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700515 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
516 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
517 }
518
/**
 * Looks up the stored description of one port as supplied by one provider.
 *
 * @param pid        provider identifier
 * @param deviceId   device identifier
 * @param portNumber port number
 * @return stored description for that key, as returned by the descriptions map
 */
@Override
public PortDescription getPortDescription(ProviderId pid,
                                          DeviceId deviceId,
                                          PortNumber portNumber) {
    return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
}
525
526 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700527 public DeviceEvent updatePortStatistics(ProviderId providerId,
528 DeviceId deviceId,
529 Collection<PortStatistics> newStatsCollection) {
530
531 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
532 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
533 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
534
535 if (prvStatsMap != null) {
536 for (PortStatistics newStats : newStatsCollection) {
537 PortNumber port = PortNumber.portNumber(newStats.port());
538 PortStatistics prvStats = prvStatsMap.get(port);
539 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
540 PortStatistics deltaStats = builder.build();
541 if (prvStats != null) {
542 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
543 }
544 deltaStatsMap.put(port, deltaStats);
545 newStatsMap.put(port, newStats);
546 }
547 } else {
548 for (PortStatistics newStats : newStatsCollection) {
549 PortNumber port = PortNumber.portNumber(newStats.port());
550 newStatsMap.put(port, newStats);
551 }
552 }
553 devicePortDeltaStats.put(deviceId, deltaStatsMap);
554 devicePortStats.put(deviceId, newStatsMap);
555 // DeviceEvent returns null because of InternalPortStatsListener usage
556 return null;
557 }
558
559 /**
560 * Calculate delta statistics by subtracting previous from new statistics.
561 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700562 * @param deviceId device indentifier
563 * @param prvStats previous port statistics
564 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700565 * @return PortStatistics
566 */
567 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
568 // calculate time difference
569 long deltaStatsSec, deltaStatsNano;
570 if (newStats.durationNano() < prvStats.durationNano()) {
571 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
572 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
573 } else {
574 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
575 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
576 }
577 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
578 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
579 .setPort(newStats.port())
580 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
581 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
582 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
583 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
584 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
585 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
586 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
587 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
588 .setDurationSec(deltaStatsSec)
589 .setDurationNano(deltaStatsNano)
590 .build();
591 return deltaStats;
592 }
593
594 @Override
595 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
596 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
597 if (portStats == null) {
598 return Collections.emptyList();
599 }
600 return ImmutableList.copyOf(portStats.values());
601 }
602
603 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530604 public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
605 Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
606 if (portStatsMap == null) {
607 return null;
608 }
609 PortStatistics portStats = portStatsMap.get(portNumber);
610 return portStats;
611 }
612
613 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700614 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
615 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
616 if (portStats == null) {
617 return Collections.emptyList();
618 }
619 return ImmutableList.copyOf(portStats.values());
620 }
621
622 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530623 public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
624 Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
625 if (portStatsMap == null) {
626 return null;
627 }
628 PortStatistics portStats = portStatsMap.get(portNumber);
629 return portStats;
630 }
631
/**
 * Tells whether the device is in the set of available devices.
 *
 * @param deviceId device identifier
 * @return true when the set contains the device id
 */
@Override
public boolean isAvailable(DeviceId deviceId) {
    return availableDevices.contains(deviceId);
}
636
637 @Override
638 public Iterable<Device> getAvailableDevices() {
639 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
640 }
641
642 @Override
643 public DeviceEvent removeDevice(DeviceId deviceId) {
644 NodeId master = mastershipService.getMasterFor(deviceId);
645 // if there exist a master, forward
646 // if there is no master, try to become one and process
647 boolean relinquishAtEnd = false;
648 if (master == null) {
649 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
650 if (myRole != MastershipRole.NONE) {
651 relinquishAtEnd = true;
652 }
653 log.debug("Temporarily requesting role for {} to remove", deviceId);
654 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
655 if (role == MastershipRole.MASTER) {
656 master = localNodeId;
657 }
658 }
659
660 if (!localNodeId.equals(master)) {
661 log.debug("{} has control of {}, forwarding remove request",
662 master, deviceId);
663
664 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
665 .whenComplete((r, e) -> {
666 if (e != null) {
667 log.error("Failed to forward {} remove request to its master", deviceId, e);
668 }
669 });
670 return null;
671 }
672
673 // I have control..
674 DeviceEvent event = null;
675 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
676 DeviceDescription removedDeviceDescription =
677 deviceDescriptions.remove(deviceKey);
678 if (removedDeviceDescription != null) {
679 event = purgeDeviceCache(deviceId);
680 }
681
682 if (relinquishAtEnd) {
683 log.debug("Relinquishing temporary role acquired for {}", deviceId);
684 mastershipService.relinquishMastership(deviceId);
685 }
686 return event;
687 }
688
Madan Jampanifc8aced2015-08-27 11:06:12 -0700689 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
690 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
691 DeviceDescription primaryDeviceDescription =
692 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
693 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
694 annotations = merge(annotations, primaryDeviceDescription.annotations());
695 for (ProviderId providerId : getAllProviders(deviceId)) {
696 if (!providerId.equals(primaryProviderId)) {
697 annotations = merge(annotations,
698 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
699 }
700 }
701 return annotations;
702 }
703
704 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
705 @Override
706 public void event(SetEvent<DeviceId> event) {
707 final DeviceId deviceId = event.entry();
708 final Device device = devices.get(deviceId);
709 if (device != null) {
710 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
711 } else {
712 pendingAvailableChangeUpdates.add(deviceId);
713 }
714 }
715 }
716
717 private class InternalDeviceChangeEventListener
718 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
719 @Override
720 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
721 DeviceId deviceId = event.key().deviceId();
722 ProviderId providerId = event.key().providerId();
723 if (event.type() == PUT) {
724 notifyDelegate(refreshDeviceCache(providerId, deviceId));
725 if (pendingAvailableChangeUpdates.remove(deviceId)) {
726 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
727 }
728 } else if (event.type() == REMOVE) {
729 notifyDelegate(purgeDeviceCache(deviceId));
730 }
731 }
732 }
733
734 private class InternalPortChangeEventListener
735 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
736 @Override
737 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
738 DeviceId deviceId = event.key().deviceId();
739 ProviderId providerId = event.key().providerId();
740 PortNumber portNumber = event.key().portNumber();
741 if (event.type() == PUT) {
742 if (devices.containsKey(deviceId)) {
743 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
744 for (DeviceEvent deviceEvent : events) {
745 notifyDelegate(deviceEvent);
746 }
747 }
748 } else if (event.type() == REMOVE) {
749 log.warn("Unexpected port removed event");
750 }
751 }
752 }
753
754 private class InternalPortStatsListener
755 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
756 @Override
757 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
758 if (event.type() == PUT) {
759 Device device = devices.get(event.key());
760 if (device != null) {
Thomas Vachuskad4955ae2016-08-23 14:56:37 -0700761 notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700762 }
763 }
764 }
765 }
Ray Milkey2bf5ea72017-06-01 09:03:34 -0700766}