blob: 680f0a097a74896b2be9e40db67254a907ce3d45 [file] [log] [blame]
/*
* Copyright 2015-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.device.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.ChassisId;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipTermService;
import org.onosproject.net.Annotations;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
import org.onosproject.net.Device.Type;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceStore;
import org.onosproject.net.device.DeviceStoreDelegate;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_STATS_UPDATED;
import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of devices using a {@code EventuallyConsistentMap}.
*/
//@Component(immediate = true, enabled = false)
@Service
public class ECDeviceStore
extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
implements DeviceStore {
private final Logger log = getLogger(getClass());
private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
private DistributedSet<DeviceId> availableDevices;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipTermService mastershipTermService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private NodeId localNodeId;
private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
new InternalDeviceChangeEventListener();
private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
new InternalPortChangeEventListener();
private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
new InternalPortStatsListener();
private final SetEventListener<DeviceId> deviceStatusTracker =
new InternalDeviceStatusTracker();
protected static final Serializer SERIALIZER = Serializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.build("ECDevice"));
protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(DeviceKey.class)
.register(PortKey.class)
.register(DeviceKey.class)
.register(PortKey.class)
.register(MastershipBasedTimestamp.class);
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
.withName("onos-device-descriptions")
.withSerializer(SERIALIZER_BUILDER)
.withTimestampProvider((k, v) -> {
try {
return deviceClockService.getTimestamp(k.deviceId());
} catch (IllegalStateException e) {
return null;
}
}).build();
portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
.withName("onos-port-descriptions")
.withSerializer(SERIALIZER_BUILDER)
.withTimestampProvider((k, v) -> {
try {
return deviceClockService.getTimestamp(k.deviceId());
} catch (IllegalStateException e) {
return null;
}
}).build();
devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
.withName("onos-port-stats")
.withSerializer(SERIALIZER_BUILDER)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.withTombstonesDisabled()
.build();
devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
eventuallyConsistentMapBuilder()
.withName("onos-port-stats-delta")
.withSerializer(SERIALIZER_BUILDER)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.withTombstonesDisabled()
.build();
availableDevices = storageService.<DeviceId>setBuilder()
.withName("onos-online-devices")
.withSerializer(Serializer.using(KryoNamespaces.API))
.withRelaxedReadConsistency()
.build()
.asDistributedSet();
deviceDescriptions.addListener(deviceUpdateListener);
portDescriptions.addListener(portUpdateListener);
devicePortStats.addListener(portStatsListener);
availableDevices.addListener(deviceStatusTracker);
log.info("Started");
}
@Deactivate
public void deactivate() {
devicePortStats.removeListener(portStatsListener);
deviceDescriptions.removeListener(deviceUpdateListener);
portDescriptions.removeListener(portUpdateListener);
availableDevices.removeListener(deviceStatusTracker);
devicePortStats.destroy();
devicePortDeltaStats.destroy();
deviceDescriptions.destroy();
portDescriptions.destroy();
devices.clear();
devicePorts.clear();
log.info("Stopped");
}
@Override
public Iterable<Device> getDevices() {
return devices.values();
}
@Override
public int getDeviceCount() {
return devices.size();
}
@Override
public int getAvailableDeviceCount() {
return availableDevices.size();
}
@Override
public Device getDevice(DeviceId deviceId) {
return devices.get(deviceId);
}
// FIXME handle deviceDescription.isDefaultAvailable()=false case properly.
@Override
public DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
NodeId master = mastershipService.getMasterFor(deviceId);
if (localNodeId.equals(master)) {
deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
return refreshDeviceCache(providerId, deviceId);
} else {
return null;
}
}
private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
Device device = devices.compute(deviceId, (k, existingDevice) -> {
Device newDevice = composeDevice(deviceId);
if (existingDevice == null) {
eventType.set(DEVICE_ADDED);
} else {
// We allow only certain attributes to trigger update
boolean propertiesChanged =
!Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
!Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
!Objects.equals(existingDevice.providerId(), newDevice.providerId());
boolean annotationsChanged =
!AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
// Primary providers can respond to all changes, but ancillary ones
// should respond only to annotation changes.
if ((providerId.isAncillary() && annotationsChanged) ||
(!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
providerId, existingDevice, devices.get(deviceId), newDevice);
eventType.set(DEVICE_UPDATED);
}
}
return newDevice;
});
if (eventType.get() != null && !providerId.isAncillary()) {
markOnline(deviceId);
}
return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
}
/**
* Returns the primary providerId for a device.
* @param deviceId device identifier
* @return primary providerId
*/
private Set<ProviderId> getAllProviders(DeviceId deviceId) {
return deviceDescriptions.keySet()
.stream()
.filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
.map(deviceKey -> deviceKey.providerId())
.collect(Collectors.toSet());
}
/**
* Returns the identifier for all providers for a device.
* @param deviceId device identifier
* @return set of provider identifiers
*/
private ProviderId getPrimaryProviderId(DeviceId deviceId) {
Set<ProviderId> allProviderIds = getAllProviders(deviceId);
return allProviderIds.stream()
.filter(p -> !p.isAncillary())
.findFirst()
.orElse(Iterables.getFirst(allProviderIds, null));
}
/**
* Returns a Device, merging descriptions from multiple Providers.
*
* @param deviceId device identifier
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId) {
ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
DeviceDescription primaryDeviceDescription =
deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
Type type = primaryDeviceDescription.type();
String manufacturer = primaryDeviceDescription.manufacturer();
String hwVersion = primaryDeviceDescription.hwVersion();
String swVersion = primaryDeviceDescription.swVersion();
String serialNumber = primaryDeviceDescription.serialNumber();
ChassisId chassisId = primaryDeviceDescription.chassisId();
DefaultAnnotations annotations = mergeAnnotations(deviceId);
return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
hwVersion, swVersion, serialNumber,
chassisId, annotations);
}
private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
Device removedDevice = devices.remove(deviceId);
if (removedDevice != null) {
getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
return new DeviceEvent(DEVICE_REMOVED, removedDevice);
}
return null;
}
// FIXME publicization of markOnline -- trigger some action independently?
public boolean markOnline(DeviceId deviceId) {
if (devices.containsKey(deviceId)) {
return availableDevices.add(deviceId);
}
log.warn("Device {} does not exist in store", deviceId);
return false;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
availableDevices.remove(deviceId);
// set update listener will raise the event.
return null;
}
@Override
public List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
List<PortDescription> descriptions) {
NodeId master = mastershipService.getMasterFor(deviceId);
List<DeviceEvent> deviceEvents = null;
if (localNodeId.equals(master)) {
descriptions.forEach(description -> {
PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
portDescriptions.put(portKey, description);
});
deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
} else {
return Collections.emptyList();
}
return deviceEvents == null ? Collections.emptyList() : deviceEvents;
}
private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
DeviceId deviceId,
Optional<PortNumber> portNumber) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
List<DeviceEvent> events = Lists.newArrayList();
Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
List<PortDescription> descriptions = Lists.newArrayList();
portDescriptions.entrySet().forEach(e -> {
PortKey key = e.getKey();
PortDescription value = e.getValue();
if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
if (portNumber.isPresent()) {
if (portNumber.get().equals(key.portNumber())) {
descriptions.add(value);
}
} else {
descriptions.add(value);
}
}
});
for (PortDescription description : descriptions) {
final PortNumber number = description.portNumber();
ports.compute(number, (k, existingPort) -> {
Port newPort = composePort(device, number);
if (existingPort == null) {
events.add(new DeviceEvent(PORT_ADDED, device, newPort));
} else {
if (existingPort.isEnabled() != newPort.isEnabled() ||
existingPort.type() != newPort.type() ||
existingPort.portSpeed() != newPort.portSpeed() ||
!AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
}
}
return newPort;
});
}
return events;
}
/**
* Returns a Port, merging descriptions from multiple Providers.
*
* @param device device the port is on
* @param number port number
* @return Port instance
*/
private Port composePort(Device device, PortNumber number) {
Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
portDescriptions.entrySet().forEach(entry -> {
PortKey portKey = entry.getKey();
if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
descriptions.put(portKey.providerId(), entry.getValue());
}
});
ProviderId primary = getPrimaryProviderId(device.id());
PortDescription primaryDescription = descriptions.get(primary);
// if no primary, assume not enabled
boolean isEnabled = false;
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
if (primaryDescription != null) {
isEnabled = primaryDescription.isEnabled();
annotations = merge(annotations, primaryDescription.annotations());
}
Port updated = null;
for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
if (e.getKey().equals(primary)) {
continue;
}
annotations = merge(annotations, e.getValue().annotations());
updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
}
if (primaryDescription == null) {
return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
}
return updated == null
? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
: updated;
}
private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
PortDescription description, Annotations annotations) {
return new DefaultPort(device, number, isEnabled, description.type(),
description.portSpeed(), annotations);
}
@Override
public DeviceEvent updatePortStatus(ProviderId providerId,
DeviceId deviceId,
PortDescription portDescription) {
portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
List<DeviceEvent> events =
refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
return Iterables.getFirst(events, null);
}
@Override
public List<Port> getPorts(DeviceId deviceId) {
return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
}
@Override
public Stream<PortDescription> getPortDescriptions(ProviderId pid,
DeviceId deviceId) {
return portDescriptions.entrySet().stream()
.filter(e -> e.getKey().providerId().equals(pid))
.map(Map.Entry::getValue);
}
@Override
public Port getPort(DeviceId deviceId, PortNumber portNumber) {
return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
}
@Override
public PortDescription getPortDescription(ProviderId pid,
DeviceId deviceId,
PortNumber portNumber) {
return portDescriptions.get(new PortKey(pid, deviceId, portNumber));
}
@Override
public DeviceEvent updatePortStatistics(ProviderId providerId,
DeviceId deviceId,
Collection<PortStatistics> newStatsCollection) {
Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
if (prvStatsMap != null) {
for (PortStatistics newStats : newStatsCollection) {
PortNumber port = PortNumber.portNumber(newStats.port());
PortStatistics prvStats = prvStatsMap.get(port);
DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
PortStatistics deltaStats = builder.build();
if (prvStats != null) {
deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
}
deltaStatsMap.put(port, deltaStats);
newStatsMap.put(port, newStats);
}
} else {
for (PortStatistics newStats : newStatsCollection) {
PortNumber port = PortNumber.portNumber(newStats.port());
newStatsMap.put(port, newStats);
}
}
devicePortDeltaStats.put(deviceId, deltaStatsMap);
devicePortStats.put(deviceId, newStatsMap);
// DeviceEvent returns null because of InternalPortStatsListener usage
return null;
}
/**
* Calculate delta statistics by subtracting previous from new statistics.
*
* @param deviceId device indentifier
* @param prvStats previous port statistics
* @param newStats new port statistics
* @return PortStatistics
*/
public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
// calculate time difference
long deltaStatsSec, deltaStatsNano;
if (newStats.durationNano() < prvStats.durationNano()) {
deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
} else {
deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
}
DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
.setPort(newStats.port())
.setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
.setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
.setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
.setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
.setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
.setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
.setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
.setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
.setDurationSec(deltaStatsSec)
.setDurationNano(deltaStatsNano)
.build();
return deltaStats;
}
@Override
public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
if (portStats == null) {
return Collections.emptyList();
}
return ImmutableList.copyOf(portStats.values());
}
@Override
public PortStatistics getStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
Map<PortNumber, PortStatistics> portStatsMap = devicePortStats.get(deviceId);
if (portStatsMap == null) {
return null;
}
PortStatistics portStats = portStatsMap.get(portNumber);
return portStats;
}
@Override
public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
if (portStats == null) {
return Collections.emptyList();
}
return ImmutableList.copyOf(portStats.values());
}
@Override
public PortStatistics getDeltaStatisticsForPort(DeviceId deviceId, PortNumber portNumber) {
Map<PortNumber, PortStatistics> portStatsMap = devicePortDeltaStats.get(deviceId);
if (portStatsMap == null) {
return null;
}
PortStatistics portStats = portStatsMap.get(portNumber);
return portStats;
}
@Override
public boolean isAvailable(DeviceId deviceId) {
return availableDevices.contains(deviceId);
}
@Override
public Iterable<Device> getAvailableDevices() {
return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
}
@Override
public DeviceEvent removeDevice(DeviceId deviceId) {
NodeId master = mastershipService.getMasterFor(deviceId);
// if there exist a master, forward
// if there is no master, try to become one and process
boolean relinquishAtEnd = false;
if (master == null) {
final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
if (myRole != MastershipRole.NONE) {
relinquishAtEnd = true;
}
log.debug("Temporarily requesting role for {} to remove", deviceId);
MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
if (role == MastershipRole.MASTER) {
master = localNodeId;
}
}
if (!localNodeId.equals(master)) {
log.debug("{} has control of {}, forwarding remove request",
master, deviceId);
clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
.whenComplete((r, e) -> {
if (e != null) {
log.error("Failed to forward {} remove request to its master", deviceId, e);
}
});
return null;
}
// I have control..
DeviceEvent event = null;
final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
DeviceDescription removedDeviceDescription =
deviceDescriptions.remove(deviceKey);
if (removedDeviceDescription != null) {
event = purgeDeviceCache(deviceId);
}
if (relinquishAtEnd) {
log.debug("Relinquishing temporary role acquired for {}", deviceId);
mastershipService.relinquishMastership(deviceId);
}
return event;
}
private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
DeviceDescription primaryDeviceDescription =
deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
annotations = merge(annotations, primaryDeviceDescription.annotations());
for (ProviderId providerId : getAllProviders(deviceId)) {
if (!providerId.equals(primaryProviderId)) {
annotations = merge(annotations,
deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
}
}
return annotations;
}
private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
@Override
public void event(SetEvent<DeviceId> event) {
final DeviceId deviceId = event.entry();
final Device device = devices.get(deviceId);
if (device != null) {
notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
} else {
pendingAvailableChangeUpdates.add(deviceId);
}
}
}
private class InternalDeviceChangeEventListener
implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
DeviceId deviceId = event.key().deviceId();
ProviderId providerId = event.key().providerId();
if (event.type() == PUT) {
notifyDelegate(refreshDeviceCache(providerId, deviceId));
if (pendingAvailableChangeUpdates.remove(deviceId)) {
notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
}
} else if (event.type() == REMOVE) {
notifyDelegate(purgeDeviceCache(deviceId));
}
}
}
private class InternalPortChangeEventListener
implements EventuallyConsistentMapListener<PortKey, PortDescription> {
@Override
public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
DeviceId deviceId = event.key().deviceId();
ProviderId providerId = event.key().providerId();
PortNumber portNumber = event.key().portNumber();
if (event.type() == PUT) {
if (devices.containsKey(deviceId)) {
List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
for (DeviceEvent deviceEvent : events) {
notifyDelegate(deviceEvent);
}
}
} else if (event.type() == REMOVE) {
log.warn("Unexpected port removed event");
}
}
}
private class InternalPortStatsListener
implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
if (event.type() == PUT) {
Device device = devices.get(event.key());
if (device != null) {
notifyDelegate(new DeviceEvent(PORT_STATS_UPDATED, device));
}
}
}
}
}