blob: 5449277c8adb24380b102d21657382e1aa2ca733 [file] [log] [blame]
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.device.impl;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Device.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.packet.ChassisId;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static com.google.common.base.Verify.verify;
import static org.onlab.util.Tools.namedThreads;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
// TODO: give me a better name
/**
* Manages inventory of infrastructure devices using gossip protocol to distribute
* information.
*/
@Component(immediate = true)
@Service
public class GossipDeviceStore
extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
implements DeviceStore {
private final Logger log = getLogger(getClass());
private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
// innerMap is used to lock a Device, thus instance should never be replaced.
// collection of Description given from various providers
private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers
private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// to be updated under Device lock
private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
// available(=UP) devices
private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
.register(InternalDeviceRemovedEvent.class)
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
.register(DeviceAntiEntropyAdvertisement.class)
.register(DeviceFragmentId.class)
.register(PortFragmentId.class)
.build()
.populate(1);
}
};
private ScheduledExecutorService executor;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
executor =
newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
executor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
executor.shutdownNow();
try {
boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
if (timedout) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
log.error("Error during executor shutdown", e);
}
deviceDescs.clear();
devices.clear();
devicePorts.clear();
availableDevices.clear();
log.info("Stopped");
}
@Override
public int getDeviceCount() {
return devices.size();
}
@Override
public Iterable<Device> getDevices() {
return Collections.unmodifiableCollection(devices.values());
}
@Override
public Device getDevice(DeviceId deviceId) {
return devices.get(deviceId);
}
@Override
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<DeviceDescription> mergedDesc;
synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
}
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return event;
}
private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
DeviceId deviceId,
Timestamped<DeviceDescription> deltaDesc) {
// Collection of DeviceDescriptions for a Device
Map<ProviderId, DeviceDescriptions> providerDescs
= getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (providerDescs) {
// locking per device
if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
log.debug("Ignoring outdated event: {}", deltaDesc);
return null;
}
DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
final Device oldDevice = devices.get(deviceId);
final Device newDevice;
if (deltaDesc == descs.getDeviceDesc() ||
deltaDesc.isNewer(descs.getDeviceDesc())) {
// on new device or valid update
descs.putDeviceDesc(deltaDesc);
newDevice = composeDevice(deviceId, providerDescs);
} else {
// outdated event, ignored.
return null;
}
if (oldDevice == null) {
// ADD
return createDevice(providerId, newDevice, deltaDesc.timestamp());
} else {
// UPDATE or ignore (no change or stale)
return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
}
}
}
// Creates the device and returns the appropriate event if necessary.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent createDevice(ProviderId providerId,
Device newDevice, Timestamp timestamp) {
// update composed device cache
Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
verify(oldDevice == null,
"Unexpected Device in cache. PID:%s [old=%s, new=%s]",
providerId, oldDevice, newDevice);
if (!providerId.isAncillary()) {
markOnline(newDevice.id(), timestamp);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
// Updates the device and returns the appropriate event if necessary.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId,
Device oldDevice,
Device newDevice, Timestamp newTimestamp) {
// We allow only certain attributes to trigger update
boolean propertiesChanged =
!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion());
boolean annotationsChanged =
!AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
// Primary providers can respond to all changes, but ancillary ones
// should respond only to annotation changes.
if ((providerId.isAncillary() && annotationsChanged) ||
(!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
if (!replaced) {
verify(replaced,
"Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
providerId, oldDevice, devices.get(newDevice.id())
, newDevice);
}
if (!providerId.isAncillary()) {
markOnline(newDevice.id(), newTimestamp);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
// Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) {
boolean added = markOnline(newDevice.id(), newTimestamp);
return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
}
return null;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
deviceId, timestamp);
try {
notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
deviceId);
}
}
return event;
}
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
Map<ProviderId, DeviceDescriptions> providerDescs
= getOrCreateDeviceDescriptionsMap(deviceId);
// locking device
synchronized (providerDescs) {
// accept off-line if given timestamp is newer than
// the latest Timestamp from Primary provider
DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
Timestamp lastTimestamp = primDescs.getLatestTimestamp();
if (timestamp.compareTo(lastTimestamp) <= 0) {
// outdated event ignore
return null;
}
offline.put(deviceId, timestamp);
Device device = devices.get(deviceId);
if (device == null) {
return null;
}
boolean removed = availableDevices.remove(deviceId);
if (removed) {
// TODO: broadcast ... DOWN only?
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
}
return null;
}
}
/**
* Marks the device as available if the given timestamp is not outdated,
* compared to the time the device has been marked offline.
*
* @param deviceId identifier of the device
* @param timestamp of the event triggering this change.
* @return true if availability change request was accepted and changed the state
*/
// Guarded by deviceDescs value (=Device lock)
private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
// accept on-line if given timestamp is newer than
// the latest offline request Timestamp
Timestamp offlineTimestamp = offline.get(deviceId);
if (offlineTimestamp == null ||
offlineTimestamp.compareTo(timestamp) < 0) {
offline.remove(deviceId);
return availableDevices.add(deviceId);
}
return false;
}
@Override
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
List<PortDescription> portDescriptions) {
final Timestamp newTimestamp;
try {
newTimestamp = deviceClockService.getTimestamp(deviceId);
} catch (IllegalStateException e) {
log.info("Timestamp was not available for device {}", deviceId);
log.debug(" discarding {}", portDescriptions);
// Failed to generate timestamp.
// Possible situation:
// Device connected and became master for short period of time,
// but lost mastership before this instance had the chance to
// retrieve term information.
// Information dropped here is expected to be recoverable by
// device probing after mastership change
return Collections.emptyList();
}
log.info("timestamp for {} {}", deviceId, newTimestamp);
final Timestamped<List<PortDescription>> timestampedInput
= new Timestamped<>(portDescriptions, newTimestamp);
final List<DeviceEvent> events;
final Timestamped<List<PortDescription>> merged;
synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
events = updatePortsInternal(providerId, deviceId, timestampedInput);
final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
List<PortDescription> mergedList =
FluentIterable.from(portDescriptions)
.transform(new Function<PortDescription, PortDescription>() {
@Override
public PortDescription apply(PortDescription input) {
// lookup merged port description
return descs.getPortDesc(input.portNumber()).value();
}
}).toList();
merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
}
if (!events.isEmpty()) {
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return events;
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
DeviceId deviceId,
Timestamped<List<PortDescription>> portDescriptions) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
List<DeviceEvent> events = new ArrayList<>();
synchronized (descsMap) {
if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
log.debug("Ignoring outdated events: {}", portDescriptions);
return null;
}
DeviceDescriptions descs = descsMap.get(providerId);
// every provider must provide DeviceDescription.
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
Map<PortNumber, Port> ports = getPortMap(deviceId);
final Timestamp newTimestamp = portDescriptions.timestamp();
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions.value()) {
final PortNumber number = portDescription.portNumber();
processed.add(number);
final Port oldPort = ports.get(number);
final Port newPort;
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
// on new port or valid update
// update description
descs.putPortDesc(new Timestamped<>(portDescription,
portDescriptions.timestamp()));
newPort = composePort(device, number, descsMap);
} else {
// outdated event, ignored.
continue;
}
events.add(oldPort == null ?
createPort(device, newPort, ports) :
updatePort(device, oldPort, newPort, ports));
}
events.addAll(pruneOldPorts(device, ports, processed));
}
return FluentIterable.from(events).filter(notNull()).toList();
}
// Creates a new port based on the port description adds it to the map and
// Returns corresponding event.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent createPort(Device device, Port newPort,
Map<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort);
return new DeviceEvent(PORT_ADDED, device, newPort);
}
// Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort,
Map<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled() ||
oldPort.type() != newPort.type() ||
oldPort.portSpeed() != newPort.portSpeed() ||
!AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
ports.put(oldPort.number(), newPort);
return new DeviceEvent(PORT_UPDATED, device, newPort);
}
return null;
}
// Prunes the specified list of ports based on which ports are in the
// processed list and returns list of corresponding events.
// Guarded by deviceDescs value (=Device lock)
private List<DeviceEvent> pruneOldPorts(Device device,
Map<PortNumber, Port> ports,
Set<PortNumber> processed) {
List<DeviceEvent> events = new ArrayList<>();
Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
while (iterator.hasNext()) {
Entry<PortNumber, Port> e = iterator.next();
PortNumber portNumber = e.getKey();
if (!processed.contains(portNumber)) {
events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
iterator.remove();
}
}
return events;
}
// Gets the map of ports for the specified device; if one does not already
// exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId,
NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
}
private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
DeviceId deviceId) {
Map<ProviderId, DeviceDescriptions> r;
r = deviceDescs.get(deviceId);
if (r == null) {
r = new HashMap<ProviderId, DeviceDescriptions>();
final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
if (concurrentlyAdded != null) {
r = concurrentlyAdded;
}
}
return r;
}
// Guarded by deviceDescs value (=Device lock)
private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
Map<ProviderId, DeviceDescriptions> device,
ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
synchronized (device) {
DeviceDescriptions r = device.get(providerId);
if (r == null) {
r = new DeviceDescriptions(deltaDesc);
device.put(providerId, r);
}
return r;
}
}
@Override
public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
DeviceId deviceId,
PortDescription portDescription) {
final Timestamp newTimestamp;
try {
newTimestamp = deviceClockService.getTimestamp(deviceId);
} catch (IllegalStateException e) {
log.info("Timestamp was not available for device {}", deviceId);
log.debug(" discarding {}", portDescription);
// Failed to generate timestamp. Ignoring.
// See updatePorts comment
return null;
}
final Timestamped<PortDescription> deltaDesc
= new Timestamped<>(portDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<PortDescription> mergedDesc;
synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
.getPortDesc(portDescription.portNumber());
}
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return event;
}
private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
Timestamped<PortDescription> deltaDesc) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
synchronized (descsMap) {
if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
log.debug("Ignoring outdated event: {}", deltaDesc);
return null;
}
DeviceDescriptions descs = descsMap.get(providerId);
// assuming all providers must to give DeviceDescription
verify(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = deltaDesc.value().portNumber();
final Port oldPort = ports.get(number);
final Port newPort;
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
deltaDesc.isNewer(existingPortDesc)) {
// on new port or valid update
// update description
descs.putPortDesc(deltaDesc);
newPort = composePort(device, number, descsMap);
} else {
// same or outdated event, ignored.
log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
return null;
}
if (oldPort == null) {
return createPort(device, newPort, ports);
} else {
return updatePort(device, oldPort, newPort, ports);
}
}
}
@Override
public List<Port> getPorts(DeviceId deviceId) {
Map<PortNumber, Port> ports = devicePorts.get(deviceId);
if (ports == null) {
return Collections.emptyList();
}
return ImmutableList.copyOf(ports.values());
}
@Override
public Port getPort(DeviceId deviceId, PortNumber portNumber) {
Map<PortNumber, Port> ports = devicePorts.get(deviceId);
return ports == null ? null : ports.get(portNumber);
}
@Override
public boolean isAvailable(DeviceId deviceId) {
return availableDevices.contains(deviceId);
}
@Override
public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
final NodeId master = mastershipService.getMasterFor(deviceId);
if (!clusterService.getLocalNode().id().equals(master)) {
log.info("Removal of device {} requested on non master node", deviceId);
// FIXME silently ignoring. Should be forwarding or broadcasting to
// master.
return null;
}
Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device removed topology event for deviceId: {}",
deviceId);
try {
notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
deviceId);
}
}
return event;
}
private DeviceEvent removeDeviceInternal(DeviceId deviceId,
Timestamp timestamp) {
Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (descs) {
// accept removal request if given timestamp is newer than
// the latest Timestamp from Primary provider
DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
Timestamp lastTimestamp = primDescs.getLatestTimestamp();
if (timestamp.compareTo(lastTimestamp) <= 0) {
// outdated event ignore
return null;
}
removalRequest.put(deviceId, timestamp);
Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports?
Map<PortNumber, Port> ports = devicePorts.get(deviceId);
if (ports != null) {
ports.clear();
}
markOfflineInternal(deviceId, timestamp);
descs.clear();
return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
/**
* Checks if given timestamp is superseded by removal request
* with more recent timestamp.
*
* @param deviceId identifier of a device
* @param timestampToCheck timestamp of an event to check
* @return true if device is already removed
*/
private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
Timestamp removalTimestamp = removalRequest.get(deviceId);
if (removalTimestamp != null &&
removalTimestamp.compareTo(timestampToCheck) >= 0) {
// removalRequest is more recent
return true;
}
return false;
}
/**
* Returns a Device, merging description given from multiple Providers.
*
* @param deviceId device identifier
* @param providerDescs Collection of Descriptions from multiple providers
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
ProviderId primary = pickPrimaryPID(providerDescs);
DeviceDescriptions desc = providerDescs.get(primary);
final DeviceDescription base = desc.getDeviceDesc().value();
Type type = base.type();
String manufacturer = base.manufacturer();
String hwVersion = base.hwVersion();
String swVersion = base.swVersion();
String serialNumber = base.serialNumber();
ChassisId chassisId = base.chassisId();
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
annotations = merge(annotations, base.annotations());
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (e.getKey().equals(primary)) {
continue;
}
// TODO: should keep track of Description timestamp
// and only merge conflicting keys when timestamp is newer.
// Currently assuming there will never be a key conflict between
// providers
// annotation merging. not so efficient, should revisit later
annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
}
return new DefaultDevice(primary, deviceId , type, manufacturer,
hwVersion, swVersion, serialNumber,
chassisId, annotations);
}
/**
* Returns a Port, merging description given from multiple Providers.
*
* @param device device the port is on
* @param number port number
* @param descsMap Collection of Descriptions from multiple providers
* @return Port instance
*/
private Port composePort(Device device, PortNumber number,
Map<ProviderId, DeviceDescriptions> descsMap) {
ProviderId primary = pickPrimaryPID(descsMap);
DeviceDescriptions primDescs = descsMap.get(primary);
// if no primary, assume not enabled
// TODO: revisit this default port enabled/disabled behavior
boolean isEnabled = false;
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
if (portDesc != null) {
isEnabled = portDesc.value().isEnabled();
annotations = merge(annotations, portDesc.value().annotations());
}
for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
if (e.getKey().equals(primary)) {
continue;
}
// TODO: should keep track of Description timestamp
// and only merge conflicting keys when timestamp is newer.
// Currently assuming there will never be a key conflict between
// providers
// annotation merging. not so efficient, should revisit later
final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
if (otherPortDesc != null) {
annotations = merge(annotations, otherPortDesc.value().annotations());
}
}
return portDesc == null ?
new DefaultPort(device, number, false, annotations) :
new DefaultPort(device, number, isEnabled, portDesc.value().type(),
portDesc.value().portSpeed(), annotations);
}
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryPID(
Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
fallBackPrimary = e.getKey();
}
}
return fallBackPrimary;
}
private DeviceDescriptions getPrimaryDescriptions(
Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId pid = pickPrimaryPID(providerDescs);
return providerDescs.get(pid);
}
// TODO: should we be throwing exception?
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, recipient);
}
// TODO: should we be throwing exception?
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
}
private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
}
private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
}
private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
try {
unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
}
private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
try {
unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
}
private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
try {
unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
}
private void notifyPeer(NodeId recipient, InternalPortEvent event) {
try {
unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
}
private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
try {
unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
}
private DeviceAntiEntropyAdvertisement createAdvertisement() {
final NodeId self = clusterService.getLocalNode().id();
final int numDevices = deviceDescs.size();
Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
final int portsPerDevice = 8; // random factor to minimize reallocation
Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>>
provs : deviceDescs.entrySet()) {
// for each Device...
final DeviceId deviceId = provs.getKey();
final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
synchronized (devDescs) {
// send device offline timestamp
Timestamp lOffline = this.offline.get(deviceId);
if (lOffline != null) {
adOffline.put(deviceId, lOffline);
}
for (Entry<ProviderId, DeviceDescriptions>
prov : devDescs.entrySet()) {
// for each Provider Descriptions...
final ProviderId provId = prov.getKey();
final DeviceDescriptions descs = prov.getValue();
adDevices.put(new DeviceFragmentId(deviceId, provId),
descs.getDeviceDesc().timestamp());
for (Entry<PortNumber, Timestamped<PortDescription>>
portDesc : descs.getPortDescs().entrySet()) {
final PortNumber number = portDesc.getKey();
adPorts.put(new PortFragmentId(deviceId, provId, number),
portDesc.getValue().timestamp());
}
}
}
}
return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
}
/**
* Responds to anti-entropy advertisement message.
* <P>
* Notify sender about out-dated information using regular replication message.
* Send back advertisement to sender if not in sync.
*
* @param advertisement to respond to
*/
private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
final NodeId sender = advertisement.sender();
Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
// Fragments to request
Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
Collection<PortFragmentId> reqPorts = new ArrayList<>();
for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
final DeviceId deviceId = de.getKey();
final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
synchronized (lDevice) {
// latestTimestamp across provider
// Note: can be null initially
Timestamp localLatest = offline.get(deviceId);
// handle device Ads
for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
final ProviderId provId = prov.getKey();
final DeviceDescriptions lDeviceDescs = prov.getValue();
final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
Timestamp advDevTimestamp = devAds.get(devFragId);
if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
// remote does not have it or outdated, suggest
notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
} else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
// local is outdated, request
reqDevices.add(devFragId);
}
// handle port Ads
for (Entry<PortNumber, Timestamped<PortDescription>>
pe : lDeviceDescs.getPortDescs().entrySet()) {
final PortNumber num = pe.getKey();
final Timestamped<PortDescription> lPort = pe.getValue();
final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
Timestamp advPortTimestamp = portAds.get(portFragId);
if (advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
// remote does not have it or outdated, suggest
notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
} else if (!lPort.timestamp().equals(advPortTimestamp)) {
// local is outdated, request
log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
reqPorts.add(portFragId);
}
// remove port Ad already processed
portAds.remove(portFragId);
} // end local port loop
// remove device Ad already processed
devAds.remove(devFragId);
// find latest and update
final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
if (localLatest == null ||
providerLatest.compareTo(localLatest) > 0) {
localLatest = providerLatest;
}
} // end local provider loop
// checking if remote timestamp is more recent.
Timestamp rOffline = offlineAds.get(deviceId);
if (rOffline != null &&
rOffline.compareTo(localLatest) > 0) {
// remote offline timestamp suggests that the
// device is off-line
markOfflineInternal(deviceId, rOffline);
}
Timestamp lOffline = offline.get(deviceId);
if (lOffline != null && rOffline == null) {
// locally offline, but remote is online, suggest offline
notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
}
// remove device offline Ad already processed
offlineAds.remove(deviceId);
} // end local device loop
} // device lock
// If there is any Ads left, request them
log.trace("Ads left {}, {}", devAds, portAds);
reqDevices.addAll(devAds.keySet());
reqPorts.addAll(portAds.keySet());
if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
log.trace("Nothing to request to remote peer {}", sender);
return;
}
log.info("Need to sync {} {}", reqDevices, reqPorts);
// 2-way Anti-Entropy for now
try {
unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
} catch (IOException e) {
log.error("Failed to send response advertisement to " + sender, e);
}
// Sketch of 3-way Anti-Entropy
// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
// ClusterMessage message = new ClusterMessage(
// clusterService.getLocalNode().id(),
// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
// SERIALIZER.encode(request));
//
// try {
// clusterCommunicator.unicast(message, advertisement.sender());
// } catch (IOException e) {
// log.error("Failed to send advertisement reply to "
// + advertisement.sender(), e);
// }
}
private void notifyDelegateIfNotNull(DeviceEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
private final class SendAdvertisementTask implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
final NodeId self = clusterService.getLocalNode().id();
Set<ControllerNode> nodes = clusterService.getNodes();
ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
.transform(toNodeId())
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.debug("No other peers in the cluster.");
return;
}
NodeId peer;
do {
int idx = RandomUtils.nextInt(0, nodeIds.size());
peer = nodeIds.get(idx);
} while (peer.equals(self));
DeviceAntiEntropyAdvertisement ad = createAdvertisement();
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
unicastMessage(peer, DEVICE_ADVERTISE, ad);
} catch (IOException e) {
log.debug("Failed to send anti-entropy advertisement to {}", peer);
return;
}
} catch (Exception e) {
// catch all Exception to avoid Scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
}
}
}
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
}
}
private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device offline event from peer: {}", message.sender());
InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
}
}
private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device removed event from peer: {}", message.sender());
InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
}
}
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received port update event from peer: {}", message.sender());
InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
}
}
private class InternalPortStatusEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
log.info("{}", event);
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<PortDescription> portDescription = event.portDescription();
notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
}
}
private final class InternalDeviceAdvertisementListener
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
handleAdvertisement(advertisement);
}
}
}