blob: 30fae0bc51f2ba905570d1bdaffd415c884ffb2a [file] [log] [blame]
Madan Jampanifc8aced2015-08-27 11:06:12 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Madan Jampanifc8aced2015-08-27 11:06:12 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.device.impl;
17
Jian Li11599162016-01-15 15:46:16 -080018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
Madan Jampanifc8aced2015-08-27 11:06:12 -070024import org.apache.felix.scr.annotations.Activate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070025import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
29import org.onlab.packet.ChassisId;
30import org.onlab.util.KryoNamespace;
31import org.onlab.util.SharedExecutors;
32import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.mastership.MastershipTermService;
36import org.onosproject.net.Annotations;
37import org.onosproject.net.AnnotationsUtil;
38import org.onosproject.net.DefaultAnnotations;
39import org.onosproject.net.DefaultDevice;
40import org.onosproject.net.DefaultPort;
41import org.onosproject.net.Device;
Jian Li11599162016-01-15 15:46:16 -080042import org.onosproject.net.Device.Type;
Madan Jampanifc8aced2015-08-27 11:06:12 -070043import org.onosproject.net.DeviceId;
44import org.onosproject.net.MastershipRole;
Madan Jampanifc8aced2015-08-27 11:06:12 -070045import org.onosproject.net.Port;
46import org.onosproject.net.PortNumber;
Madan Jampanifc8aced2015-08-27 11:06:12 -070047import org.onosproject.net.device.DefaultPortStatistics;
48import org.onosproject.net.device.DeviceClockService;
49import org.onosproject.net.device.DeviceDescription;
50import org.onosproject.net.device.DeviceEvent;
51import org.onosproject.net.device.DeviceStore;
52import org.onosproject.net.device.DeviceStoreDelegate;
Madan Jampanifc8aced2015-08-27 11:06:12 -070053import org.onosproject.net.device.PortDescription;
54import org.onosproject.net.device.PortStatistics;
55import org.onosproject.net.provider.ProviderId;
56import org.onosproject.store.AbstractStore;
57import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58import org.onosproject.store.impl.MastershipBasedTimestamp;
59import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070060import org.onosproject.store.serializers.StoreSerializer;
Madan Jampanifc8aced2015-08-27 11:06:12 -070061import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
62import org.onosproject.store.service.DistributedSet;
63import org.onosproject.store.service.EventuallyConsistentMap;
64import org.onosproject.store.service.EventuallyConsistentMapEvent;
Jian Li11599162016-01-15 15:46:16 -080065import org.onosproject.store.service.EventuallyConsistentMapListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070066import org.onosproject.store.service.Serializer;
67import org.onosproject.store.service.SetEvent;
68import org.onosproject.store.service.SetEventListener;
Madan Jampanifc8aced2015-08-27 11:06:12 -070069import org.onosproject.store.service.StorageService;
Jian Li11599162016-01-15 15:46:16 -080070import org.onosproject.store.service.WallClockTimestamp;
Madan Jampanifc8aced2015-08-27 11:06:12 -070071import org.slf4j.Logger;
72
Jian Li11599162016-01-15 15:46:16 -080073import java.util.Collection;
74import java.util.Collections;
75import java.util.List;
76import java.util.Map;
77import java.util.Map.Entry;
78import java.util.Objects;
79import java.util.Optional;
80import java.util.Set;
81import java.util.concurrent.TimeUnit;
82import java.util.concurrent.atomic.AtomicReference;
83import java.util.stream.Collectors;
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -070084import java.util.stream.Stream;
Jian Li11599162016-01-15 15:46:16 -080085
86import static com.google.common.base.Preconditions.checkArgument;
87import static com.google.common.base.Verify.verify;
88import static org.onosproject.net.DefaultAnnotations.merge;
89import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
90import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
91import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
92import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
93import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
94import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
95import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
Madan Jampanifc8aced2015-08-27 11:06:12 -070096import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
97import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
98import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
Jian Li11599162016-01-15 15:46:16 -080099import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
100import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
101import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700102
103/**
104 * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
105 */
Jian Li11599162016-01-15 15:46:16 -0800106//@Component(immediate = true, enabled = false)
Madan Jampanifc8aced2015-08-27 11:06:12 -0700107@Service
108public class ECDeviceStore
109 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
110 implements DeviceStore {
111
112 private final Logger log = getLogger(getClass());
113
114 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
115
116 private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
117 private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
118 Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
119
120 private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
121 private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
122 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
123 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
124
125 private DistributedSet<DeviceId> availableDevices;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected StorageService storageService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected MastershipService mastershipService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected MastershipTermService mastershipTermService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected DeviceClockService deviceClockService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected ClusterCommunicationService clusterCommunicator;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected ClusterService clusterService;
144
145 private NodeId localNodeId;
146 private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
147 new InternalDeviceChangeEventListener();
148 private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
149 new InternalPortChangeEventListener();
150 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
151 new InternalPortStatsListener();
152 private final SetEventListener<DeviceId> deviceStatusTracker =
153 new InternalDeviceStatusTracker();
154
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700155 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
156 KryoNamespace.newBuilder()
Madan Jampanifc8aced2015-08-27 11:06:12 -0700157 .register(DistributedStoreSerializers.STORE_COMMON)
158 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
159 .register(DeviceInjectedEvent.class)
160 .register(PortInjectedEvent.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700161 .build("ECDevice"));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700162
163 protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
164 .register(KryoNamespaces.API)
165 .register(DeviceKey.class)
166 .register(PortKey.class)
167 .register(DeviceKey.class)
168 .register(PortKey.class)
169 .register(MastershipBasedTimestamp.class);
170
171 @Activate
172 public void activate() {
173 localNodeId = clusterService.getLocalNode().id();
174
175 deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
176 .withName("onos-device-descriptions")
177 .withSerializer(SERIALIZER_BUILDER)
178 .withTimestampProvider((k, v) -> {
179 try {
180 return deviceClockService.getTimestamp(k.deviceId());
181 } catch (IllegalStateException e) {
182 return null;
183 }
184 }).build();
185
186 portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
187 .withName("onos-port-descriptions")
188 .withSerializer(SERIALIZER_BUILDER)
189 .withTimestampProvider((k, v) -> {
190 try {
191 return deviceClockService.getTimestamp(k.deviceId());
192 } catch (IllegalStateException e) {
193 return null;
194 }
195 }).build();
196
197 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
198 .withName("onos-port-stats")
199 .withSerializer(SERIALIZER_BUILDER)
200 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
201 .withTimestampProvider((k, v) -> new WallClockTimestamp())
202 .withTombstonesDisabled()
203 .build();
204
205 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
206 eventuallyConsistentMapBuilder()
207 .withName("onos-port-stats-delta")
208 .withSerializer(SERIALIZER_BUILDER)
209 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
210 .withTimestampProvider((k, v) -> new WallClockTimestamp())
211 .withTombstonesDisabled()
212 .build();
213
214 clusterCommunicator.addSubscriber(DEVICE_INJECTED,
215 SERIALIZER::decode,
216 this::injectDevice,
217 SERIALIZER::encode,
218 SharedExecutors.getPoolThreadExecutor());
219
220 clusterCommunicator.addSubscriber(PORT_INJECTED,
221 SERIALIZER::decode,
222 this::injectPort,
223 SERIALIZER::encode,
224 SharedExecutors.getPoolThreadExecutor());
225
226 availableDevices = storageService.<DeviceId>setBuilder()
227 .withName("onos-online-devices")
228 .withSerializer(Serializer.using(KryoNamespaces.API))
Madan Jampanifc8aced2015-08-27 11:06:12 -0700229 .withRelaxedReadConsistency()
Madan Jampani538be742016-02-10 14:55:38 -0800230 .build()
231 .asDistributedSet();
Madan Jampanifc8aced2015-08-27 11:06:12 -0700232
233 deviceDescriptions.addListener(deviceUpdateListener);
234 portDescriptions.addListener(portUpdateListener);
235 devicePortStats.addListener(portStatsListener);
236 availableDevices.addListener(deviceStatusTracker);
237 log.info("Started");
238 }
239
240 @Deactivate
241 public void deactivate() {
242 devicePortStats.removeListener(portStatsListener);
243 deviceDescriptions.removeListener(deviceUpdateListener);
244 portDescriptions.removeListener(portUpdateListener);
245 availableDevices.removeListener(deviceStatusTracker);
246 devicePortStats.destroy();
247 devicePortDeltaStats.destroy();
248 deviceDescriptions.destroy();
249 portDescriptions.destroy();
250 devices.clear();
251 devicePorts.clear();
252 clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
253 clusterCommunicator.removeSubscriber(PORT_INJECTED);
254 log.info("Stopped");
255 }
256
257 @Override
258 public Iterable<Device> getDevices() {
259 return devices.values();
260 }
261
262 @Override
263 public int getDeviceCount() {
264 return devices.size();
265 }
266
267 @Override
268 public Device getDevice(DeviceId deviceId) {
269 return devices.get(deviceId);
270 }
271
helenyrwufd296b62016-06-22 17:43:02 -0700272 // FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
Madan Jampanifc8aced2015-08-27 11:06:12 -0700273 @Override
274 public DeviceEvent createOrUpdateDevice(ProviderId providerId,
275 DeviceId deviceId,
276 DeviceDescription deviceDescription) {
277 NodeId master = mastershipService.getMasterFor(deviceId);
278 if (localNodeId.equals(master)) {
279 deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
280 return refreshDeviceCache(providerId, deviceId);
281 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800282 // Only forward for ConfigProvider
283 // Forwarding was added as a workaround for ONOS-490
Jon Halla3fcf672017-03-28 16:53:22 -0700284 if (!"cfg".equals(providerId.scheme())) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800285 return null;
286 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700287 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
288 return Futures.getUnchecked(
289 clusterCommunicator.sendAndReceive(deviceInjectedEvent,
290 DEVICE_INJECTED,
291 SERIALIZER::encode,
292 SERIALIZER::decode,
293 master));
294 }
295 }
296
297 private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
298 AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
299 Device device = devices.compute(deviceId, (k, existingDevice) -> {
300 Device newDevice = composeDevice(deviceId);
301 if (existingDevice == null) {
302 eventType.set(DEVICE_ADDED);
303 } else {
304 // We allow only certain attributes to trigger update
305 boolean propertiesChanged =
306 !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
307 !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
308 !Objects.equals(existingDevice.providerId(), newDevice.providerId());
309 boolean annotationsChanged =
310 !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
311
312 // Primary providers can respond to all changes, but ancillary ones
313 // should respond only to annotation changes.
314 if ((providerId.isAncillary() && annotationsChanged) ||
315 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
316 boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
317 verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
318 providerId, existingDevice, devices.get(deviceId), newDevice);
319 eventType.set(DEVICE_UPDATED);
320 }
321 }
322 return newDevice;
323 });
324 if (eventType.get() != null && !providerId.isAncillary()) {
325 markOnline(deviceId);
326 }
327 return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
328 }
329
330 /**
331 * Returns the primary providerId for a device.
332 * @param deviceId device identifier
333 * @return primary providerId
334 */
335 private Set<ProviderId> getAllProviders(DeviceId deviceId) {
336 return deviceDescriptions.keySet()
337 .stream()
338 .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
339 .map(deviceKey -> deviceKey.providerId())
340 .collect(Collectors.toSet());
341 }
342
343 /**
344 * Returns the identifier for all providers for a device.
345 * @param deviceId device identifier
346 * @return set of provider identifiers
347 */
348 private ProviderId getPrimaryProviderId(DeviceId deviceId) {
349 Set<ProviderId> allProviderIds = getAllProviders(deviceId);
350 return allProviderIds.stream()
351 .filter(p -> !p.isAncillary())
352 .findFirst()
353 .orElse(Iterables.getFirst(allProviderIds, null));
354 }
355
356 /**
357 * Returns a Device, merging descriptions from multiple Providers.
358 *
359 * @param deviceId device identifier
360 * @return Device instance
361 */
362 private Device composeDevice(DeviceId deviceId) {
363
364 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
365 DeviceDescription primaryDeviceDescription =
366 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
367
368 Type type = primaryDeviceDescription.type();
369 String manufacturer = primaryDeviceDescription.manufacturer();
370 String hwVersion = primaryDeviceDescription.hwVersion();
371 String swVersion = primaryDeviceDescription.swVersion();
372 String serialNumber = primaryDeviceDescription.serialNumber();
373 ChassisId chassisId = primaryDeviceDescription.chassisId();
374 DefaultAnnotations annotations = mergeAnnotations(deviceId);
375
376 return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
377 hwVersion, swVersion, serialNumber,
378 chassisId, annotations);
379 }
380
381 private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
382 Device removedDevice = devices.remove(deviceId);
383 if (removedDevice != null) {
384 getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
385 return new DeviceEvent(DEVICE_REMOVED, removedDevice);
386 }
387 return null;
388 }
389
helenyrwufd296b62016-06-22 17:43:02 -0700390 // FIXME publicization of markOnline -- trigger some action independently?
391 public boolean markOnline(DeviceId deviceId) {
392 if (devices.containsKey(deviceId)) {
393 return availableDevices.add(deviceId);
394 }
395 log.warn("Device {} does not exist in store", deviceId);
396 return false;
Madan Jampanifc8aced2015-08-27 11:06:12 -0700397 }
398
399 @Override
400 public DeviceEvent markOffline(DeviceId deviceId) {
401 availableDevices.remove(deviceId);
402 // set update listener will raise the event.
403 return null;
404 }
405
406 @Override
407 public List<DeviceEvent> updatePorts(ProviderId providerId,
408 DeviceId deviceId,
409 List<PortDescription> descriptions) {
410 NodeId master = mastershipService.getMasterFor(deviceId);
411 List<DeviceEvent> deviceEvents = null;
412 if (localNodeId.equals(master)) {
413 descriptions.forEach(description -> {
414 PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
415 portDescriptions.put(portKey, description);
416 });
417 deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
418 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800419 // Only forward for ConfigProvider
420 // Forwarding was added as a workaround for ONOS-490
Jon Halla3fcf672017-03-28 16:53:22 -0700421 if (!"cfg".equals(providerId.scheme())) {
HIGUCHI Yuta89461772016-01-26 12:18:10 -0800422 return Collections.emptyList();
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800423 }
Madan Jampanifc8aced2015-08-27 11:06:12 -0700424 if (master == null) {
425 return Collections.emptyList();
426 }
427 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
428 deviceEvents = Futures.getUnchecked(
429 clusterCommunicator.sendAndReceive(portInjectedEvent,
430 PORT_INJECTED,
431 SERIALIZER::encode,
432 SERIALIZER::decode,
433 master));
434 }
435 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
436 }
437
438 private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
439 DeviceId deviceId,
440 Optional<PortNumber> portNumber) {
441 Device device = devices.get(deviceId);
442 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
443 List<DeviceEvent> events = Lists.newArrayList();
444
445 Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
446 List<PortDescription> descriptions = Lists.newArrayList();
447 portDescriptions.entrySet().forEach(e -> {
448 PortKey key = e.getKey();
449 PortDescription value = e.getValue();
450 if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
451 if (portNumber.isPresent()) {
452 if (portNumber.get().equals(key.portNumber())) {
453 descriptions.add(value);
454 }
455 } else {
456 descriptions.add(value);
457 }
458 }
459 });
460
461 for (PortDescription description : descriptions) {
462 final PortNumber number = description.portNumber();
463 ports.compute(number, (k, existingPort) -> {
464 Port newPort = composePort(device, number);
465 if (existingPort == null) {
466 events.add(new DeviceEvent(PORT_ADDED, device, newPort));
467 } else {
468 if (existingPort.isEnabled() != newPort.isEnabled() ||
469 existingPort.type() != newPort.type() ||
470 existingPort.portSpeed() != newPort.portSpeed() ||
471 !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
472 events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
473 }
474 }
475 return newPort;
476 });
477 }
478
479 return events;
480 }
481
482 /**
483 * Returns a Port, merging descriptions from multiple Providers.
484 *
485 * @param device device the port is on
486 * @param number port number
487 * @return Port instance
488 */
489 private Port composePort(Device device, PortNumber number) {
490
491 Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
492 portDescriptions.entrySet().forEach(entry -> {
493 PortKey portKey = entry.getKey();
494 if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
495 descriptions.put(portKey.providerId(), entry.getValue());
496 }
497 });
498 ProviderId primary = getPrimaryProviderId(device.id());
499 PortDescription primaryDescription = descriptions.get(primary);
500
501 // if no primary, assume not enabled
502 boolean isEnabled = false;
503 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
504 if (primaryDescription != null) {
505 isEnabled = primaryDescription.isEnabled();
506 annotations = merge(annotations, primaryDescription.annotations());
507 }
508 Port updated = null;
509 for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
510 if (e.getKey().equals(primary)) {
511 continue;
512 }
513 annotations = merge(annotations, e.getValue().annotations());
514 updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
515 }
516 if (primaryDescription == null) {
517 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
518 }
519 return updated == null
520 ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
521 : updated;
522 }
523
524 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
525 PortDescription description, Annotations annotations) {
Madan Jampanifc8aced2015-08-27 11:06:12 -0700526 return new DefaultPort(device, number, isEnabled, description.type(),
527 description.portSpeed(), annotations);
Madan Jampanifc8aced2015-08-27 11:06:12 -0700528 }
529
530 @Override
531 public DeviceEvent updatePortStatus(ProviderId providerId,
532 DeviceId deviceId,
533 PortDescription portDescription) {
534 portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
535 List<DeviceEvent> events =
536 refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
537 return Iterables.getFirst(events, null);
538 }
539
540 @Override
541 public List<Port> getPorts(DeviceId deviceId) {
542 return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
543 }
544
545 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700546 public Stream<PortDescription> getPortDescriptions(ProviderId pid,
547 DeviceId deviceId) {
548
549 return portDescriptions.entrySet().stream()
550 .filter(e -> e.getKey().providerId().equals(pid))
551 .map(Map.Entry::getValue);
552 }
553
554 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700555 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
556 return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
557 }
558
559 @Override
Yuta HIGUCHI6eb00cc2016-06-10 11:55:12 -0700560 public PortDescription getPortDescription(ProviderId pid,
561 DeviceId deviceId,
562 PortNumber portNumber) {
563 return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
564 }
565
566 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700567 public DeviceEvent updatePortStatistics(ProviderId providerId,
568 DeviceId deviceId,
569 Collection<PortStatistics> newStatsCollection) {
570
571 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
572 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
573 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
574
575 if (prvStatsMap != null) {
576 for (PortStatistics newStats : newStatsCollection) {
577 PortNumber port = PortNumber.portNumber(newStats.port());
578 PortStatistics prvStats = prvStatsMap.get(port);
579 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
580 PortStatistics deltaStats = builder.build();
581 if (prvStats != null) {
582 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
583 }
584 deltaStatsMap.put(port, deltaStats);
585 newStatsMap.put(port, newStats);
586 }
587 } else {
588 for (PortStatistics newStats : newStatsCollection) {
589 PortNumber port = PortNumber.portNumber(newStats.port());
590 newStatsMap.put(port, newStats);
591 }
592 }
593 devicePortDeltaStats.put(deviceId, deltaStatsMap);
594 devicePortStats.put(deviceId, newStatsMap);
595 // DeviceEvent returns null because of InternalPortStatsListener usage
596 return null;
597 }
598
599 /**
600 * Calculate delta statistics by subtracting previous from new statistics.
601 *
Madan Jampanif97edc12015-08-31 14:41:01 -0700602 * @param deviceId device indentifier
603 * @param prvStats previous port statistics
604 * @param newStats new port statistics
Madan Jampanifc8aced2015-08-27 11:06:12 -0700605 * @return PortStatistics
606 */
607 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
608 // calculate time difference
609 long deltaStatsSec, deltaStatsNano;
610 if (newStats.durationNano() < prvStats.durationNano()) {
611 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
612 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
613 } else {
614 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
615 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
616 }
617 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
618 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
619 .setPort(newStats.port())
620 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
621 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
622 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
623 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
624 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
625 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
626 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
627 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
628 .setDurationSec(deltaStatsSec)
629 .setDurationNano(deltaStatsNano)
630 .build();
631 return deltaStats;
632 }
633
634 @Override
635 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
636 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
637 if (portStats == null) {
638 return Collections.emptyList();
639 }
640 return ImmutableList.copyOf(portStats.values());
641 }
642
643 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530644 public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
645 Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
646 if (portStatsMap == null) {
647 return null;
648 }
649 PortStatistics portStats = portStatsMap.get(portNumber);
650 return portStats;
651 }
652
653 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700654 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
655 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
656 if (portStats == null) {
657 return Collections.emptyList();
658 }
659 return ImmutableList.copyOf(portStats.values());
660 }
661
662 @Override
Viswanath KSP22774cd2016-08-20 20:06:30 +0530663 public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
664 Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
665 if (portStatsMap == null) {
666 return null;
667 }
668 PortStatistics portStats = portStatsMap.get(portNumber);
669 return portStats;
670 }
671
672 @Override
Madan Jampanifc8aced2015-08-27 11:06:12 -0700673 public boolean isAvailable(DeviceId deviceId) {
674 return availableDevices.contains(deviceId);
675 }
676
677 @Override
678 public Iterable<Device> getAvailableDevices() {
679 return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
680 }
681
682 @Override
683 public DeviceEvent removeDevice(DeviceId deviceId) {
684 NodeId master = mastershipService.getMasterFor(deviceId);
685 // if there exist a master, forward
686 // if there is no master, try to become one and process
687 boolean relinquishAtEnd = false;
688 if (master == null) {
689 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
690 if (myRole != MastershipRole.NONE) {
691 relinquishAtEnd = true;
692 }
693 log.debug("Temporarily requesting role for {} to remove", deviceId);
694 MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
695 if (role == MastershipRole.MASTER) {
696 master = localNodeId;
697 }
698 }
699
700 if (!localNodeId.equals(master)) {
701 log.debug("{} has control of {}, forwarding remove request",
702 master, deviceId);
703
704 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
705 .whenComplete((r, e) -> {
706 if (e != null) {
707 log.error("Failed to forward {} remove request to its master", deviceId, e);
708 }
709 });
710 return null;
711 }
712
713 // I have control..
714 DeviceEvent event = null;
715 final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
716 DeviceDescription removedDeviceDescription =
717 deviceDescriptions.remove(deviceKey);
718 if (removedDeviceDescription != null) {
719 event = purgeDeviceCache(deviceId);
720 }
721
722 if (relinquishAtEnd) {
723 log.debug("Relinquishing temporary role acquired for {}", deviceId);
724 mastershipService.relinquishMastership(deviceId);
725 }
726 return event;
727 }
728
729 private DeviceEvent injectDevice(DeviceInjectedEvent event) {
730 return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
731 }
732
733 private List<DeviceEvent> injectPort(PortInjectedEvent event) {
734 return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
735 }
736
737 private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
738 ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
739 DeviceDescription primaryDeviceDescription =
740 deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
741 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
742 annotations = merge(annotations, primaryDeviceDescription.annotations());
743 for (ProviderId providerId : getAllProviders(deviceId)) {
744 if (!providerId.equals(primaryProviderId)) {
745 annotations = merge(annotations,
746 deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
747 }
748 }
749 return annotations;
750 }
751
752 private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
753 @Override
754 public void event(SetEvent<DeviceId> event) {
755 final DeviceId deviceId = event.entry();
756 final Device device = devices.get(deviceId);
757 if (device != null) {
758 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
759 } else {
760 pendingAvailableChangeUpdates.add(deviceId);
761 }
762 }
763 }
764
765 private class InternalDeviceChangeEventListener
766 implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
767 @Override
768 public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
769 DeviceId deviceId = event.key().deviceId();
770 ProviderId providerId = event.key().providerId();
771 if (event.type() == PUT) {
772 notifyDelegate(refreshDeviceCache(providerId, deviceId));
773 if (pendingAvailableChangeUpdates.remove(deviceId)) {
774 notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
775 }
776 } else if (event.type() == REMOVE) {
777 notifyDelegate(purgeDeviceCache(deviceId));
778 }
779 }
780 }
781
782 private class InternalPortChangeEventListener
783 implements EventuallyConsistentMapListener<PortKey, PortDescription> {
784 @Override
785 public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
786 DeviceId deviceId = event.key().deviceId();
787 ProviderId providerId = event.key().providerId();
788 PortNumber portNumber = event.key().portNumber();
789 if (event.type() == PUT) {
790 if (devices.containsKey(deviceId)) {
791 List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
792 for (DeviceEvent deviceEvent : events) {
793 notifyDelegate(deviceEvent);
794 }
795 }
796 } else if (event.type() == REMOVE) {
797 log.warn("Unexpected port removed event");
798 }
799 }
800 }
801
802 private class InternalPortStatsListener
803 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
804 @Override
805 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
806 if (event.type() == PUT) {
807 Device device = devices.get(event.key());
808 if (device != null) {
Thomas Vachuskad4955ae2016-08-23 14:56:37 -0700809 notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
Madan Jampanifc8aced2015-08-27 11:06:12 -0700810 }
811 }
812 }
813 }
814}