blob: d34a6470aa8653d93ed304d87e83dce2937a37a4 [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19import static com.google.common.base.Verify.verify;
20import static org.onosproject.net.DefaultAnnotations.merge;
21import static org.slf4j.LoggerFactory.getLogger;
22
23import java.util.Collection;
24import java.util.Collections;
25import java.util.List;
26import java.util.Map;
27import java.util.Objects;
28import java.util.Optional;
29import java.util.Set;
30import java.util.Map.Entry;
31import java.util.concurrent.TimeUnit;
32import java.util.concurrent.atomic.AtomicReference;
33import java.util.stream.Collectors;
34
35import org.apache.felix.scr.annotations.Activate;
36import org.apache.felix.scr.annotations.Component;
37import org.apache.felix.scr.annotations.Deactivate;
38import org.apache.felix.scr.annotations.Reference;
39import org.apache.felix.scr.annotations.ReferenceCardinality;
40import org.apache.felix.scr.annotations.Service;
41import org.onlab.packet.ChassisId;
42import org.onlab.util.KryoNamespace;
43import org.onlab.util.SharedExecutors;
44import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.NodeId;
46import org.onosproject.mastership.MastershipService;
47import org.onosproject.mastership.MastershipTermService;
48import org.onosproject.net.Annotations;
49import org.onosproject.net.AnnotationsUtil;
50import org.onosproject.net.DefaultAnnotations;
51import org.onosproject.net.DefaultDevice;
52import org.onosproject.net.DefaultPort;
53import org.onosproject.net.Device;
54import org.onosproject.net.DeviceId;
55import org.onosproject.net.MastershipRole;
56import org.onosproject.net.OchPort;
57import org.onosproject.net.OduCltPort;
58import org.onosproject.net.OmsPort;
59import org.onosproject.net.Port;
60import org.onosproject.net.PortNumber;
61import org.onosproject.net.Device.Type;
62import org.onosproject.net.device.DefaultPortStatistics;
63import org.onosproject.net.device.DeviceClockService;
64import org.onosproject.net.device.DeviceDescription;
65import org.onosproject.net.device.DeviceEvent;
66import org.onosproject.net.device.DeviceStore;
67import org.onosproject.net.device.DeviceStoreDelegate;
68import org.onosproject.net.device.OchPortDescription;
69import org.onosproject.net.device.OduCltPortDescription;
70import org.onosproject.net.device.OmsPortDescription;
71import org.onosproject.net.device.PortDescription;
72import org.onosproject.net.device.PortStatistics;
73import org.onosproject.net.provider.ProviderId;
74import org.onosproject.store.AbstractStore;
75import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
76import org.onosproject.store.impl.MastershipBasedTimestamp;
77import org.onosproject.store.serializers.KryoNamespaces;
78import org.onosproject.store.serializers.KryoSerializer;
79import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
80import org.onosproject.store.service.DistributedSet;
81import org.onosproject.store.service.EventuallyConsistentMap;
82import org.onosproject.store.service.EventuallyConsistentMapEvent;
83import org.onosproject.store.service.Serializer;
84import org.onosproject.store.service.SetEvent;
85import org.onosproject.store.service.SetEventListener;
86import org.onosproject.store.service.WallClockTimestamp;
87
88import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
89import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
90
91import org.onosproject.store.service.EventuallyConsistentMapListener;
92import org.onosproject.store.service.StorageService;
93import org.slf4j.Logger;
94
95import static org.onosproject.net.device.DeviceEvent.Type.*;
96import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
97import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
98import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
99
100import com.google.common.collect.ImmutableList;
101import com.google.common.collect.Iterables;
102import com.google.common.collect.Lists;
103import com.google.common.collect.Maps;
104import com.google.common.collect.Sets;
105import com.google.common.util.concurrent.Futures;
106
107/**
108 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
109 */
110@Component(immediate = true, enabled = false)
111@Service
112public class ECDeviceStore
113 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
114 implements DeviceStore {
115
116 private final Logger log = getLogger(getClass());
117
118 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
119
120 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
121 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
122 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
123
124 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
125 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
126 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
127 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
128
129 private DistributedSet<DeviceId> availableDevices;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected StorageService storageService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected MastershipService mastershipService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected MastershipTermService mastershipTermService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected DeviceClockService deviceClockService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected ClusterCommunicationService clusterCommunicator;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
147 protected ClusterService clusterService;
148
149 private NodeId localNodeId;
150 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
151 new InternalDeviceChangeEventListener();
152 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
153 new InternalPortChangeEventListener();
154 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
155 new InternalPortStatsListener();
156 private final SetEventListener<DeviceId> deviceStatusTracker =
157 new InternalDeviceStatusTracker();
158
159 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
160 @Override
161 protected void setupKryoPool() {
162 serializerPool = KryoNamespace.newBuilder()
163 .register(DistributedStoreSerializers.STORE_COMMON)
164 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
165 .register(DeviceInjectedEvent.class)
166 .register(PortInjectedEvent.class)
167 .build();
168 }
169 };
170
171 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
172 .register(KryoNamespaces.API)
173 .register(DeviceKey.class)
174 .register(PortKey.class)
175 .register(DeviceKey.class)
176 .register(PortKey.class)
177 .register(MastershipBasedTimestamp.class);
178
179 @Activate
180 public void activate() {
181 localNodeId = clusterService.getLocalNode().id();
182
183 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
184 .withName("onos-device-descriptions")
185 .withSerializer(SERIALIZER_BUILDER)
186 .withTimestampProvider((k, v) -> {
187 try {
188 return deviceClockService.getTimestamp(k.deviceId());
189 } catch (IllegalStateException e) {
190 return null;
191 }
192 }).build();
193
194 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
195 .withName("onos-port-descriptions")
196 .withSerializer(SERIALIZER_BUILDER)
197 .withTimestampProvider((k, v) -> {
198 try {
199 return deviceClockService.getTimestamp(k.deviceId());
200 } catch (IllegalStateException e) {
201 return null;
202 }
203 }).build();
204
205 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
206 .withName("onos-port-stats")
207 .withSerializer(SERIALIZER_BUILDER)
208 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
209 .withTimestampProvider((k, v) -> new WallClockTimestamp())
210 .withTombstonesDisabled()
211 .build();
212
213 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
214 eventuallyConsistentMapBuilder()
215 .withName("onos-port-stats-delta")
216 .withSerializer(SERIALIZER_BUILDER)
217 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
218 .withTimestampProvider((k, v) -> new WallClockTimestamp())
219 .withTombstonesDisabled()
220 .build();
221
222 clusterCommunicator.addSubscriber(DEVICE_INJECTED,
223 SERIALIZER::decode,
224 this::injectDevice,
225 SERIALIZER::encode,
226 SharedExecutors.getPoolThreadExecutor());
227
228 clusterCommunicator.addSubscriber(PORT_INJECTED,
229 SERIALIZER::decode,
230 this::injectPort,
231 SERIALIZER::encode,
232 SharedExecutors.getPoolThreadExecutor());
233
234 availableDevices = storageService.<DeviceId>setBuilder()
235 .withName("onos-online-devices")
236 .withSerializer(Serializer.using(KryoNamespaces.API))
237 .withPartitionsDisabled()
238 .withRelaxedReadConsistency()
239 .build();
240
241 deviceDescriptions.addListener(deviceUpdateListener);
242 portDescriptions.addListener(portUpdateListener);
243 devicePortStats.addListener(portStatsListener);
244 availableDevices.addListener(deviceStatusTracker);
245 log.info("Started");
246 }
247
248 @Deactivate
249 public void deactivate() {
250 devicePortStats.removeListener(portStatsListener);
251 deviceDescriptions.removeListener(deviceUpdateListener);
252 portDescriptions.removeListener(portUpdateListener);
253 availableDevices.removeListener(deviceStatusTracker);
254 devicePortStats.destroy();
255 devicePortDeltaStats.destroy();
256 deviceDescriptions.destroy();
257 portDescriptions.destroy();
258 devices.clear();
259 devicePorts.clear();
260 clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
261 clusterCommunicator.removeSubscriber(PORT_INJECTED);
262 log.info("Stopped");
263 }
264
265 @Override
266 public Iterable<Device> getDevices() {
267 return devices.values();
268 }
269
270 @Override
271 public int getDeviceCount() {
272 return devices.size();
273 }
274
275 @Override
276 public Device getDevice(DeviceId deviceId) {
277 return devices.get(deviceId);
278 }
279
280 @Override
281 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
282 DeviceId deviceId,
283 DeviceDescription deviceDescription) {
284 NodeId master = mastershipService.getMasterFor(deviceId);
285 if (localNodeId.equals(master)) {
286 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
287 return refreshDeviceCache(providerId, deviceId);
288 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800289 // Only forward for ConfigProvider
290 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800291 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800292 return null;
293 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700294 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
295 return Futures.getUnchecked(
296 clusterCommunicator.sendAndReceive(deviceInjectedEvent,
297 DEVICE_INJECTED,
298 SERIALIZER::encode,
299 SERIALIZER::decode,
300 master));
301 }
302 }
303
304 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
305 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
306 Device device = devices.compute(deviceId, (k, existingDevice) -> {
307 Device newDevice = composeDevice(deviceId);
308 if (existingDevice == null) {
309 eventType.set(DEVICE_ADDED);
310 } else {
311 // We allow only certain attributes to trigger update
312 boolean propertiesChanged =
313 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
314 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
315 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
316 boolean annotationsChanged =
317 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
318
319 // Primary providers can respond to all changes, but ancillary ones
320 // should respond only to annotation changes.
321 if ((providerId.isAncillary() && annotationsChanged) ||
322 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
323 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
324 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
325 providerId, existingDevice, devices.get(deviceId), newDevice);
326 eventType.set(DEVICE_UPDATED);
327 }
328 }
329 return newDevice;
330 });
331 if (eventType.get() != null && !providerId.isAncillary()) {
332 markOnline(deviceId);
333 }
334 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
335 }
336
337 /**
338 * Returns the primary providerId for a device.
339 * @param deviceId device identifier
340 * @return primary providerId
341 */
342 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
343 return deviceDescriptions.keySet()
344 .stream()
345 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
346 .map(deviceKey -> deviceKey.providerId())
347 .collect(Collectors.toSet());
348 }
349
350 /**
351 * Returns the identifier for all providers for a device.
352 * @param deviceId device identifier
353 * @return set of provider identifiers
354 */
355 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
356 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
357 return allProviderIds.stream()
358 .filter(p -> !p.isAncillary())
359 .findFirst()
360 .orElse(Iterables.getFirst(allProviderIds, null));
361 }
362
363 /**
364 * Returns a Device, merging descriptions from multiple Providers.
365 *
366 * @param deviceId device identifier
367 * @return Device instance
368 */
369 private Device composeDevice(DeviceId deviceId) {
370
371 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
372 DeviceDescription primaryDeviceDescription =
373 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
374
375 Type type = primaryDeviceDescription.type();
376 String manufacturer = primaryDeviceDescription.manufacturer();
377 String hwVersion = primaryDeviceDescription.hwVersion();
378 String swVersion = primaryDeviceDescription.swVersion();
379 String serialNumber = primaryDeviceDescription.serialNumber();
380 ChassisId chassisId = primaryDeviceDescription.chassisId();
381 DefaultAnnotations annotations = mergeAnnotations(deviceId);
382
383 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
384 hwVersion, swVersion, serialNumber,
385 chassisId, annotations);
386 }
387
388 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
389 Device removedDevice = devices.remove(deviceId);
390 if (removedDevice != null) {
391 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
392 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
393 }
394 return null;
395 }
396
397 private boolean markOnline(DeviceId deviceId) {
398 return availableDevices.add(deviceId);
399 }
400
401 @Override
402 public DeviceEvent markOffline(DeviceId deviceId) {
403 availableDevices.remove(deviceId);
404 // set update listener will raise the event.
405 return null;
406 }
407
408 @Override
409 public List<DeviceEvent> updatePorts(ProviderId providerId,
410 DeviceId deviceId,
411 List<PortDescription> descriptions) {
412 NodeId master = mastershipService.getMasterFor(deviceId);
413 List<DeviceEvent> deviceEvents = null;
414 if (localNodeId.equals(master)) {
415 descriptions.forEach(description -> {
416 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
417 portDescriptions.put(portKey, description);
418 });
419 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
420 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800421 // Only forward for ConfigProvider
422 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800423 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800424 return null;
425 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700426 if (master == null) {
427 return Collections.emptyList();
428 }
429 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
430 deviceEvents = Futures.getUnchecked(
431 clusterCommunicator.sendAndReceive(portInjectedEvent,
432 PORT_INJECTED,
433 SERIALIZER::encode,
434 SERIALIZER::decode,
435 master));
436 }
437 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
438 }
439
440 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
441 DeviceId deviceId,
442 Optional<PortNumber> portNumber) {
443 Device device = devices.get(deviceId);
444 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
445 List<DeviceEvent> events = Lists.newArrayList();
446
447 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
448 List<PortDescription> descriptions = Lists.newArrayList();
449 portDescriptions.entrySet().forEach(e -> {
450 PortKey key = e.getKey();
451 PortDescription value = e.getValue();
452 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
453 if (portNumber.isPresent()) {
454 if (portNumber.get().equals(key.portNumber())) {
455 descriptions.add(value);
456 }
457 } else {
458 descriptions.add(value);
459 }
460 }
461 });
462
463 for (PortDescription description : descriptions) {
464 final PortNumber number = description.portNumber();
465 ports.compute(number, (k, existingPort) -> {
466 Port newPort = composePort(device, number);
467 if (existingPort == null) {
468 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
469 } else {
470 if (existingPort.isEnabled() != newPort.isEnabled() ||
471 existingPort.type() != newPort.type() ||
472 existingPort.portSpeed() != newPort.portSpeed() ||
473 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
474 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
475 }
476 }
477 return newPort;
478 });
479 }
480
481 return events;
482 }
483
484 /**
485 * Returns a Port, merging descriptions from multiple Providers.
486 *
487 * @param device device the port is on
488 * @param number port number
489 * @return Port instance
490 */
491 private Port composePort(Device device, PortNumber number) {
492
493 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
494 portDescriptions.entrySet().forEach(entry -> {
495 PortKey portKey = entry.getKey();
496 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
497 descriptions.put(portKey.providerId(), entry.getValue());
498 }
499 });
500 ProviderId primary = getPrimaryProviderId(device.id());
501 PortDescription primaryDescription = descriptions.get(primary);
502
503 // if no primary, assume not enabled
504 boolean isEnabled = false;
505 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
506 if (primaryDescription != null) {
507 isEnabled = primaryDescription.isEnabled();
508 annotations = merge(annotations, primaryDescription.annotations());
509 }
510 Port updated = null;
511 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
512 if (e.getKey().equals(primary)) {
513 continue;
514 }
515 annotations = merge(annotations, e.getValue().annotations());
516 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
517 }
518 if (primaryDescription == null) {
519 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
520 }
521 return updated == null
522 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
523 : updated;
524 }
525
526 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
527 PortDescription description, Annotations annotations) {
528 switch (description.type()) {
529 case OMS:
530 OmsPortDescription omsDesc = (OmsPortDescription) description;
531 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
532 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
533 case OCH:
534 OchPortDescription ochDesc = (OchPortDescription) description;
535 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
536 ochDesc.isTunable(), ochDesc.lambda(), annotations);
537 case ODUCLT:
538 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
539 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
540 default:
541 return new DefaultPort(device, number, isEnabled, description.type(),
542 description.portSpeed(), annotations);
543 }
544 }
545
546 @Override
547 public DeviceEvent updatePortStatus(ProviderId providerId,
548 DeviceId deviceId,
549 PortDescription portDescription) {
550 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
551 List<DeviceEvent> events =
552 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
553 return Iterables.getFirst(events, null);
554 }
555
556 @Override
557 public List<Port> getPorts(DeviceId deviceId) {
558 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
559 }
560
561 @Override
562 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
563 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
564 }
565
566 @Override
567 public DeviceEvent updatePortStatistics(ProviderId providerId,
568 DeviceId deviceId,
569 Collection<PortStatistics> newStatsCollection) {
570
571 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
572 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
573 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
574
575 if (prvStatsMap != null) {
576 for (PortStatistics newStats : newStatsCollection) {
577 PortNumber port = PortNumber.portNumber(newStats.port());
578 PortStatistics prvStats = prvStatsMap.get(port);
579 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
580 PortStatistics deltaStats = builder.build();
581 if (prvStats != null) {
582 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
583 }
584 deltaStatsMap.put(port, deltaStats);
585 newStatsMap.put(port, newStats);
586 }
587 } else {
588 for (PortStatistics newStats : newStatsCollection) {
589 PortNumber port = PortNumber.portNumber(newStats.port());
590 newStatsMap.put(port, newStats);
591 }
592 }
593 devicePortDeltaStats.put(deviceId, deltaStatsMap);
594 devicePortStats.put(deviceId, newStatsMap);
595 // DeviceEvent returns null because of InternalPortStatsListener usage
596 return null;
597 }
598
599 /**
600 * Calculate delta statistics by subtracting previous from new statistics.
601 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700602 * @param deviceId device indentifier
603 * @param prvStats previous port statistics
604 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700605 * @return PortStatistics
606 */
607 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
608 // calculate time difference
609 long deltaStatsSec, deltaStatsNano;
610 if (newStats.durationNano() < prvStats.durationNano()) {
611 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
612 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
613 } else {
614 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
615 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
616 }
617 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
618 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
619 .setPort(newStats.port())
620 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
621 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
622 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
623 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
624 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
625 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
626 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
627 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
628 .setDurationSec(deltaStatsSec)
629 .setDurationNano(deltaStatsNano)
630 .build();
631 return deltaStats;
632 }
633
634 @Override
635 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
636 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
637 if (portStats == null) {
638 return Collections.emptyList();
639 }
640 return ImmutableList.copyOf(portStats.values());
641 }
642
643 @Override
644 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
645 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
646 if (portStats == null) {
647 return Collections.emptyList();
648 }
649 return ImmutableList.copyOf(portStats.values());
650 }
651
652 @Override
653 public boolean isAvailable(DeviceId deviceId) {
654 return availableDevices.contains(deviceId);
655 }
656
657 @Override
658 public Iterable<Device> getAvailableDevices() {
659 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
660 }
661
662 @Override
663 public DeviceEvent removeDevice(DeviceId deviceId) {
664 NodeId master = mastershipService.getMasterFor(deviceId);
665 // if there exist a master, forward
666 // if there is no master, try to become one and process
667 boolean relinquishAtEnd = false;
668 if (master == null) {
669 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
670 if (myRole != MastershipRole.NONE) {
671 relinquishAtEnd = true;
672 }
673 log.debug("Temporarily requesting role for {} to remove", deviceId);
674 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
675 if (role == MastershipRole.MASTER) {
676 master = localNodeId;
677 }
678 }
679
680 if (!localNodeId.equals(master)) {
681 log.debug("{} has control of {}, forwarding remove request",
682 master, deviceId);
683
684 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
685 .whenComplete((r, e) -> {
686 if (e != null) {
687 log.error("Failed to forward {} remove request to its master", deviceId, e);
688 }
689 });
690 return null;
691 }
692
693 // I have control..
694 DeviceEvent event = null;
695 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
696 DeviceDescription removedDeviceDescription =
697 deviceDescriptions.remove(deviceKey);
698 if (removedDeviceDescription != null) {
699 event = purgeDeviceCache(deviceId);
700 }
701
702 if (relinquishAtEnd) {
703 log.debug("Relinquishing temporary role acquired for {}", deviceId);
704 mastershipService.relinquishMastership(deviceId);
705 }
706 return event;
707 }
708
709 private DeviceEvent injectDevice(DeviceInjectedEvent event) {
710 return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
711 }
712
713 private List<DeviceEvent> injectPort(PortInjectedEvent event) {
714 return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
715 }
716
717 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
718 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
719 DeviceDescription primaryDeviceDescription =
720 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
721 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
722 annotations = merge(annotations, primaryDeviceDescription.annotations());
723 for (ProviderId providerId : getAllProviders(deviceId)) {
724 if (!providerId.equals(primaryProviderId)) {
725 annotations = merge(annotations,
726 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
727 }
728 }
729 return annotations;
730 }
731
732 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
733 @Override
734 public void event(SetEvent<DeviceId> event) {
735 final DeviceId deviceId = event.entry();
736 final Device device = devices.get(deviceId);
737 if (device != null) {
738 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
739 } else {
740 pendingAvailableChangeUpdates.add(deviceId);
741 }
742 }
743 }
744
745 private class InternalDeviceChangeEventListener
746 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
747 @Override
748 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
749 DeviceId deviceId = event.key().deviceId();
750 ProviderId providerId = event.key().providerId();
751 if (event.type() == PUT) {
752 notifyDelegate(refreshDeviceCache(providerId, deviceId));
753 if (pendingAvailableChangeUpdates.remove(deviceId)) {
754 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
755 }
756 } else if (event.type() == REMOVE) {
757 notifyDelegate(purgeDeviceCache(deviceId));
758 }
759 }
760 }
761
762 private class InternalPortChangeEventListener
763 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
764 @Override
765 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
766 DeviceId deviceId = event.key().deviceId();
767 ProviderId providerId = event.key().providerId();
768 PortNumber portNumber = event.key().portNumber();
769 if (event.type() == PUT) {
770 if (devices.containsKey(deviceId)) {
771 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
772 for (DeviceEvent deviceEvent : events) {
773 notifyDelegate(deviceEvent);
774 }
775 }
776 } else if (event.type() == REMOVE) {
777 log.warn("Unexpected port removed event");
778 }
779 }
780 }
781
782 private class InternalPortStatsListener
783 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
784 @Override
785 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
786 if (event.type() == PUT) {
787 Device device = devices.get(event.key());
788 if (device != null) {
789 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
790 }
791 }
792 }
793 }
794}