blob: a407d87a15910b92e359e4bb9748113910883562 [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Madan Jampanifc8aced2015-08-27 11:06:12 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Jian Li11599162016-01-15 15:46:16 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
Madan Jampanifc8aced2015-08-27 11:06:12 -070024import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070025import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.KryoNamespace;
31import org.onlab.util.SharedExecutors;
32import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.mastership.MastershipTermService;
36import org.onosproject.net.Annotations;
37import org.onosproject.net.AnnotationsUtil;
38import org.onosproject.net.DefaultAnnotations;
39import org.onosproject.net.DefaultDevice;
40import org.onosproject.net.DefaultPort;
41import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080042import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070043import org.onosproject.net.DeviceId;
44import org.onosproject.net.MastershipRole;
45import org.onosproject.net.OchPort;
46import org.onosproject.net.OduCltPort;
47import org.onosproject.net.OmsPort;
48import org.onosproject.net.Port;
49import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070050import org.onosproject.net.device.DefaultPortStatistics;
51import org.onosproject.net.device.DeviceClockService;
52import org.onosproject.net.device.DeviceDescription;
53import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceStore;
55import org.onosproject.net.device.DeviceStoreDelegate;
56import org.onosproject.net.device.OchPortDescription;
57import org.onosproject.net.device.OduCltPortDescription;
58import org.onosproject.net.device.OmsPortDescription;
59import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.impl.MastershipBasedTimestamp;
65import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070066import org.onosproject.store.serializers.StoreSerializer;
Madan Jampanifc8aced2015-08-27 11:06:12 -070067import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
68import org.onosproject.store.service.DistributedSet;
69import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080071import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070072import org.onosproject.store.service.Serializer;
73import org.onosproject.store.service.SetEvent;
74import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070075import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080076import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070077import org.slf4j.Logger;
78
Jian Li11599162016-01-15 15:46:16 -080079import java.util.Collection;
80import java.util.Collections;
81import java.util.List;
82import java.util.Map;
83import java.util.Map.Entry;
84import java.util.Objects;
85import java.util.Optional;
86import java.util.Set;
87import java.util.concurrent.TimeUnit;
88import java.util.concurrent.atomic.AtomicReference;
89import java.util.stream.Collectors;
90
91import static com.google.common.base.Preconditions.checkArgument;
92import static com.google.common.base.Verify.verify;
93import static org.onosproject.net.DefaultAnnotations.merge;
94import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
95import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
96import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
97import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
98import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
99import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
100import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700101import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
102import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
103import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
Jian Li11599162016-01-15 15:46:16 -0800104import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
105import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
106import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700107
108/**
 * Manages the inventory of devices using an {@code EventuallyConsistentMap}.
110 */
Jian Li11599162016-01-15 15:46:16 -0800111//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700112@Service
113public class ECDeviceStore
114 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
115 implements DeviceStore {
116
117 private final Logger log = getLogger(getClass());
118
119 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
120
121 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
122 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
123 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
124
125 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
126 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
127 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
128 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
129
130 private DistributedSet<DeviceId> availableDevices;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected StorageService storageService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected MastershipService mastershipService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected MastershipTermService mastershipTermService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected DeviceClockService deviceClockService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected ClusterCommunicationService clusterCommunicator;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
148 protected ClusterService clusterService;
149
150 private NodeId localNodeId;
151 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
152 new InternalDeviceChangeEventListener();
153 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
154 new InternalPortChangeEventListener();
155 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
156 new InternalPortStatsListener();
157 private final SetEventListener<DeviceId> deviceStatusTracker =
158 new InternalDeviceStatusTracker();
159
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700160 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
161 KryoNamespace.newBuilder()
Madan Jampanifc8aced2015-08-27 11:06:12 -0700162 .register(DistributedStoreSerializers.STORE_COMMON)
163 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
164 .register(DeviceInjectedEvent.class)
165 .register(PortInjectedEvent.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700166 .build("ECDevice"));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700167
168 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
169 .register(KryoNamespaces.API)
170 .register(DeviceKey.class)
171 .register(PortKey.class)
172 .register(DeviceKey.class)
173 .register(PortKey.class)
174 .register(MastershipBasedTimestamp.class);
175
176 @Activate
177 public void activate() {
178 localNodeId = clusterService.getLocalNode().id();
179
180 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
181 .withName("onos-device-descriptions")
182 .withSerializer(SERIALIZER_BUILDER)
183 .withTimestampProvider((k, v) -> {
184 try {
185 return deviceClockService.getTimestamp(k.deviceId());
186 } catch (IllegalStateException e) {
187 return null;
188 }
189 }).build();
190
191 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
192 .withName("onos-port-descriptions")
193 .withSerializer(SERIALIZER_BUILDER)
194 .withTimestampProvider((k, v) -> {
195 try {
196 return deviceClockService.getTimestamp(k.deviceId());
197 } catch (IllegalStateException e) {
198 return null;
199 }
200 }).build();
201
202 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
203 .withName("onos-port-stats")
204 .withSerializer(SERIALIZER_BUILDER)
205 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
206 .withTimestampProvider((k, v) -> new WallClockTimestamp())
207 .withTombstonesDisabled()
208 .build();
209
210 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
211 eventuallyConsistentMapBuilder()
212 .withName("onos-port-stats-delta")
213 .withSerializer(SERIALIZER_BUILDER)
214 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
215 .withTimestampProvider((k, v) -> new WallClockTimestamp())
216 .withTombstonesDisabled()
217 .build();
218
219 clusterCommunicator.addSubscriber(DEVICE_INJECTED,
220 SERIALIZER::decode,
221 this::injectDevice,
222 SERIALIZER::encode,
223 SharedExecutors.getPoolThreadExecutor());
224
225 clusterCommunicator.addSubscriber(PORT_INJECTED,
226 SERIALIZER::decode,
227 this::injectPort,
228 SERIALIZER::encode,
229 SharedExecutors.getPoolThreadExecutor());
230
231 availableDevices = storageService.<DeviceId>setBuilder()
232 .withName("onos-online-devices")
233 .withSerializer(Serializer.using(KryoNamespaces.API))
234 .withPartitionsDisabled()
235 .withRelaxedReadConsistency()
Madan Jampani538be742016-02-10 14:55:38 -0800236 .build()
237 .asDistributedSet();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700238
239 deviceDescriptions.addListener(deviceUpdateListener);
240 portDescriptions.addListener(portUpdateListener);
241 devicePortStats.addListener(portStatsListener);
242 availableDevices.addListener(deviceStatusTracker);
243 log.info("Started");
244 }
245
246 @Deactivate
247 public void deactivate() {
248 devicePortStats.removeListener(portStatsListener);
249 deviceDescriptions.removeListener(deviceUpdateListener);
250 portDescriptions.removeListener(portUpdateListener);
251 availableDevices.removeListener(deviceStatusTracker);
252 devicePortStats.destroy();
253 devicePortDeltaStats.destroy();
254 deviceDescriptions.destroy();
255 portDescriptions.destroy();
256 devices.clear();
257 devicePorts.clear();
258 clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
259 clusterCommunicator.removeSubscriber(PORT_INJECTED);
260 log.info("Stopped");
261 }
262
263 @Override
264 public Iterable<Device> getDevices() {
265 return devices.values();
266 }
267
268 @Override
269 public int getDeviceCount() {
270 return devices.size();
271 }
272
273 @Override
274 public Device getDevice(DeviceId deviceId) {
275 return devices.get(deviceId);
276 }
277
278 @Override
279 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
280 DeviceId deviceId,
281 DeviceDescription deviceDescription) {
282 NodeId master = mastershipService.getMasterFor(deviceId);
283 if (localNodeId.equals(master)) {
284 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
285 return refreshDeviceCache(providerId, deviceId);
286 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800287 // Only forward for ConfigProvider
288 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800289 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800290 return null;
291 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700292 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
293 return Futures.getUnchecked(
294 clusterCommunicator.sendAndReceive(deviceInjectedEvent,
295 DEVICE_INJECTED,
296 SERIALIZER::encode,
297 SERIALIZER::decode,
298 master));
299 }
300 }
301
302 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
303 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
304 Device device = devices.compute(deviceId, (k, existingDevice) -> {
305 Device newDevice = composeDevice(deviceId);
306 if (existingDevice == null) {
307 eventType.set(DEVICE_ADDED);
308 } else {
309 // We allow only certain attributes to trigger update
310 boolean propertiesChanged =
311 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
312 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
313 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
314 boolean annotationsChanged =
315 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
316
317 // Primary providers can respond to all changes, but ancillary ones
318 // should respond only to annotation changes.
319 if ((providerId.isAncillary() && annotationsChanged) ||
320 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
321 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
322 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
323 providerId, existingDevice, devices.get(deviceId), newDevice);
324 eventType.set(DEVICE_UPDATED);
325 }
326 }
327 return newDevice;
328 });
329 if (eventType.get() != null && !providerId.isAncillary()) {
330 markOnline(deviceId);
331 }
332 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
333 }
334
335 /**
336 * Returns the primary providerId for a device.
337 * @param deviceId device identifier
338 * @return primary providerId
339 */
340 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
341 return deviceDescriptions.keySet()
342 .stream()
343 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
344 .map(deviceKey -> deviceKey.providerId())
345 .collect(Collectors.toSet());
346 }
347
348 /**
349 * Returns the identifier for all providers for a device.
350 * @param deviceId device identifier
351 * @return set of provider identifiers
352 */
353 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
354 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
355 return allProviderIds.stream()
356 .filter(p -> !p.isAncillary())
357 .findFirst()
358 .orElse(Iterables.getFirst(allProviderIds, null));
359 }
360
361 /**
362 * Returns a Device, merging descriptions from multiple Providers.
363 *
364 * @param deviceId device identifier
365 * @return Device instance
366 */
367 private Device composeDevice(DeviceId deviceId) {
368
369 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
370 DeviceDescription primaryDeviceDescription =
371 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
372
373 Type type = primaryDeviceDescription.type();
374 String manufacturer = primaryDeviceDescription.manufacturer();
375 String hwVersion = primaryDeviceDescription.hwVersion();
376 String swVersion = primaryDeviceDescription.swVersion();
377 String serialNumber = primaryDeviceDescription.serialNumber();
378 ChassisId chassisId = primaryDeviceDescription.chassisId();
379 DefaultAnnotations annotations = mergeAnnotations(deviceId);
380
381 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
382 hwVersion, swVersion, serialNumber,
383 chassisId, annotations);
384 }
385
386 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
387 Device removedDevice = devices.remove(deviceId);
388 if (removedDevice != null) {
389 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
390 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
391 }
392 return null;
393 }
394
395 private boolean markOnline(DeviceId deviceId) {
396 return availableDevices.add(deviceId);
397 }
398
399 @Override
400 public DeviceEvent markOffline(DeviceId deviceId) {
401 availableDevices.remove(deviceId);
402 // set update listener will raise the event.
403 return null;
404 }
405
406 @Override
407 public List<DeviceEvent> updatePorts(ProviderId providerId,
408 DeviceId deviceId,
409 List<PortDescription> descriptions) {
410 NodeId master = mastershipService.getMasterFor(deviceId);
411 List<DeviceEvent> deviceEvents = null;
412 if (localNodeId.equals(master)) {
413 descriptions.forEach(description -> {
414 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
415 portDescriptions.put(portKey, description);
416 });
417 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
418 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800419 // Only forward for ConfigProvider
420 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800421 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta89461772016-01-26 12:18:10 -0800422 return Collections.emptyList();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800423 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700424 if (master == null) {
425 return Collections.emptyList();
426 }
427 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
428 deviceEvents = Futures.getUnchecked(
429 clusterCommunicator.sendAndReceive(portInjectedEvent,
430 PORT_INJECTED,
431 SERIALIZER::encode,
432 SERIALIZER::decode,
433 master));
434 }
435 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
436 }
437
438 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
439 DeviceId deviceId,
440 Optional<PortNumber> portNumber) {
441 Device device = devices.get(deviceId);
442 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
443 List<DeviceEvent> events = Lists.newArrayList();
444
445 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
446 List<PortDescription> descriptions = Lists.newArrayList();
447 portDescriptions.entrySet().forEach(e -> {
448 PortKey key = e.getKey();
449 PortDescription value = e.getValue();
450 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
451 if (portNumber.isPresent()) {
452 if (portNumber.get().equals(key.portNumber())) {
453 descriptions.add(value);
454 }
455 } else {
456 descriptions.add(value);
457 }
458 }
459 });
460
461 for (PortDescription description : descriptions) {
462 final PortNumber number = description.portNumber();
463 ports.compute(number, (k, existingPort) -> {
464 Port newPort = composePort(device, number);
465 if (existingPort == null) {
466 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
467 } else {
468 if (existingPort.isEnabled() != newPort.isEnabled() ||
469 existingPort.type() != newPort.type() ||
470 existingPort.portSpeed() != newPort.portSpeed() ||
471 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
472 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
473 }
474 }
475 return newPort;
476 });
477 }
478
479 return events;
480 }
481
482 /**
483 * Returns a Port, merging descriptions from multiple Providers.
484 *
485 * @param device device the port is on
486 * @param number port number
487 * @return Port instance
488 */
489 private Port composePort(Device device, PortNumber number) {
490
491 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
492 portDescriptions.entrySet().forEach(entry -> {
493 PortKey portKey = entry.getKey();
494 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
495 descriptions.put(portKey.providerId(), entry.getValue());
496 }
497 });
498 ProviderId primary = getPrimaryProviderId(device.id());
499 PortDescription primaryDescription = descriptions.get(primary);
500
501 // if no primary, assume not enabled
502 boolean isEnabled = false;
503 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
504 if (primaryDescription != null) {
505 isEnabled = primaryDescription.isEnabled();
506 annotations = merge(annotations, primaryDescription.annotations());
507 }
508 Port updated = null;
509 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
510 if (e.getKey().equals(primary)) {
511 continue;
512 }
513 annotations = merge(annotations, e.getValue().annotations());
514 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
515 }
516 if (primaryDescription == null) {
517 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
518 }
519 return updated == null
520 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
521 : updated;
522 }
523
524 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
525 PortDescription description, Annotations annotations) {
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -0800526 // FIXME this switch need to go away once all ports are done.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700527 switch (description.type()) {
528 case OMS:
HIGUCHI Yuta95d83e82016-04-26 12:13:48 -0700529 if (description instanceof OmsPortDescription) {
530 // remove if-block once deprecation is complete
531 OmsPortDescription omsDesc = (OmsPortDescription) description;
532 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
533 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
534 }
535 // same as default
536 return new DefaultPort(device, number, isEnabled, description.type(),
537 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700538 case OCH:
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -0800539 if (description instanceof OchPortDescription) {
540 // remove if-block once Och deprecation is complete
541 OchPortDescription ochDesc = (OchPortDescription) description;
542 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
543 ochDesc.isTunable(), ochDesc.lambda(), annotations);
544 }
545 return new DefaultPort(device, number, isEnabled, description.type(),
546 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700547 case ODUCLT:
HIGUCHI Yuta4c0ef6b2016-05-02 19:45:41 -0700548 if (description instanceof OduCltPortDescription) {
549 // remove if-block once deprecation is complete
550 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
551 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
552 }
553 // same as default
554 return new DefaultPort(device, number, isEnabled, description.type(),
555 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700556 default:
557 return new DefaultPort(device, number, isEnabled, description.type(),
558 description.portSpeed(), annotations);
559 }
560 }
561
562 @Override
563 public DeviceEvent updatePortStatus(ProviderId providerId,
564 DeviceId deviceId,
565 PortDescription portDescription) {
566 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
567 List<DeviceEvent> events =
568 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
569 return Iterables.getFirst(events, null);
570 }
571
572 @Override
573 public List<Port> getPorts(DeviceId deviceId) {
574 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
575 }
576
577 @Override
578 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
579 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
580 }
581
582 @Override
583 public DeviceEvent updatePortStatistics(ProviderId providerId,
584 DeviceId deviceId,
585 Collection<PortStatistics> newStatsCollection) {
586
587 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
588 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
589 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
590
591 if (prvStatsMap != null) {
592 for (PortStatistics newStats : newStatsCollection) {
593 PortNumber port = PortNumber.portNumber(newStats.port());
594 PortStatistics prvStats = prvStatsMap.get(port);
595 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
596 PortStatistics deltaStats = builder.build();
597 if (prvStats != null) {
598 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
599 }
600 deltaStatsMap.put(port, deltaStats);
601 newStatsMap.put(port, newStats);
602 }
603 } else {
604 for (PortStatistics newStats : newStatsCollection) {
605 PortNumber port = PortNumber.portNumber(newStats.port());
606 newStatsMap.put(port, newStats);
607 }
608 }
609 devicePortDeltaStats.put(deviceId, deltaStatsMap);
610 devicePortStats.put(deviceId, newStatsMap);
611 // DeviceEvent returns null because of InternalPortStatsListener usage
612 return null;
613 }
614
615 /**
616 * Calculate delta statistics by subtracting previous from new statistics.
617 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700618 * @param deviceId device indentifier
619 * @param prvStats previous port statistics
620 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700621 * @return PortStatistics
622 */
623 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
624 // calculate time difference
625 long deltaStatsSec, deltaStatsNano;
626 if (newStats.durationNano() < prvStats.durationNano()) {
627 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
628 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
629 } else {
630 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
631 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
632 }
633 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
634 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
635 .setPort(newStats.port())
636 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
637 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
638 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
639 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
640 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
641 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
642 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
643 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
644 .setDurationSec(deltaStatsSec)
645 .setDurationNano(deltaStatsNano)
646 .build();
647 return deltaStats;
648 }
649
650 @Override
651 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
652 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
653 if (portStats == null) {
654 return Collections.emptyList();
655 }
656 return ImmutableList.copyOf(portStats.values());
657 }
658
659 @Override
660 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
661 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
662 if (portStats == null) {
663 return Collections.emptyList();
664 }
665 return ImmutableList.copyOf(portStats.values());
666 }
667
668 @Override
669 public boolean isAvailable(DeviceId deviceId) {
670 return availableDevices.contains(deviceId);
671 }
672
673 @Override
674 public Iterable<Device> getAvailableDevices() {
675 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
676 }
677
678 @Override
679 public DeviceEvent removeDevice(DeviceId deviceId) {
680 NodeId master = mastershipService.getMasterFor(deviceId);
681 // if there exist a master, forward
682 // if there is no master, try to become one and process
683 boolean relinquishAtEnd = false;
684 if (master == null) {
685 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
686 if (myRole != MastershipRole.NONE) {
687 relinquishAtEnd = true;
688 }
689 log.debug("Temporarily requesting role for {} to remove", deviceId);
690 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
691 if (role == MastershipRole.MASTER) {
692 master = localNodeId;
693 }
694 }
695
696 if (!localNodeId.equals(master)) {
697 log.debug("{} has control of {}, forwarding remove request",
698 master, deviceId);
699
700 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
701 .whenComplete((r, e) -> {
702 if (e != null) {
703 log.error("Failed to forward {} remove request to its master", deviceId, e);
704 }
705 });
706 return null;
707 }
708
709 // I have control..
710 DeviceEvent event = null;
711 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
712 DeviceDescription removedDeviceDescription =
713 deviceDescriptions.remove(deviceKey);
714 if (removedDeviceDescription != null) {
715 event = purgeDeviceCache(deviceId);
716 }
717
718 if (relinquishAtEnd) {
719 log.debug("Relinquishing temporary role acquired for {}", deviceId);
720 mastershipService.relinquishMastership(deviceId);
721 }
722 return event;
723 }
724
725 private DeviceEvent injectDevice(DeviceInjectedEvent event) {
726 return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
727 }
728
729 private List<DeviceEvent> injectPort(PortInjectedEvent event) {
730 return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
731 }
732
733 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
734 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
735 DeviceDescription primaryDeviceDescription =
736 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
737 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
738 annotations = merge(annotations, primaryDeviceDescription.annotations());
739 for (ProviderId providerId : getAllProviders(deviceId)) {
740 if (!providerId.equals(primaryProviderId)) {
741 annotations = merge(annotations,
742 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
743 }
744 }
745 return annotations;
746 }
747
748 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
749 @Override
750 public void event(SetEvent<DeviceId> event) {
751 final DeviceId deviceId = event.entry();
752 final Device device = devices.get(deviceId);
753 if (device != null) {
754 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
755 } else {
756 pendingAvailableChangeUpdates.add(deviceId);
757 }
758 }
759 }
760
761 private class InternalDeviceChangeEventListener
762 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
763 @Override
764 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
765 DeviceId deviceId = event.key().deviceId();
766 ProviderId providerId = event.key().providerId();
767 if (event.type() == PUT) {
768 notifyDelegate(refreshDeviceCache(providerId, deviceId));
769 if (pendingAvailableChangeUpdates.remove(deviceId)) {
770 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
771 }
772 } else if (event.type() == REMOVE) {
773 notifyDelegate(purgeDeviceCache(deviceId));
774 }
775 }
776 }
777
778 private class InternalPortChangeEventListener
779 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
780 @Override
781 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
782 DeviceId deviceId = event.key().deviceId();
783 ProviderId providerId = event.key().providerId();
784 PortNumber portNumber = event.key().portNumber();
785 if (event.type() == PUT) {
786 if (devices.containsKey(deviceId)) {
787 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
788 for (DeviceEvent deviceEvent : events) {
789 notifyDelegate(deviceEvent);
790 }
791 }
792 } else if (event.type() == REMOVE) {
793 log.warn("Unexpected port removed event");
794 }
795 }
796 }
797
798 private class InternalPortStatsListener
799 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
800 @Override
801 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
802 if (event.type() == PUT) {
803 Device device = devices.get(event.key());
804 if (device != null) {
805 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
806 }
807 }
808 }
809 }
810}