blob: a47df89488faf935fefbdc7356e4433b74b2841d [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Jian Li11599162016-01-15 15:46:16 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
Madan Jampanifc8aced2015-08-27 11:06:12 -070024import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070025import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.KryoNamespace;
31import org.onlab.util.SharedExecutors;
32import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.mastership.MastershipTermService;
36import org.onosproject.net.Annotations;
37import org.onosproject.net.AnnotationsUtil;
38import org.onosproject.net.DefaultAnnotations;
39import org.onosproject.net.DefaultDevice;
40import org.onosproject.net.DefaultPort;
41import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080042import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070043import org.onosproject.net.DeviceId;
44import org.onosproject.net.MastershipRole;
45import org.onosproject.net.OchPort;
46import org.onosproject.net.OduCltPort;
47import org.onosproject.net.OmsPort;
48import org.onosproject.net.Port;
49import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070050import org.onosproject.net.device.DefaultPortStatistics;
51import org.onosproject.net.device.DeviceClockService;
52import org.onosproject.net.device.DeviceDescription;
53import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceStore;
55import org.onosproject.net.device.DeviceStoreDelegate;
56import org.onosproject.net.device.OchPortDescription;
57import org.onosproject.net.device.OduCltPortDescription;
58import org.onosproject.net.device.OmsPortDescription;
59import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.impl.MastershipBasedTimestamp;
65import org.onosproject.store.serializers.KryoNamespaces;
66import org.onosproject.store.serializers.KryoSerializer;
67import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
68import org.onosproject.store.service.DistributedSet;
69import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080071import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070072import org.onosproject.store.service.Serializer;
73import org.onosproject.store.service.SetEvent;
74import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070075import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080076import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070077import org.slf4j.Logger;
78
Jian Li11599162016-01-15 15:46:16 -080079import java.util.Collection;
80import java.util.Collections;
81import java.util.List;
82import java.util.Map;
83import java.util.Map.Entry;
84import java.util.Objects;
85import java.util.Optional;
86import java.util.Set;
87import java.util.concurrent.TimeUnit;
88import java.util.concurrent.atomic.AtomicReference;
89import java.util.stream.Collectors;
90
91import static com.google.common.base.Preconditions.checkArgument;
92import static com.google.common.base.Verify.verify;
93import static org.onosproject.net.DefaultAnnotations.merge;
94import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
95import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
96import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
97import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
98import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
99import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
100import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700101import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
102import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
103import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
Jian Li11599162016-01-15 15:46:16 -0800104import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
105import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
106import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700107
108/**
109 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
110 */
Jian Li11599162016-01-15 15:46:16 -0800111//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700112@Service
113public class ECDeviceStore
114 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
115 implements DeviceStore {
116
117 private final Logger log = getLogger(getClass());
118
119 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
120
121 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
122 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
123 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
124
125 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
126 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
127 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
128 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
129
130 private DistributedSet<DeviceId> availableDevices;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected StorageService storageService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected MastershipService mastershipService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected MastershipTermService mastershipTermService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected DeviceClockService deviceClockService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected ClusterCommunicationService clusterCommunicator;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 private NodeId localNodeId;
151 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
152 new InternalDeviceChangeEventListener();
153 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
154 new InternalPortChangeEventListener();
155 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
156 new InternalPortStatsListener();
157 private final SetEventListener<DeviceId> deviceStatusTracker =
158 new InternalDeviceStatusTracker();
159
160 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
161 @Override
162 protected void setupKryoPool() {
163 serializerPool = KryoNamespace.newBuilder()
164 .register(DistributedStoreSerializers.STORE_COMMON)
165 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
166 .register(DeviceInjectedEvent.class)
167 .register(PortInjectedEvent.class)
168 .build();
169 }
170 };
171
172 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
173 .register(KryoNamespaces.API)
174 .register(DeviceKey.class)
175 .register(PortKey.class)
176 .register(DeviceKey.class)
177 .register(PortKey.class)
178 .register(MastershipBasedTimestamp.class);
179
180 @Activate
181 public void activate() {
182 localNodeId = clusterService.getLocalNode().id();
183
184 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
185 .withName("onos-device-descriptions")
186 .withSerializer(SERIALIZER_BUILDER)
187 .withTimestampProvider((k, v) -> {
188 try {
189 return deviceClockService.getTimestamp(k.deviceId());
190 } catch (IllegalStateException e) {
191 return null;
192 }
193 }).build();
194
195 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
196 .withName("onos-port-descriptions")
197 .withSerializer(SERIALIZER_BUILDER)
198 .withTimestampProvider((k, v) -> {
199 try {
200 return deviceClockService.getTimestamp(k.deviceId());
201 } catch (IllegalStateException e) {
202 return null;
203 }
204 }).build();
205
206 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
207 .withName("onos-port-stats")
208 .withSerializer(SERIALIZER_BUILDER)
209 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
210 .withTimestampProvider((k, v) -> new WallClockTimestamp())
211 .withTombstonesDisabled()
212 .build();
213
214 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
215 eventuallyConsistentMapBuilder()
216 .withName("onos-port-stats-delta")
217 .withSerializer(SERIALIZER_BUILDER)
218 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
219 .withTimestampProvider((k, v) -> new WallClockTimestamp())
220 .withTombstonesDisabled()
221 .build();
222
223 clusterCommunicator.addSubscriber(DEVICE_INJECTED,
224 SERIALIZER::decode,
225 this::injectDevice,
226 SERIALIZER::encode,
227 SharedExecutors.getPoolThreadExecutor());
228
229 clusterCommunicator.addSubscriber(PORT_INJECTED,
230 SERIALIZER::decode,
231 this::injectPort,
232 SERIALIZER::encode,
233 SharedExecutors.getPoolThreadExecutor());
234
235 availableDevices = storageService.<DeviceId>setBuilder()
236 .withName("onos-online-devices")
237 .withSerializer(Serializer.using(KryoNamespaces.API))
238 .withPartitionsDisabled()
239 .withRelaxedReadConsistency()
240 .build();
241
242 deviceDescriptions.addListener(deviceUpdateListener);
243 portDescriptions.addListener(portUpdateListener);
244 devicePortStats.addListener(portStatsListener);
245 availableDevices.addListener(deviceStatusTracker);
246 log.info("Started");
247 }
248
249 @Deactivate
250 public void deactivate() {
251 devicePortStats.removeListener(portStatsListener);
252 deviceDescriptions.removeListener(deviceUpdateListener);
253 portDescriptions.removeListener(portUpdateListener);
254 availableDevices.removeListener(deviceStatusTracker);
255 devicePortStats.destroy();
256 devicePortDeltaStats.destroy();
257 deviceDescriptions.destroy();
258 portDescriptions.destroy();
259 devices.clear();
260 devicePorts.clear();
261 clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
262 clusterCommunicator.removeSubscriber(PORT_INJECTED);
263 log.info("Stopped");
264 }
265
266 @Override
267 public Iterable<Device> getDevices() {
268 return devices.values();
269 }
270
271 @Override
272 public int getDeviceCount() {
273 return devices.size();
274 }
275
276 @Override
277 public Device getDevice(DeviceId deviceId) {
278 return devices.get(deviceId);
279 }
280
281 @Override
282 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
283 DeviceId deviceId,
284 DeviceDescription deviceDescription) {
285 NodeId master = mastershipService.getMasterFor(deviceId);
286 if (localNodeId.equals(master)) {
287 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
288 return refreshDeviceCache(providerId, deviceId);
289 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800290 // Only forward for ConfigProvider
291 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800292 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800293 return null;
294 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700295 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
296 return Futures.getUnchecked(
297 clusterCommunicator.sendAndReceive(deviceInjectedEvent,
298 DEVICE_INJECTED,
299 SERIALIZER::encode,
300 SERIALIZER::decode,
301 master));
302 }
303 }
304
305 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
306 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
307 Device device = devices.compute(deviceId, (k, existingDevice) -> {
308 Device newDevice = composeDevice(deviceId);
309 if (existingDevice == null) {
310 eventType.set(DEVICE_ADDED);
311 } else {
312 // We allow only certain attributes to trigger update
313 boolean propertiesChanged =
314 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
315 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
316 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
317 boolean annotationsChanged =
318 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
319
320 // Primary providers can respond to all changes, but ancillary ones
321 // should respond only to annotation changes.
322 if ((providerId.isAncillary() && annotationsChanged) ||
323 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
324 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
325 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
326 providerId, existingDevice, devices.get(deviceId), newDevice);
327 eventType.set(DEVICE_UPDATED);
328 }
329 }
330 return newDevice;
331 });
332 if (eventType.get() != null && !providerId.isAncillary()) {
333 markOnline(deviceId);
334 }
335 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
336 }
337
338 /**
339 * Returns the primary providerId for a device.
340 * @param deviceId device identifier
341 * @return primary providerId
342 */
343 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
344 return deviceDescriptions.keySet()
345 .stream()
346 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
347 .map(deviceKey -> deviceKey.providerId())
348 .collect(Collectors.toSet());
349 }
350
351 /**
352 * Returns the identifier for all providers for a device.
353 * @param deviceId device identifier
354 * @return set of provider identifiers
355 */
356 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
357 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
358 return allProviderIds.stream()
359 .filter(p -> !p.isAncillary())
360 .findFirst()
361 .orElse(Iterables.getFirst(allProviderIds, null));
362 }
363
364 /**
365 * Returns a Device, merging descriptions from multiple Providers.
366 *
367 * @param deviceId device identifier
368 * @return Device instance
369 */
370 private Device composeDevice(DeviceId deviceId) {
371
372 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
373 DeviceDescription primaryDeviceDescription =
374 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
375
376 Type type = primaryDeviceDescription.type();
377 String manufacturer = primaryDeviceDescription.manufacturer();
378 String hwVersion = primaryDeviceDescription.hwVersion();
379 String swVersion = primaryDeviceDescription.swVersion();
380 String serialNumber = primaryDeviceDescription.serialNumber();
381 ChassisId chassisId = primaryDeviceDescription.chassisId();
382 DefaultAnnotations annotations = mergeAnnotations(deviceId);
383
384 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
385 hwVersion, swVersion, serialNumber,
386 chassisId, annotations);
387 }
388
389 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
390 Device removedDevice = devices.remove(deviceId);
391 if (removedDevice != null) {
392 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
393 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
394 }
395 return null;
396 }
397
398 private boolean markOnline(DeviceId deviceId) {
399 return availableDevices.add(deviceId);
400 }
401
402 @Override
403 public DeviceEvent markOffline(DeviceId deviceId) {
404 availableDevices.remove(deviceId);
405 // set update listener will raise the event.
406 return null;
407 }
408
409 @Override
410 public List<DeviceEvent> updatePorts(ProviderId providerId,
411 DeviceId deviceId,
412 List<PortDescription> descriptions) {
413 NodeId master = mastershipService.getMasterFor(deviceId);
414 List<DeviceEvent> deviceEvents = null;
415 if (localNodeId.equals(master)) {
416 descriptions.forEach(description -> {
417 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
418 portDescriptions.put(portKey, description);
419 });
420 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
421 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800422 // Only forward for ConfigProvider
423 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800424 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800425 return null;
426 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700427 if (master == null) {
428 return Collections.emptyList();
429 }
430 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
431 deviceEvents = Futures.getUnchecked(
432 clusterCommunicator.sendAndReceive(portInjectedEvent,
433 PORT_INJECTED,
434 SERIALIZER::encode,
435 SERIALIZER::decode,
436 master));
437 }
438 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
439 }
440
441 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
442 DeviceId deviceId,
443 Optional<PortNumber> portNumber) {
444 Device device = devices.get(deviceId);
445 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
446 List<DeviceEvent> events = Lists.newArrayList();
447
448 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
449 List<PortDescription> descriptions = Lists.newArrayList();
450 portDescriptions.entrySet().forEach(e -> {
451 PortKey key = e.getKey();
452 PortDescription value = e.getValue();
453 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
454 if (portNumber.isPresent()) {
455 if (portNumber.get().equals(key.portNumber())) {
456 descriptions.add(value);
457 }
458 } else {
459 descriptions.add(value);
460 }
461 }
462 });
463
464 for (PortDescription description : descriptions) {
465 final PortNumber number = description.portNumber();
466 ports.compute(number, (k, existingPort) -> {
467 Port newPort = composePort(device, number);
468 if (existingPort == null) {
469 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
470 } else {
471 if (existingPort.isEnabled() != newPort.isEnabled() ||
472 existingPort.type() != newPort.type() ||
473 existingPort.portSpeed() != newPort.portSpeed() ||
474 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
475 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
476 }
477 }
478 return newPort;
479 });
480 }
481
482 return events;
483 }
484
485 /**
486 * Returns a Port, merging descriptions from multiple Providers.
487 *
488 * @param device device the port is on
489 * @param number port number
490 * @return Port instance
491 */
492 private Port composePort(Device device, PortNumber number) {
493
494 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
495 portDescriptions.entrySet().forEach(entry -> {
496 PortKey portKey = entry.getKey();
497 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
498 descriptions.put(portKey.providerId(), entry.getValue());
499 }
500 });
501 ProviderId primary = getPrimaryProviderId(device.id());
502 PortDescription primaryDescription = descriptions.get(primary);
503
504 // if no primary, assume not enabled
505 boolean isEnabled = false;
506 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
507 if (primaryDescription != null) {
508 isEnabled = primaryDescription.isEnabled();
509 annotations = merge(annotations, primaryDescription.annotations());
510 }
511 Port updated = null;
512 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
513 if (e.getKey().equals(primary)) {
514 continue;
515 }
516 annotations = merge(annotations, e.getValue().annotations());
517 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
518 }
519 if (primaryDescription == null) {
520 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
521 }
522 return updated == null
523 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
524 : updated;
525 }
526
527 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
528 PortDescription description, Annotations annotations) {
529 switch (description.type()) {
530 case OMS:
531 OmsPortDescription omsDesc = (OmsPortDescription) description;
532 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
533 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
534 case OCH:
535 OchPortDescription ochDesc = (OchPortDescription) description;
536 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
537 ochDesc.isTunable(), ochDesc.lambda(), annotations);
538 case ODUCLT:
539 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
540 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
541 default:
542 return new DefaultPort(device, number, isEnabled, description.type(),
543 description.portSpeed(), annotations);
544 }
545 }
546
547 @Override
548 public DeviceEvent updatePortStatus(ProviderId providerId,
549 DeviceId deviceId,
550 PortDescription portDescription) {
551 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
552 List<DeviceEvent> events =
553 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
554 return Iterables.getFirst(events, null);
555 }
556
557 @Override
558 public List<Port> getPorts(DeviceId deviceId) {
559 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
560 }
561
562 @Override
563 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
564 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
565 }
566
567 @Override
568 public DeviceEvent updatePortStatistics(ProviderId providerId,
569 DeviceId deviceId,
570 Collection<PortStatistics> newStatsCollection) {
571
572 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
573 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
574 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
575
576 if (prvStatsMap != null) {
577 for (PortStatistics newStats : newStatsCollection) {
578 PortNumber port = PortNumber.portNumber(newStats.port());
579 PortStatistics prvStats = prvStatsMap.get(port);
580 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
581 PortStatistics deltaStats = builder.build();
582 if (prvStats != null) {
583 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
584 }
585 deltaStatsMap.put(port, deltaStats);
586 newStatsMap.put(port, newStats);
587 }
588 } else {
589 for (PortStatistics newStats : newStatsCollection) {
590 PortNumber port = PortNumber.portNumber(newStats.port());
591 newStatsMap.put(port, newStats);
592 }
593 }
594 devicePortDeltaStats.put(deviceId, deltaStatsMap);
595 devicePortStats.put(deviceId, newStatsMap);
596 // DeviceEvent returns null because of InternalPortStatsListener usage
597 return null;
598 }
599
600 /**
601 * Calculate delta statistics by subtracting previous from new statistics.
602 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700603 * @param deviceId device indentifier
604 * @param prvStats previous port statistics
605 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700606 * @return PortStatistics
607 */
608 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
609 // calculate time difference
610 long deltaStatsSec, deltaStatsNano;
611 if (newStats.durationNano() < prvStats.durationNano()) {
612 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
613 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
614 } else {
615 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
616 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
617 }
618 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
619 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
620 .setPort(newStats.port())
621 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
622 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
623 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
624 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
625 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
626 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
627 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
628 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
629 .setDurationSec(deltaStatsSec)
630 .setDurationNano(deltaStatsNano)
631 .build();
632 return deltaStats;
633 }
634
635 @Override
636 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
637 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
638 if (portStats == null) {
639 return Collections.emptyList();
640 }
641 return ImmutableList.copyOf(portStats.values());
642 }
643
644 @Override
645 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
646 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
647 if (portStats == null) {
648 return Collections.emptyList();
649 }
650 return ImmutableList.copyOf(portStats.values());
651 }
652
653 @Override
654 public boolean isAvailable(DeviceId deviceId) {
655 return availableDevices.contains(deviceId);
656 }
657
658 @Override
659 public Iterable<Device> getAvailableDevices() {
660 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
661 }
662
663 @Override
664 public DeviceEvent removeDevice(DeviceId deviceId) {
665 NodeId master = mastershipService.getMasterFor(deviceId);
666 // if there exist a master, forward
667 // if there is no master, try to become one and process
668 boolean relinquishAtEnd = false;
669 if (master == null) {
670 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
671 if (myRole != MastershipRole.NONE) {
672 relinquishAtEnd = true;
673 }
674 log.debug("Temporarily requesting role for {} to remove", deviceId);
675 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
676 if (role == MastershipRole.MASTER) {
677 master = localNodeId;
678 }
679 }
680
681 if (!localNodeId.equals(master)) {
682 log.debug("{} has control of {}, forwarding remove request",
683 master, deviceId);
684
685 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
686 .whenComplete((r, e) -> {
687 if (e != null) {
688 log.error("Failed to forward {} remove request to its master", deviceId, e);
689 }
690 });
691 return null;
692 }
693
694 // I have control..
695 DeviceEvent event = null;
696 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
697 DeviceDescription removedDeviceDescription =
698 deviceDescriptions.remove(deviceKey);
699 if (removedDeviceDescription != null) {
700 event = purgeDeviceCache(deviceId);
701 }
702
703 if (relinquishAtEnd) {
704 log.debug("Relinquishing temporary role acquired for {}", deviceId);
705 mastershipService.relinquishMastership(deviceId);
706 }
707 return event;
708 }
709
710 private DeviceEvent injectDevice(DeviceInjectedEvent event) {
711 return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
712 }
713
714 private List<DeviceEvent> injectPort(PortInjectedEvent event) {
715 return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
716 }
717
718 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
719 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
720 DeviceDescription primaryDeviceDescription =
721 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
722 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
723 annotations = merge(annotations, primaryDeviceDescription.annotations());
724 for (ProviderId providerId : getAllProviders(deviceId)) {
725 if (!providerId.equals(primaryProviderId)) {
726 annotations = merge(annotations,
727 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
728 }
729 }
730 return annotations;
731 }
732
733 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
734 @Override
735 public void event(SetEvent<DeviceId> event) {
736 final DeviceId deviceId = event.entry();
737 final Device device = devices.get(deviceId);
738 if (device != null) {
739 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
740 } else {
741 pendingAvailableChangeUpdates.add(deviceId);
742 }
743 }
744 }
745
746 private class InternalDeviceChangeEventListener
747 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
748 @Override
749 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
750 DeviceId deviceId = event.key().deviceId();
751 ProviderId providerId = event.key().providerId();
752 if (event.type() == PUT) {
753 notifyDelegate(refreshDeviceCache(providerId, deviceId));
754 if (pendingAvailableChangeUpdates.remove(deviceId)) {
755 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
756 }
757 } else if (event.type() == REMOVE) {
758 notifyDelegate(purgeDeviceCache(deviceId));
759 }
760 }
761 }
762
763 private class InternalPortChangeEventListener
764 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
765 @Override
766 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
767 DeviceId deviceId = event.key().deviceId();
768 ProviderId providerId = event.key().providerId();
769 PortNumber portNumber = event.key().portNumber();
770 if (event.type() == PUT) {
771 if (devices.containsKey(deviceId)) {
772 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
773 for (DeviceEvent deviceEvent : events) {
774 notifyDelegate(deviceEvent);
775 }
776 }
777 } else if (event.type() == REMOVE) {
778 log.warn("Unexpected port removed event");
779 }
780 }
781 }
782
783 private class InternalPortStatsListener
784 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
785 @Override
786 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
787 if (event.type() == PUT) {
788 Device device = devices.get(event.key());
789 if (device != null) {
790 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
791 }
792 }
793 }
794 }
795}