/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.device.impl;
17
Jian Li11599162016-01-15 15:46:16 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
Madan Jampanifc8aced2015-08-27 11:06:12 -070024import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070025import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.KryoNamespace;
31import org.onlab.util.SharedExecutors;
32import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.mastership.MastershipTermService;
36import org.onosproject.net.Annotations;
37import org.onosproject.net.AnnotationsUtil;
38import org.onosproject.net.DefaultAnnotations;
39import org.onosproject.net.DefaultDevice;
40import org.onosproject.net.DefaultPort;
41import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080042import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070043import org.onosproject.net.DeviceId;
44import org.onosproject.net.MastershipRole;
45import org.onosproject.net.OchPort;
46import org.onosproject.net.OduCltPort;
47import org.onosproject.net.OmsPort;
48import org.onosproject.net.Port;
49import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070050import org.onosproject.net.device.DefaultPortStatistics;
51import org.onosproject.net.device.DeviceClockService;
52import org.onosproject.net.device.DeviceDescription;
53import org.onosproject.net.device.DeviceEvent;
54import org.onosproject.net.device.DeviceStore;
55import org.onosproject.net.device.DeviceStoreDelegate;
56import org.onosproject.net.device.OchPortDescription;
57import org.onosproject.net.device.OduCltPortDescription;
58import org.onosproject.net.device.OmsPortDescription;
59import org.onosproject.net.device.PortDescription;
60import org.onosproject.net.device.PortStatistics;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
64import org.onosproject.store.impl.MastershipBasedTimestamp;
65import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070066import org.onosproject.store.serializers.StoreSerializer;
Madan Jampanifc8aced2015-08-27 11:06:12 -070067import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
68import org.onosproject.store.service.DistributedSet;
69import org.onosproject.store.service.EventuallyConsistentMap;
70import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080071import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070072import org.onosproject.store.service.Serializer;
73import org.onosproject.store.service.SetEvent;
74import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070075import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080076import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070077import org.slf4j.Logger;
78
Jian Li11599162016-01-15 15:46:16 -080079import java.util.Collection;
80import java.util.Collections;
81import java.util.List;
82import java.util.Map;
83import java.util.Map.Entry;
84import java.util.Objects;
85import java.util.Optional;
86import java.util.Set;
87import java.util.concurrent.TimeUnit;
88import java.util.concurrent.atomic.AtomicReference;
89import java.util.stream.Collectors;
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -070090import java.util.stream.Stream;
Jian Li11599162016-01-15 15:46:16 -080091
92import static com.google.common.base.Preconditions.checkArgument;
93import static com.google.common.base.Verify.verify;
94import static org.onosproject.net.DefaultAnnotations.merge;
95import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
96import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
97import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
98import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
99import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
100import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
101import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700102import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
103import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
104import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
Jian Li11599162016-01-15 15:46:16 -0800105import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
106import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
107import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700108
109/**
110 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
111 */
Jian Li11599162016-01-15 15:46:16 -0800112//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700113@Service
114public class ECDeviceStore
115 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
116 implements DeviceStore {
117
118 private final Logger log = getLogger(getClass());
119
120 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
121
122 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
123 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
124 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
125
126 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
127 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
128 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
129 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
130
131 private DistributedSet<DeviceId> availableDevices;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected StorageService storageService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected MastershipService mastershipService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected MastershipTermService mastershipTermService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected DeviceClockService deviceClockService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected ClusterCommunicationService clusterCommunicator;
147
148 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
149 protected ClusterService clusterService;
150
151 private NodeId localNodeId;
152 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
153 new InternalDeviceChangeEventListener();
154 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
155 new InternalPortChangeEventListener();
156 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
157 new InternalPortStatsListener();
158 private final SetEventListener<DeviceId> deviceStatusTracker =
159 new InternalDeviceStatusTracker();
160
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700161 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
162 KryoNamespace.newBuilder()
Madan Jampanifc8aced2015-08-27 11:06:12 -0700163 .register(DistributedStoreSerializers.STORE_COMMON)
164 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
165 .register(DeviceInjectedEvent.class)
166 .register(PortInjectedEvent.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700167 .build("ECDevice"));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700168
169 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
170 .register(KryoNamespaces.API)
171 .register(DeviceKey.class)
172 .register(PortKey.class)
173 .register(DeviceKey.class)
174 .register(PortKey.class)
175 .register(MastershipBasedTimestamp.class);
176
177 @Activate
178 public void activate() {
179 localNodeId = clusterService.getLocalNode().id();
180
181 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
182 .withName("onos-device-descriptions")
183 .withSerializer(SERIALIZER_BUILDER)
184 .withTimestampProvider((k, v) -> {
185 try {
186 return deviceClockService.getTimestamp(k.deviceId());
187 } catch (IllegalStateException e) {
188 return null;
189 }
190 }).build();
191
192 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
193 .withName("onos-port-descriptions")
194 .withSerializer(SERIALIZER_BUILDER)
195 .withTimestampProvider((k, v) -> {
196 try {
197 return deviceClockService.getTimestamp(k.deviceId());
198 } catch (IllegalStateException e) {
199 return null;
200 }
201 }).build();
202
203 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
204 .withName("onos-port-stats")
205 .withSerializer(SERIALIZER_BUILDER)
206 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
207 .withTimestampProvider((k, v) -> new WallClockTimestamp())
208 .withTombstonesDisabled()
209 .build();
210
211 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
212 eventuallyConsistentMapBuilder()
213 .withName("onos-port-stats-delta")
214 .withSerializer(SERIALIZER_BUILDER)
215 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
216 .withTimestampProvider((k, v) -> new WallClockTimestamp())
217 .withTombstonesDisabled()
218 .build();
219
220 clusterCommunicator.addSubscriber(DEVICE_INJECTED,
221 SERIALIZER::decode,
222 this::injectDevice,
223 SERIALIZER::encode,
224 SharedExecutors.getPoolThreadExecutor());
225
226 clusterCommunicator.addSubscriber(PORT_INJECTED,
227 SERIALIZER::decode,
228 this::injectPort,
229 SERIALIZER::encode,
230 SharedExecutors.getPoolThreadExecutor());
231
232 availableDevices = storageService.<DeviceId>setBuilder()
233 .withName("onos-online-devices")
234 .withSerializer(Serializer.using(KryoNamespaces.API))
Madan Jampanifc8aced2015-08-27 11:06:12 -0700235 .withRelaxedReadConsistency()
Madan Jampani538be742016-02-10 14:55:38 -0800236 .build()
237 .asDistributedSet();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700238
239 deviceDescriptions.addListener(deviceUpdateListener);
240 portDescriptions.addListener(portUpdateListener);
241 devicePortStats.addListener(portStatsListener);
242 availableDevices.addListener(deviceStatusTracker);
243 log.info("Started");
244 }
245
246 @Deactivate
247 public void deactivate() {
248 devicePortStats.removeListener(portStatsListener);
249 deviceDescriptions.removeListener(deviceUpdateListener);
250 portDescriptions.removeListener(portUpdateListener);
251 availableDevices.removeListener(deviceStatusTracker);
252 devicePortStats.destroy();
253 devicePortDeltaStats.destroy();
254 deviceDescriptions.destroy();
255 portDescriptions.destroy();
256 devices.clear();
257 devicePorts.clear();
258 clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
259 clusterCommunicator.removeSubscriber(PORT_INJECTED);
260 log.info("Stopped");
261 }
262
263 @Override
264 public Iterable<Device> getDevices() {
265 return devices.values();
266 }
267
268 @Override
269 public int getDeviceCount() {
270 return devices.size();
271 }
272
273 @Override
274 public Device getDevice(DeviceId deviceId) {
275 return devices.get(deviceId);
276 }
277
helenyrwufd296b62016-06-22 17:43:02 -0700278 // FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700279 @Override
280 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
281 DeviceId deviceId,
282 DeviceDescription deviceDescription) {
283 NodeId master = mastershipService.getMasterFor(deviceId);
284 if (localNodeId.equals(master)) {
285 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
286 return refreshDeviceCache(providerId, deviceId);
287 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800288 // Only forward for ConfigProvider
289 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800290 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800291 return null;
292 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700293 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
294 return Futures.getUnchecked(
295 clusterCommunicator.sendAndReceive(deviceInjectedEvent,
296 DEVICE_INJECTED,
297 SERIALIZER::encode,
298 SERIALIZER::decode,
299 master));
300 }
301 }
302
303 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
304 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
305 Device device = devices.compute(deviceId, (k, existingDevice) -> {
306 Device newDevice = composeDevice(deviceId);
307 if (existingDevice == null) {
308 eventType.set(DEVICE_ADDED);
309 } else {
310 // We allow only certain attributes to trigger update
311 boolean propertiesChanged =
312 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
313 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
314 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
315 boolean annotationsChanged =
316 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
317
318 // Primary providers can respond to all changes, but ancillary ones
319 // should respond only to annotation changes.
320 if ((providerId.isAncillary() && annotationsChanged) ||
321 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
322 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
323 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
324 providerId, existingDevice, devices.get(deviceId), newDevice);
325 eventType.set(DEVICE_UPDATED);
326 }
327 }
328 return newDevice;
329 });
330 if (eventType.get() != null && !providerId.isAncillary()) {
331 markOnline(deviceId);
332 }
333 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
334 }
335
336 /**
337 * Returns the primary providerId for a device.
338 * @param deviceId device identifier
339 * @return primary providerId
340 */
341 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
342 return deviceDescriptions.keySet()
343 .stream()
344 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
345 .map(deviceKey -> deviceKey.providerId())
346 .collect(Collectors.toSet());
347 }
348
349 /**
350 * Returns the identifier for all providers for a device.
351 * @param deviceId device identifier
352 * @return set of provider identifiers
353 */
354 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
355 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
356 return allProviderIds.stream()
357 .filter(p -> !p.isAncillary())
358 .findFirst()
359 .orElse(Iterables.getFirst(allProviderIds, null));
360 }
361
362 /**
363 * Returns a Device, merging descriptions from multiple Providers.
364 *
365 * @param deviceId device identifier
366 * @return Device instance
367 */
368 private Device composeDevice(DeviceId deviceId) {
369
370 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
371 DeviceDescription primaryDeviceDescription =
372 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
373
374 Type type = primaryDeviceDescription.type();
375 String manufacturer = primaryDeviceDescription.manufacturer();
376 String hwVersion = primaryDeviceDescription.hwVersion();
377 String swVersion = primaryDeviceDescription.swVersion();
378 String serialNumber = primaryDeviceDescription.serialNumber();
379 ChassisId chassisId = primaryDeviceDescription.chassisId();
380 DefaultAnnotations annotations = mergeAnnotations(deviceId);
381
382 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
383 hwVersion, swVersion, serialNumber,
384 chassisId, annotations);
385 }
386
387 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
388 Device removedDevice = devices.remove(deviceId);
389 if (removedDevice != null) {
390 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
391 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
392 }
393 return null;
394 }
395
helenyrwufd296b62016-06-22 17:43:02 -0700396 // FIXME publicization of markOnline -- trigger some action independently?
397 public boolean markOnline(DeviceId deviceId) {
398 if (devices.containsKey(deviceId)) {
399 return availableDevices.add(deviceId);
400 }
401 log.warn("Device {} does not exist in store", deviceId);
402 return false;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700403 }
404
405 @Override
406 public DeviceEvent markOffline(DeviceId deviceId) {
407 availableDevices.remove(deviceId);
408 // set update listener will raise the event.
409 return null;
410 }
411
412 @Override
413 public List<DeviceEvent> updatePorts(ProviderId providerId,
414 DeviceId deviceId,
415 List<PortDescription> descriptions) {
416 NodeId master = mastershipService.getMasterFor(deviceId);
417 List<DeviceEvent> deviceEvents = null;
418 if (localNodeId.equals(master)) {
419 descriptions.forEach(description -> {
420 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
421 portDescriptions.put(portKey, description);
422 });
423 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
424 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800425 // Only forward for ConfigProvider
426 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800427 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta89461772016-01-26 12:18:10 -0800428 return Collections.emptyList();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800429 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700430 if (master == null) {
431 return Collections.emptyList();
432 }
433 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
434 deviceEvents = Futures.getUnchecked(
435 clusterCommunicator.sendAndReceive(portInjectedEvent,
436 PORT_INJECTED,
437 SERIALIZER::encode,
438 SERIALIZER::decode,
439 master));
440 }
441 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
442 }
443
444 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
445 DeviceId deviceId,
446 Optional<PortNumber> portNumber) {
447 Device device = devices.get(deviceId);
448 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
449 List<DeviceEvent> events = Lists.newArrayList();
450
451 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
452 List<PortDescription> descriptions = Lists.newArrayList();
453 portDescriptions.entrySet().forEach(e -> {
454 PortKey key = e.getKey();
455 PortDescription value = e.getValue();
456 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
457 if (portNumber.isPresent()) {
458 if (portNumber.get().equals(key.portNumber())) {
459 descriptions.add(value);
460 }
461 } else {
462 descriptions.add(value);
463 }
464 }
465 });
466
467 for (PortDescription description : descriptions) {
468 final PortNumber number = description.portNumber();
469 ports.compute(number, (k, existingPort) -> {
470 Port newPort = composePort(device, number);
471 if (existingPort == null) {
472 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
473 } else {
474 if (existingPort.isEnabled() != newPort.isEnabled() ||
475 existingPort.type() != newPort.type() ||
476 existingPort.portSpeed() != newPort.portSpeed() ||
477 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
478 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
479 }
480 }
481 return newPort;
482 });
483 }
484
485 return events;
486 }
487
488 /**
489 * Returns a Port, merging descriptions from multiple Providers.
490 *
491 * @param device device the port is on
492 * @param number port number
493 * @return Port instance
494 */
495 private Port composePort(Device device, PortNumber number) {
496
497 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
498 portDescriptions.entrySet().forEach(entry -> {
499 PortKey portKey = entry.getKey();
500 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
501 descriptions.put(portKey.providerId(), entry.getValue());
502 }
503 });
504 ProviderId primary = getPrimaryProviderId(device.id());
505 PortDescription primaryDescription = descriptions.get(primary);
506
507 // if no primary, assume not enabled
508 boolean isEnabled = false;
509 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
510 if (primaryDescription != null) {
511 isEnabled = primaryDescription.isEnabled();
512 annotations = merge(annotations, primaryDescription.annotations());
513 }
514 Port updated = null;
515 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
516 if (e.getKey().equals(primary)) {
517 continue;
518 }
519 annotations = merge(annotations, e.getValue().annotations());
520 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
521 }
522 if (primaryDescription == null) {
523 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
524 }
525 return updated == null
526 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
527 : updated;
528 }
529
530 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
531 PortDescription description, Annotations annotations) {
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -0800532 // FIXME this switch need to go away once all ports are done.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700533 switch (description.type()) {
534 case OMS:
HIGUCHI Yuta95d83e82016-04-26 12:13:48 -0700535 if (description instanceof OmsPortDescription) {
536 // remove if-block once deprecation is complete
537 OmsPortDescription omsDesc = (OmsPortDescription) description;
538 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
539 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
540 }
541 // same as default
542 return new DefaultPort(device, number, isEnabled, description.type(),
543 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700544 case OCH:
HIGUCHI Yuta34a3f692016-01-09 21:08:57 -0800545 if (description instanceof OchPortDescription) {
546 // remove if-block once Och deprecation is complete
547 OchPortDescription ochDesc = (OchPortDescription) description;
548 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
549 ochDesc.isTunable(), ochDesc.lambda(), annotations);
550 }
551 return new DefaultPort(device, number, isEnabled, description.type(),
552 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700553 case ODUCLT:
HIGUCHI Yuta4c0ef6b2016-05-02 19:45:41 -0700554 if (description instanceof OduCltPortDescription) {
555 // remove if-block once deprecation is complete
556 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
557 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
558 }
559 // same as default
560 return new DefaultPort(device, number, isEnabled, description.type(),
561 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700562 default:
563 return new DefaultPort(device, number, isEnabled, description.type(),
564 description.portSpeed(), annotations);
565 }
566 }
567
568 @Override
569 public DeviceEvent updatePortStatus(ProviderId providerId,
570 DeviceId deviceId,
571 PortDescription portDescription) {
572 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
573 List<DeviceEvent> events =
574 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
575 return Iterables.getFirst(events, null);
576 }
577
578 @Override
579 public List<Port> getPorts(DeviceId deviceId) {
580 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
581 }
582
583 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700584 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
585 DeviceId deviceId) {
586
587 return portDescriptions.entrySet().stream()
588 .filter(e -> e.getKey().providerId().equals(pid))
589 .map(Map.Entry::getValue);
590 }
591
592 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700593 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
594 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
595 }
596
597 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700598 public PortDescription getPortDescription(ProviderId pid,
599 DeviceId deviceId,
600 PortNumber portNumber) {
601 return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
602 }
603
604 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700605 public DeviceEvent updatePortStatistics(ProviderId providerId,
606 DeviceId deviceId,
607 Collection<PortStatistics> newStatsCollection) {
608
609 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
610 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
611 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
612
613 if (prvStatsMap != null) {
614 for (PortStatistics newStats : newStatsCollection) {
615 PortNumber port = PortNumber.portNumber(newStats.port());
616 PortStatistics prvStats = prvStatsMap.get(port);
617 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
618 PortStatistics deltaStats = builder.build();
619 if (prvStats != null) {
620 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
621 }
622 deltaStatsMap.put(port, deltaStats);
623 newStatsMap.put(port, newStats);
624 }
625 } else {
626 for (PortStatistics newStats : newStatsCollection) {
627 PortNumber port = PortNumber.portNumber(newStats.port());
628 newStatsMap.put(port, newStats);
629 }
630 }
631 devicePortDeltaStats.put(deviceId, deltaStatsMap);
632 devicePortStats.put(deviceId, newStatsMap);
633 // DeviceEvent returns null because of InternalPortStatsListener usage
634 return null;
635 }
636
637 /**
638 * Calculate delta statistics by subtracting previous from new statistics.
639 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700640 * @param deviceId device indentifier
641 * @param prvStats previous port statistics
642 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700643 * @return PortStatistics
644 */
645 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
646 // calculate time difference
647 long deltaStatsSec, deltaStatsNano;
648 if (newStats.durationNano() < prvStats.durationNano()) {
649 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
650 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
651 } else {
652 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
653 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
654 }
655 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
656 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
657 .setPort(newStats.port())
658 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
659 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
660 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
661 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
662 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
663 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
664 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
665 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
666 .setDurationSec(deltaStatsSec)
667 .setDurationNano(deltaStatsNano)
668 .build();
669 return deltaStats;
670 }
671
672 @Override
673 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
674 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
675 if (portStats == null) {
676 return Collections.emptyList();
677 }
678 return ImmutableList.copyOf(portStats.values());
679 }
680
681 @Override
682 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
683 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
684 if (portStats == null) {
685 return Collections.emptyList();
686 }
687 return ImmutableList.copyOf(portStats.values());
688 }
689
690 @Override
691 public boolean isAvailable(DeviceId deviceId) {
692 return availableDevices.contains(deviceId);
693 }
694
695 @Override
696 public Iterable<Device> getAvailableDevices() {
697 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
698 }
699
700 @Override
701 public DeviceEvent removeDevice(DeviceId deviceId) {
702 NodeId master = mastershipService.getMasterFor(deviceId);
703 // if there exist a master, forward
704 // if there is no master, try to become one and process
705 boolean relinquishAtEnd = false;
706 if (master == null) {
707 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
708 if (myRole != MastershipRole.NONE) {
709 relinquishAtEnd = true;
710 }
711 log.debug("Temporarily requesting role for {} to remove", deviceId);
712 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
713 if (role == MastershipRole.MASTER) {
714 master = localNodeId;
715 }
716 }
717
718 if (!localNodeId.equals(master)) {
719 log.debug("{} has control of {}, forwarding remove request",
720 master, deviceId);
721
722 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
723 .whenComplete((r, e) -> {
724 if (e != null) {
725 log.error("Failed to forward {} remove request to its master", deviceId, e);
726 }
727 });
728 return null;
729 }
730
731 // I have control..
732 DeviceEvent event = null;
733 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
734 DeviceDescription removedDeviceDescription =
735 deviceDescriptions.remove(deviceKey);
736 if (removedDeviceDescription != null) {
737 event = purgeDeviceCache(deviceId);
738 }
739
740 if (relinquishAtEnd) {
741 log.debug("Relinquishing temporary role acquired for {}", deviceId);
742 mastershipService.relinquishMastership(deviceId);
743 }
744 return event;
745 }
746
747 private DeviceEvent injectDevice(DeviceInjectedEvent event) {
748 return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
749 }
750
751 private List<DeviceEvent> injectPort(PortInjectedEvent event) {
752 return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
753 }
754
755 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
756 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
757 DeviceDescription primaryDeviceDescription =
758 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
759 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
760 annotations = merge(annotations, primaryDeviceDescription.annotations());
761 for (ProviderId providerId : getAllProviders(deviceId)) {
762 if (!providerId.equals(primaryProviderId)) {
763 annotations = merge(annotations,
764 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
765 }
766 }
767 return annotations;
768 }
769
770 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
771 @Override
772 public void event(SetEvent<DeviceId> event) {
773 final DeviceId deviceId = event.entry();
774 final Device device = devices.get(deviceId);
775 if (device != null) {
776 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
777 } else {
778 pendingAvailableChangeUpdates.add(deviceId);
779 }
780 }
781 }
782
783 private class InternalDeviceChangeEventListener
784 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
785 @Override
786 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
787 DeviceId deviceId = event.key().deviceId();
788 ProviderId providerId = event.key().providerId();
789 if (event.type() == PUT) {
790 notifyDelegate(refreshDeviceCache(providerId, deviceId));
791 if (pendingAvailableChangeUpdates.remove(deviceId)) {
792 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
793 }
794 } else if (event.type() == REMOVE) {
795 notifyDelegate(purgeDeviceCache(deviceId));
796 }
797 }
798 }
799
800 private class InternalPortChangeEventListener
801 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
802 @Override
803 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
804 DeviceId deviceId = event.key().deviceId();
805 ProviderId providerId = event.key().providerId();
806 PortNumber portNumber = event.key().portNumber();
807 if (event.type() == PUT) {
808 if (devices.containsKey(deviceId)) {
809 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
810 for (DeviceEvent deviceEvent : events) {
811 notifyDelegate(deviceEvent);
812 }
813 }
814 } else if (event.type() == REMOVE) {
815 log.warn("Unexpected port removed event");
816 }
817 }
818 }
819
820 private class InternalPortStatsListener
821 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
822 @Override
823 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
824 if (event.type() == PUT) {
825 Device device = devices.get(event.key());
826 if (device != null) {
827 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
828 }
829 }
830 }
831 }
832}