Injecting topology through JSON ConfigProvider works for multi-instance (ONOS-490).
Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1
Injecting optical device annotations and ports works for multi-instance (ONOS-870).
Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 3a94087..d29fadb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -29,6 +28,9 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.ChassisId;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -61,9 +63,6 @@
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
-import org.onlab.packet.ChassisId;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
@@ -86,17 +85,17 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
-import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
-import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onosproject.net.DefaultAnnotations.merge;
import static com.google.common.base.Verify.verify;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure devices using gossip protocol to distribute
@@ -111,6 +110,8 @@
private final Logger log = getLogger(getClass());
private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+ // Timeout in milliseconds to process device or ports on remote master node
+ private static final int REMOTE_MASTER_TIMEOUT = 1000;
// innerMap is used to lock a Device, thus instance should never be replaced.
// collection of Description given from various providers
@@ -158,6 +159,8 @@
.register(DeviceAntiEntropyAdvertisement.class)
.register(DeviceFragmentId.class)
.register(PortFragmentId.class)
+ .register(DeviceInjectedEvent.class)
+ .register(PortInjectedEvent.class)
.build();
}
};
@@ -186,6 +189,10 @@
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
@@ -251,21 +258,48 @@
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
- final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
- final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
- final DeviceEvent event;
- final Timestamped<DeviceDescription> mergedDesc;
- final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
- synchronized (device) {
- event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
- mergedDesc = device.get(providerId).getDeviceDesc();
+ NodeId localNode = clusterService.getLocalNode().id();
+ NodeId deviceNode = mastershipService.getMasterFor(deviceId);
+
+ // Process device update only if we're the master,
+ // otherwise signal the actual master.
+ DeviceEvent deviceEvent = null;
+ if (localNode.equals(deviceNode)) {
+
+ final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
+ final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+ final Timestamped<DeviceDescription> mergedDesc;
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+ synchronized (device) {
+ deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ mergedDesc = device.get(providerId).getDeviceDesc();
+ }
+
+ if (deviceEvent != null) {
+ log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
+ }
+
+ } else {
+
+ DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
+ providerId, deviceId, deviceDescription);
+ ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
+ SERIALIZER.encode(deviceInjectedEvent));
+
+ try {
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ } catch (IOException e) {
+ log.warn("Failed to process injected device id: {} desc: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, deviceDescription, e);
+ }
+
}
- if (event != null) {
- log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
- notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
- }
- return event;
+
+ return deviceEvent;
}
private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
@@ -434,52 +468,81 @@
DeviceId deviceId,
List<PortDescription> portDescriptions) {
- final Timestamp newTimestamp;
- try {
- newTimestamp = deviceClockService.getTimestamp(deviceId);
- } catch (IllegalStateException e) {
- log.info("Timestamp was not available for device {}", deviceId);
- log.debug(" discarding {}", portDescriptions);
- // Failed to generate timestamp.
+ NodeId localNode = clusterService.getLocalNode().id();
+ // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
+ // since it will trigger distributed store read.
+ // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
+ // outside Device subsystem. so that we don't have to modify both Device and Link stores.
+ // If we don't care much about topology performance, then it might be OK.
+ NodeId deviceNode = mastershipService.getMasterFor(deviceId);
- // Possible situation:
- // Device connected and became master for short period of time,
- // but lost mastership before this instance had the chance to
- // retrieve term information.
+ // Process port update only if we're the master of the device,
+ // otherwise signal the actual master.
+ List<DeviceEvent> deviceEvents = null;
+ if (localNode.equals(deviceNode)) {
- // Information dropped here is expected to be recoverable by
- // device probing after mastership change
+ final Timestamp newTimestamp;
+ try {
+ newTimestamp = deviceClockService.getTimestamp(deviceId);
+ } catch (IllegalStateException e) {
+ log.info("Timestamp was not available for device {}", deviceId);
+ log.debug(" discarding {}", portDescriptions);
+ // Failed to generate timestamp.
- return Collections.emptyList();
+ // Possible situation:
+ // Device connected and became master for short period of time,
+ // but lost mastership before this instance had the chance to
+ // retrieve term information.
+
+ // Information dropped here is expected to be recoverable by
+ // device probing after mastership change
+
+ return Collections.emptyList();
+ }
+ log.debug("timestamp for {} {}", deviceId, newTimestamp);
+
+ final Timestamped<List<PortDescription>> timestampedInput
+ = new Timestamped<>(portDescriptions, newTimestamp);
+ final Timestamped<List<PortDescription>> merged;
+
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+ synchronized (device) {
+ deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
+ final DeviceDescriptions descs = device.get(providerId);
+ List<PortDescription> mergedList =
+ FluentIterable.from(portDescriptions)
+ .transform(new Function<PortDescription, PortDescription>() {
+ @Override
+ public PortDescription apply(PortDescription input) {
+ // lookup merged port description
+ return descs.getPortDesc(input.portNumber()).value();
+ }
+ }).toList();
+ merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+ }
+
+ if (!deviceEvents.isEmpty()) {
+ log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
+ }
+
+ } else {
+
+ PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
+ ClusterMessage clusterMessage = new ClusterMessage(
+ localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
+ try {
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ } catch (IOException e) {
+ log.warn("Failed to process injected ports of device id: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, e);
+ }
}
- log.debug("timestamp for {} {}", deviceId, newTimestamp);
- final Timestamped<List<PortDescription>> timestampedInput
- = new Timestamped<>(portDescriptions, newTimestamp);
- final List<DeviceEvent> events;
- final Timestamped<List<PortDescription>> merged;
-
- final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
- synchronized (device) {
- events = updatePortsInternal(providerId, deviceId, timestampedInput);
- final DeviceDescriptions descs = device.get(providerId);
- List<PortDescription> mergedList =
- FluentIterable.from(portDescriptions)
- .transform(new Function<PortDescription, PortDescription>() {
- @Override
- public PortDescription apply(PortDescription input) {
- // lookup merged port description
- return descs.getPortDesc(input.portNumber()).value();
- }
- }).toList();
- merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
- }
- if (!events.isEmpty()) {
- log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
- notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
- }
- return events;
+ return deviceEvents;
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -1431,4 +1494,48 @@
});
}
}
+
+ private final class DeviceInjectedEventListener
+ implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.debug("Received injected device event from peer: {}", message.sender());
+ DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ DeviceDescription deviceDescription = event.deviceDescription();
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ createOrUpdateDevice(providerId, deviceId, deviceDescription);
+ }
+ });
+ }
+ }
+
+ private final class PortInjectedEventListener
+ implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.debug("Received injected port event from peer: {}", message.sender());
+ PortInjectedEvent event = SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ List<PortDescription> portDescriptions = event.portDescriptions();
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ updatePorts(providerId, deviceId, portDescriptions);
+ }
+ });
+ }
+ }
}