blob: fad232ef93d7560f313a23d4c4bfb0ef66378c2b [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Madan Jampanifc8aced2015-08-27 11:06:12 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Jian Li11599162016-01-15 15:46:16 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
Madan Jampanifc8aced2015-08-27 11:06:12 -070024import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070025import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.KryoNamespace;
31import org.onlab.util.SharedExecutors;
32import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.mastership.MastershipTermService;
36import org.onosproject.net.Annotations;
37import org.onosproject.net.AnnotationsUtil;
38import org.onosproject.net.DefaultAnnotations;
39import org.onosproject.net.DefaultDevice;
40import org.onosproject.net.DefaultPort;
41import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080042import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070043import org.onosproject.net.DeviceId;
44import org.onosproject.net.MastershipRole;
45import org.onosproject.net.OchPort;
46import org.onosproject.net.OduCltPort;
47import org.onosproject.net.OmsPort;
48import org.onosproject.net.Port;
49import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070050import org.onosproject.net.device.DefaultPortStatistics;
51import org.onosproject.net.device.DeviceClockService;
52import org.onosproject.net.device.DeviceDescription;
53import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceStore;
55import org.onosproject.net.device.DeviceStoreDelegate;
56import org.onosproject.net.device.OchPortDescription;
57import org.onosproject.net.device.OduCltPortDescription;
58import org.onosproject.net.device.OmsPortDescription;
59import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.impl.MastershipBasedTimestamp;
65import org.onosproject.store.serializers.KryoNamespaces;
66import org.onosproject.store.serializers.KryoSerializer;
67import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
68import org.onosproject.store.service.DistributedSet;
69import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080071import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070072import org.onosproject.store.service.Serializer;
73import org.onosproject.store.service.SetEvent;
74import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070075import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080076import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070077import org.slf4j.Logger;
78
Jian Li11599162016-01-15 15:46:16 -080079import java.util.Collection;
80import java.util.Collections;
81import java.util.List;
82import java.util.Map;
83import java.util.Map.Entry;
84import java.util.Objects;
85import java.util.Optional;
86import java.util.Set;
87import java.util.concurrent.TimeUnit;
88import java.util.concurrent.atomic.AtomicReference;
89import java.util.stream.Collectors;
90
91import static com.google.common.base.Preconditions.checkArgument;
92import static com.google.common.base.Verify.verify;
93import static org.onosproject.net.DefaultAnnotations.merge;
94import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
95import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
96import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
97import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
98import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
99import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
100import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700101import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
102import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
103import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
Jian Li11599162016-01-15 15:46:16 -0800104import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
105import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
106import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700107
108/**
109 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
110 */
Jian Li11599162016-01-15 15:46:16 -0800111//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700112@Service
113public class ECDeviceStore
114 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
115 implements DeviceStore {
116
117 private final Logger log = getLogger(getClass());
118
119 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
120
121 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
122 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
123 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
124
125 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
126 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
127 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
128 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
129
130 private DistributedSet<DeviceId> availableDevices;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected StorageService storageService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected MastershipService mastershipService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected MastershipTermService mastershipTermService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected DeviceClockService deviceClockService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected ClusterCommunicationService clusterCommunicator;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 private NodeId localNodeId;
151 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
152 new InternalDeviceChangeEventListener();
153 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
154 new InternalPortChangeEventListener();
155 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
156 new InternalPortStatsListener();
157 private final SetEventListener<DeviceId> deviceStatusTracker =
158 new InternalDeviceStatusTracker();
159
160 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
161 @Override
162 protected void setupKryoPool() {
163 serializerPool = KryoNamespace.newBuilder()
164 .register(DistributedStoreSerializers.STORE_COMMON)
165 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
166 .register(DeviceInjectedEvent.class)
167 .register(PortInjectedEvent.class)
168 .build();
169 }
170 };
171
172 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
173 .register(KryoNamespaces.API)
174 .register(DeviceKey.class)
175 .register(PortKey.class)
176 .register(DeviceKey.class)
177 .register(PortKey.class)
178 .register(MastershipBasedTimestamp.class);
179
180 @Activate
181 public void activate() {
182 localNodeId = clusterService.getLocalNode().id();
183
184 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
185 .withName("onos-device-descriptions")
186 .withSerializer(SERIALIZER_BUILDER)
187 .withTimestampProvider((k, v) -> {
188 try {
189 return deviceClockService.getTimestamp(k.deviceId());
190 } catch (IllegalStateException e) {
191 return null;
192 }
193 }).build();
194
195 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
196 .withName("onos-port-descriptions")
197 .withSerializer(SERIALIZER_BUILDER)
198 .withTimestampProvider((k, v) -> {
199 try {
200 return deviceClockService.getTimestamp(k.deviceId());
201 } catch (IllegalStateException e) {
202 return null;
203 }
204 }).build();
205
206 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
207 .withName("onos-port-stats")
208 .withSerializer(SERIALIZER_BUILDER)
209 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
210 .withTimestampProvider((k, v) -> new WallClockTimestamp())
211 .withTombstonesDisabled()
212 .build();
213
214 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
215 eventuallyConsistentMapBuilder()
216 .withName("onos-port-stats-delta")
217 .withSerializer(SERIALIZER_BUILDER)
218 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
219 .withTimestampProvider((k, v) -> new WallClockTimestamp())
220 .withTombstonesDisabled()
221 .build();
222
223 clusterCommunicator.addSubscriber(DEVICE_INJECTED,
224 SERIALIZER::decode,
225 this::injectDevice,
226 SERIALIZER::encode,
227 SharedExecutors.getPoolThreadExecutor());
228
229 clusterCommunicator.addSubscriber(PORT_INJECTED,
230 SERIALIZER::decode,
231 this::injectPort,
232 SERIALIZER::encode,
233 SharedExecutors.getPoolThreadExecutor());
234
235 availableDevices = storageService.<DeviceId>setBuilder()
236 .withName("onos-online-devices")
237 .withSerializer(Serializer.using(KryoNamespaces.API))
238 .withPartitionsDisabled()
239 .withRelaxedReadConsistency()
Madan Jampani538be742016-02-10 14:55:38 -0800240 .build()
241 .asDistributedSet();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700242
243 deviceDescriptions.addListener(deviceUpdateListener);
244 portDescriptions.addListener(portUpdateListener);
245 devicePortStats.addListener(portStatsListener);
246 availableDevices.addListener(deviceStatusTracker);
247 log.info("Started");
248 }
249
250 @Deactivate
251 public void deactivate() {
252 devicePortStats.removeListener(portStatsListener);
253 deviceDescriptions.removeListener(deviceUpdateListener);
254 portDescriptions.removeListener(portUpdateListener);
255 availableDevices.removeListener(deviceStatusTracker);
256 devicePortStats.destroy();
257 devicePortDeltaStats.destroy();
258 deviceDescriptions.destroy();
259 portDescriptions.destroy();
260 devices.clear();
261 devicePorts.clear();
262 clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
263 clusterCommunicator.removeSubscriber(PORT_INJECTED);
264 log.info("Stopped");
265 }
266
267 @Override
268 public Iterable<Device> getDevices() {
269 return devices.values();
270 }
271
272 @Override
273 public int getDeviceCount() {
274 return devices.size();
275 }
276
277 @Override
278 public Device getDevice(DeviceId deviceId) {
279 return devices.get(deviceId);
280 }
281
282 @Override
283 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
284 DeviceId deviceId,
285 DeviceDescription deviceDescription) {
286 NodeId master = mastershipService.getMasterFor(deviceId);
287 if (localNodeId.equals(master)) {
288 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
289 return refreshDeviceCache(providerId, deviceId);
290 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800291 // Only forward for ConfigProvider
292 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800293 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800294 return null;
295 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700296 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
297 return Futures.getUnchecked(
298 clusterCommunicator.sendAndReceive(deviceInjectedEvent,
299 DEVICE_INJECTED,
300 SERIALIZER::encode,
301 SERIALIZER::decode,
302 master));
303 }
304 }
305
306 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
307 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
308 Device device = devices.compute(deviceId, (k, existingDevice) -> {
309 Device newDevice = composeDevice(deviceId);
310 if (existingDevice == null) {
311 eventType.set(DEVICE_ADDED);
312 } else {
313 // We allow only certain attributes to trigger update
314 boolean propertiesChanged =
315 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
316 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
317 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
318 boolean annotationsChanged =
319 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
320
321 // Primary providers can respond to all changes, but ancillary ones
322 // should respond only to annotation changes.
323 if ((providerId.isAncillary() && annotationsChanged) ||
324 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
325 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
326 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
327 providerId, existingDevice, devices.get(deviceId), newDevice);
328 eventType.set(DEVICE_UPDATED);
329 }
330 }
331 return newDevice;
332 });
333 if (eventType.get() != null && !providerId.isAncillary()) {
334 markOnline(deviceId);
335 }
336 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
337 }
338
339 /**
340 * Returns the primary providerId for a device.
341 * @param deviceId device identifier
342 * @return primary providerId
343 */
344 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
345 return deviceDescriptions.keySet()
346 .stream()
347 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
348 .map(deviceKey -> deviceKey.providerId())
349 .collect(Collectors.toSet());
350 }
351
352 /**
353 * Returns the identifier for all providers for a device.
354 * @param deviceId device identifier
355 * @return set of provider identifiers
356 */
357 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
358 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
359 return allProviderIds.stream()
360 .filter(p -> !p.isAncillary())
361 .findFirst()
362 .orElse(Iterables.getFirst(allProviderIds, null));
363 }
364
365 /**
366 * Returns a Device, merging descriptions from multiple Providers.
367 *
368 * @param deviceId device identifier
369 * @return Device instance
370 */
371 private Device composeDevice(DeviceId deviceId) {
372
373 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
374 DeviceDescription primaryDeviceDescription =
375 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
376
377 Type type = primaryDeviceDescription.type();
378 String manufacturer = primaryDeviceDescription.manufacturer();
379 String hwVersion = primaryDeviceDescription.hwVersion();
380 String swVersion = primaryDeviceDescription.swVersion();
381 String serialNumber = primaryDeviceDescription.serialNumber();
382 ChassisId chassisId = primaryDeviceDescription.chassisId();
383 DefaultAnnotations annotations = mergeAnnotations(deviceId);
384
385 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
386 hwVersion, swVersion, serialNumber,
387 chassisId, annotations);
388 }
389
390 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
391 Device removedDevice = devices.remove(deviceId);
392 if (removedDevice != null) {
393 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
394 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
395 }
396 return null;
397 }
398
399 private boolean markOnline(DeviceId deviceId) {
400 return availableDevices.add(deviceId);
401 }
402
403 @Override
404 public DeviceEvent markOffline(DeviceId deviceId) {
405 availableDevices.remove(deviceId);
406 // set update listener will raise the event.
407 return null;
408 }
409
410 @Override
411 public List<DeviceEvent> updatePorts(ProviderId providerId,
412 DeviceId deviceId,
413 List<PortDescription> descriptions) {
414 NodeId master = mastershipService.getMasterFor(deviceId);
415 List<DeviceEvent> deviceEvents = null;
416 if (localNodeId.equals(master)) {
417 descriptions.forEach(description -> {
418 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
419 portDescriptions.put(portKey, description);
420 });
421 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
422 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800423 // Only forward for ConfigProvider
424 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800425 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta89461772016-01-26 12:18:10 -0800426 return Collections.emptyList();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800427 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700428 if (master == null) {
429 return Collections.emptyList();
430 }
431 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
432 deviceEvents = Futures.getUnchecked(
433 clusterCommunicator.sendAndReceive(portInjectedEvent,
434 PORT_INJECTED,
435 SERIALIZER::encode,
436 SERIALIZER::decode,
437 master));
438 }
439 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
440 }
441
442 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
443 DeviceId deviceId,
444 Optional<PortNumber> portNumber) {
445 Device device = devices.get(deviceId);
446 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
447 List<DeviceEvent> events = Lists.newArrayList();
448
449 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
450 List<PortDescription> descriptions = Lists.newArrayList();
451 portDescriptions.entrySet().forEach(e -> {
452 PortKey key = e.getKey();
453 PortDescription value = e.getValue();
454 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
455 if (portNumber.isPresent()) {
456 if (portNumber.get().equals(key.portNumber())) {
457 descriptions.add(value);
458 }
459 } else {
460 descriptions.add(value);
461 }
462 }
463 });
464
465 for (PortDescription description : descriptions) {
466 final PortNumber number = description.portNumber();
467 ports.compute(number, (k, existingPort) -> {
468 Port newPort = composePort(device, number);
469 if (existingPort == null) {
470 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
471 } else {
472 if (existingPort.isEnabled() != newPort.isEnabled() ||
473 existingPort.type() != newPort.type() ||
474 existingPort.portSpeed() != newPort.portSpeed() ||
475 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
476 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
477 }
478 }
479 return newPort;
480 });
481 }
482
483 return events;
484 }
485
486 /**
487 * Returns a Port, merging descriptions from multiple Providers.
488 *
489 * @param device device the port is on
490 * @param number port number
491 * @return Port instance
492 */
493 private Port composePort(Device device, PortNumber number) {
494
495 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
496 portDescriptions.entrySet().forEach(entry -> {
497 PortKey portKey = entry.getKey();
498 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
499 descriptions.put(portKey.providerId(), entry.getValue());
500 }
501 });
502 ProviderId primary = getPrimaryProviderId(device.id());
503 PortDescription primaryDescription = descriptions.get(primary);
504
505 // if no primary, assume not enabled
506 boolean isEnabled = false;
507 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
508 if (primaryDescription != null) {
509 isEnabled = primaryDescription.isEnabled();
510 annotations = merge(annotations, primaryDescription.annotations());
511 }
512 Port updated = null;
513 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
514 if (e.getKey().equals(primary)) {
515 continue;
516 }
517 annotations = merge(annotations, e.getValue().annotations());
518 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
519 }
520 if (primaryDescription == null) {
521 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
522 }
523 return updated == null
524 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
525 : updated;
526 }
527
528 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
529 PortDescription description, Annotations annotations) {
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -0800530 // FIXME this switch need to go away once all ports are done.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700531 switch (description.type()) {
532 case OMS:
HIGUCHI Yuta95d83e82016-04-26 12:13:48 -0700533 if (description instanceof OmsPortDescription) {
534 // remove if-block once deprecation is complete
535 OmsPortDescription omsDesc = (OmsPortDescription) description;
536 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
537 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
538 }
539 // same as default
540 return new DefaultPort(device, number, isEnabled, description.type(),
541 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700542 case OCH:
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -0800543 if (description instanceof OchPortDescription) {
544 // remove if-block once Och deprecation is complete
545 OchPortDescription ochDesc = (OchPortDescription) description;
546 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
547 ochDesc.isTunable(), ochDesc.lambda(), annotations);
548 }
549 return new DefaultPort(device, number, isEnabled, description.type(),
550 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700551 case ODUCLT:
552 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
553 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
554 default:
555 return new DefaultPort(device, number, isEnabled, description.type(),
556 description.portSpeed(), annotations);
557 }
558 }
559
560 @Override
561 public DeviceEvent updatePortStatus(ProviderId providerId,
562 DeviceId deviceId,
563 PortDescription portDescription) {
564 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
565 List<DeviceEvent> events =
566 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
567 return Iterables.getFirst(events, null);
568 }
569
570 @Override
571 public List<Port> getPorts(DeviceId deviceId) {
572 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
573 }
574
575 @Override
576 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
577 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
578 }
579
580 @Override
581 public DeviceEvent updatePortStatistics(ProviderId providerId,
582 DeviceId deviceId,
583 Collection<PortStatistics> newStatsCollection) {
584
585 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
586 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
587 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
588
589 if (prvStatsMap != null) {
590 for (PortStatistics newStats : newStatsCollection) {
591 PortNumber port = PortNumber.portNumber(newStats.port());
592 PortStatistics prvStats = prvStatsMap.get(port);
593 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
594 PortStatistics deltaStats = builder.build();
595 if (prvStats != null) {
596 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
597 }
598 deltaStatsMap.put(port, deltaStats);
599 newStatsMap.put(port, newStats);
600 }
601 } else {
602 for (PortStatistics newStats : newStatsCollection) {
603 PortNumber port = PortNumber.portNumber(newStats.port());
604 newStatsMap.put(port, newStats);
605 }
606 }
607 devicePortDeltaStats.put(deviceId, deltaStatsMap);
608 devicePortStats.put(deviceId, newStatsMap);
609 // DeviceEvent returns null because of InternalPortStatsListener usage
610 return null;
611 }
612
613 /**
614 * Calculate delta statistics by subtracting previous from new statistics.
615 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700616 * @param deviceId device indentifier
617 * @param prvStats previous port statistics
618 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700619 * @return PortStatistics
620 */
621 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
622 // calculate time difference
623 long deltaStatsSec, deltaStatsNano;
624 if (newStats.durationNano() < prvStats.durationNano()) {
625 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
626 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
627 } else {
628 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
629 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
630 }
631 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
632 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
633 .setPort(newStats.port())
634 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
635 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
636 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
637 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
638 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
639 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
640 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
641 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
642 .setDurationSec(deltaStatsSec)
643 .setDurationNano(deltaStatsNano)
644 .build();
645 return deltaStats;
646 }
647
648 @Override
649 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
650 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
651 if (portStats == null) {
652 return Collections.emptyList();
653 }
654 return ImmutableList.copyOf(portStats.values());
655 }
656
657 @Override
658 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
659 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
660 if (portStats == null) {
661 return Collections.emptyList();
662 }
663 return ImmutableList.copyOf(portStats.values());
664 }
665
666 @Override
667 public boolean isAvailable(DeviceId deviceId) {
668 return availableDevices.contains(deviceId);
669 }
670
671 @Override
672 public Iterable<Device> getAvailableDevices() {
673 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
674 }
675
676 @Override
677 public DeviceEvent removeDevice(DeviceId deviceId) {
678 NodeId master = mastershipService.getMasterFor(deviceId);
679 // if there exist a master, forward
680 // if there is no master, try to become one and process
681 boolean relinquishAtEnd = false;
682 if (master == null) {
683 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
684 if (myRole != MastershipRole.NONE) {
685 relinquishAtEnd = true;
686 }
687 log.debug("Temporarily requesting role for {} to remove", deviceId);
688 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
689 if (role == MastershipRole.MASTER) {
690 master = localNodeId;
691 }
692 }
693
694 if (!localNodeId.equals(master)) {
695 log.debug("{} has control of {}, forwarding remove request",
696 master, deviceId);
697
698 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
699 .whenComplete((r, e) -> {
700 if (e != null) {
701 log.error("Failed to forward {} remove request to its master", deviceId, e);
702 }
703 });
704 return null;
705 }
706
707 // I have control..
708 DeviceEvent event = null;
709 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
710 DeviceDescription removedDeviceDescription =
711 deviceDescriptions.remove(deviceKey);
712 if (removedDeviceDescription != null) {
713 event = purgeDeviceCache(deviceId);
714 }
715
716 if (relinquishAtEnd) {
717 log.debug("Relinquishing temporary role acquired for {}", deviceId);
718 mastershipService.relinquishMastership(deviceId);
719 }
720 return event;
721 }
722
723 private DeviceEvent injectDevice(DeviceInjectedEvent event) {
724 return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
725 }
726
727 private List<DeviceEvent> injectPort(PortInjectedEvent event) {
728 return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
729 }
730
731 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
732 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
733 DeviceDescription primaryDeviceDescription =
734 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
735 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
736 annotations = merge(annotations, primaryDeviceDescription.annotations());
737 for (ProviderId providerId : getAllProviders(deviceId)) {
738 if (!providerId.equals(primaryProviderId)) {
739 annotations = merge(annotations,
740 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
741 }
742 }
743 return annotations;
744 }
745
746 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
747 @Override
748 public void event(SetEvent<DeviceId> event) {
749 final DeviceId deviceId = event.entry();
750 final Device device = devices.get(deviceId);
751 if (device != null) {
752 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
753 } else {
754 pendingAvailableChangeUpdates.add(deviceId);
755 }
756 }
757 }
758
759 private class InternalDeviceChangeEventListener
760 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
761 @Override
762 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
763 DeviceId deviceId = event.key().deviceId();
764 ProviderId providerId = event.key().providerId();
765 if (event.type() == PUT) {
766 notifyDelegate(refreshDeviceCache(providerId, deviceId));
767 if (pendingAvailableChangeUpdates.remove(deviceId)) {
768 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
769 }
770 } else if (event.type() == REMOVE) {
771 notifyDelegate(purgeDeviceCache(deviceId));
772 }
773 }
774 }
775
776 private class InternalPortChangeEventListener
777 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
778 @Override
779 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
780 DeviceId deviceId = event.key().deviceId();
781 ProviderId providerId = event.key().providerId();
782 PortNumber portNumber = event.key().portNumber();
783 if (event.type() == PUT) {
784 if (devices.containsKey(deviceId)) {
785 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
786 for (DeviceEvent deviceEvent : events) {
787 notifyDelegate(deviceEvent);
788 }
789 }
790 } else if (event.type() == REMOVE) {
791 log.warn("Unexpected port removed event");
792 }
793 }
794 }
795
796 private class InternalPortStatsListener
797 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
798 @Override
799 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
800 if (event.type() == PUT) {
801 Device device = devices.get(event.key());
802 if (device != null) {
803 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
804 }
805 }
806 }
807 }
808}