Injecting topology through JSON ConfigProvider works for multi-instance (ONOS-490).
Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1
Injecting optical device annotations and ports works for multi-instance (ONOS-870).
Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
diff --git a/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java b/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java
index fc33693..13796f9 100644
--- a/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java
+++ b/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java
@@ -179,7 +179,7 @@
if (!IntentState.FAILED.equals(intentService.getIntentState(intent.key()))) {
return;
- }
+ }
List<Intent> intents = Lists.newArrayList();
if (intent instanceof HostToHostIntent) {
diff --git a/core/api/src/main/java/org/onosproject/event/AbstractEvent.java b/core/api/src/main/java/org/onosproject/event/AbstractEvent.java
index 51d1caa..8d4e522 100644
--- a/core/api/src/main/java/org/onosproject/event/AbstractEvent.java
+++ b/core/api/src/main/java/org/onosproject/event/AbstractEvent.java
@@ -15,10 +15,10 @@
*/
package org.onosproject.event;
-import static com.google.common.base.MoreObjects.toStringHelper;
-
import org.joda.time.LocalDateTime;
+import static com.google.common.base.MoreObjects.toStringHelper;
+
/**
* Base event implementation.
*/
@@ -75,5 +75,4 @@
.add("subject", subject())
.toString();
}
-
}
diff --git a/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java b/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
index 5315994..fac6204 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
@@ -47,7 +47,7 @@
* @param hwVersion device HW version
* @param swVersion device SW version
* @param serialNumber device serial number
- * @param chassis chasis id
+ * @param chassis chassis id
* @param annotations optional key/value annotations map
*/
public DefaultDeviceDescription(URI uri, Type type, String manufacturer,
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index e8cccf4..69ef991 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -306,18 +306,16 @@
// TODO: Do we need to explicitly tell the Provider that
// this instance is not the MASTER
applyRole(deviceId, MastershipRole.STANDBY);
- return;
+ } else {
+ log.info("Role of this node is MASTER for {}", deviceId);
+ // tell clock provider if this instance is the master
+ deviceClockProviderService.setMastershipTerm(deviceId, term);
+ applyRole(deviceId, MastershipRole.MASTER);
}
- log.info("Role of this node is MASTER for {}", deviceId);
-
- // tell clock provider if this instance is the master
- deviceClockProviderService.setMastershipTerm(deviceId, term);
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
- applyRole(deviceId, MastershipRole.MASTER);
-
// If there was a change of any kind, tell the provider
// that this instance is the master.
if (event != null) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java
new file mode 100644
index 0000000..6f93963
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java
@@ -0,0 +1,49 @@
+package org.onosproject.store.device.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.provider.ProviderId;
+
+public class DeviceInjectedEvent {
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final DeviceDescription deviceDescription;
+
+ protected DeviceInjectedEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.deviceDescription = deviceDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public DeviceDescription deviceDescription() {
+ return deviceDescription;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .add("deviceDescription", deviceDescription)
+ .toString();
+ }
+
+ // for serializer
+ protected DeviceInjectedEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.deviceDescription = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 3a94087..d29fadb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -29,6 +28,9 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.ChassisId;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
@@ -61,9 +63,6 @@
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
-import org.onlab.packet.ChassisId;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
@@ -86,17 +85,17 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
-import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
-import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onosproject.net.DefaultAnnotations.merge;
import static com.google.common.base.Verify.verify;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure devices using gossip protocol to distribute
@@ -111,6 +110,8 @@
private final Logger log = getLogger(getClass());
private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+ // Timeout in milliseconds to process device or ports on remote master node
+ private static final int REMOTE_MASTER_TIMEOUT = 1000;
// innerMap is used to lock a Device, thus instance should never be replaced.
// collection of Description given from various providers
@@ -158,6 +159,8 @@
.register(DeviceAntiEntropyAdvertisement.class)
.register(DeviceFragmentId.class)
.register(PortFragmentId.class)
+ .register(DeviceInjectedEvent.class)
+ .register(PortInjectedEvent.class)
.build();
}
};
@@ -186,6 +189,10 @@
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
@@ -251,21 +258,48 @@
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
- final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
- final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
- final DeviceEvent event;
- final Timestamped<DeviceDescription> mergedDesc;
- final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
- synchronized (device) {
- event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
- mergedDesc = device.get(providerId).getDeviceDesc();
+ NodeId localNode = clusterService.getLocalNode().id();
+ NodeId deviceNode = mastershipService.getMasterFor(deviceId);
+
+ // Process device update only if we're the master,
+ // otherwise signal the actual master.
+ DeviceEvent deviceEvent = null;
+ if (localNode.equals(deviceNode)) {
+
+ final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
+ final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+ final Timestamped<DeviceDescription> mergedDesc;
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+ synchronized (device) {
+ deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ mergedDesc = device.get(providerId).getDeviceDesc();
+ }
+
+ if (deviceEvent != null) {
+ log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
+ }
+
+ } else {
+
+ DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
+ providerId, deviceId, deviceDescription);
+ ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
+ SERIALIZER.encode(deviceInjectedEvent));
+
+ try {
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ } catch (IOException e) {
+ log.warn("Failed to process injected device id: {} desc: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, deviceDescription, e);
+ }
+
}
- if (event != null) {
- log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
- notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
- }
- return event;
+
+ return deviceEvent;
}
private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
@@ -434,52 +468,81 @@
DeviceId deviceId,
List<PortDescription> portDescriptions) {
- final Timestamp newTimestamp;
- try {
- newTimestamp = deviceClockService.getTimestamp(deviceId);
- } catch (IllegalStateException e) {
- log.info("Timestamp was not available for device {}", deviceId);
- log.debug(" discarding {}", portDescriptions);
- // Failed to generate timestamp.
+ NodeId localNode = clusterService.getLocalNode().id();
+ // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
+ // since it will trigger distributed store read.
+ // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
+ // outside Device subsystem. so that we don't have to modify both Device and Link stores.
+ // If we don't care much about topology performance, then it might be OK.
+ NodeId deviceNode = mastershipService.getMasterFor(deviceId);
- // Possible situation:
- // Device connected and became master for short period of time,
- // but lost mastership before this instance had the chance to
- // retrieve term information.
+ // Process port update only if we're the master of the device,
+ // otherwise signal the actual master.
+ List<DeviceEvent> deviceEvents = null;
+ if (localNode.equals(deviceNode)) {
- // Information dropped here is expected to be recoverable by
- // device probing after mastership change
+ final Timestamp newTimestamp;
+ try {
+ newTimestamp = deviceClockService.getTimestamp(deviceId);
+ } catch (IllegalStateException e) {
+ log.info("Timestamp was not available for device {}", deviceId);
+ log.debug(" discarding {}", portDescriptions);
+ // Failed to generate timestamp.
- return Collections.emptyList();
+ // Possible situation:
+ // Device connected and became master for short period of time,
+ // but lost mastership before this instance had the chance to
+ // retrieve term information.
+
+ // Information dropped here is expected to be recoverable by
+ // device probing after mastership change
+
+ return Collections.emptyList();
+ }
+ log.debug("timestamp for {} {}", deviceId, newTimestamp);
+
+ final Timestamped<List<PortDescription>> timestampedInput
+ = new Timestamped<>(portDescriptions, newTimestamp);
+ final Timestamped<List<PortDescription>> merged;
+
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+ synchronized (device) {
+ deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
+ final DeviceDescriptions descs = device.get(providerId);
+ List<PortDescription> mergedList =
+ FluentIterable.from(portDescriptions)
+ .transform(new Function<PortDescription, PortDescription>() {
+ @Override
+ public PortDescription apply(PortDescription input) {
+ // lookup merged port description
+ return descs.getPortDesc(input.portNumber()).value();
+ }
+ }).toList();
+ merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+ }
+
+ if (!deviceEvents.isEmpty()) {
+ log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
+ }
+
+ } else {
+
+ PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
+ ClusterMessage clusterMessage = new ClusterMessage(
+ localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
+ try {
+ clusterCommunicator.unicast(clusterMessage, deviceNode);
+ } catch (IOException e) {
+ log.warn("Failed to process injected ports of device id: {} " +
+ "(cluster messaging failed: {})",
+ deviceId, e);
+ }
}
- log.debug("timestamp for {} {}", deviceId, newTimestamp);
- final Timestamped<List<PortDescription>> timestampedInput
- = new Timestamped<>(portDescriptions, newTimestamp);
- final List<DeviceEvent> events;
- final Timestamped<List<PortDescription>> merged;
-
- final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
- synchronized (device) {
- events = updatePortsInternal(providerId, deviceId, timestampedInput);
- final DeviceDescriptions descs = device.get(providerId);
- List<PortDescription> mergedList =
- FluentIterable.from(portDescriptions)
- .transform(new Function<PortDescription, PortDescription>() {
- @Override
- public PortDescription apply(PortDescription input) {
- // lookup merged port description
- return descs.getPortDesc(input.portNumber()).value();
- }
- }).toList();
- merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
- }
- if (!events.isEmpty()) {
- log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
- providerId, deviceId);
- notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
- }
- return events;
+ return deviceEvents;
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -1431,4 +1494,48 @@
});
}
}
+
+ private final class DeviceInjectedEventListener
+ implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.debug("Received injected device event from peer: {}", message.sender());
+ DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ DeviceDescription deviceDescription = event.deviceDescription();
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ createOrUpdateDevice(providerId, deviceId, deviceDescription);
+ }
+ });
+ }
+ }
+
+ private final class PortInjectedEventListener
+ implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.debug("Received injected port event from peer: {}", message.sender());
+ PortInjectedEvent event = SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ List<PortDescription> portDescriptions = event.portDescriptions();
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ updatePorts(providerId, deviceId, portDescriptions);
+ }
+ });
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 83fdf98..9f7361b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -34,4 +34,8 @@
public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements");
// to be used with 3-way anti-entropy process
public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request");
+
+ // Network elements injected (not discovered) by ConfigProvider
+ public static final MessageSubject DEVICE_INJECTED = new MessageSubject("peer-device-injected");
+ public static final MessageSubject PORT_INJECTED = new MessageSubject("peer-port-injected");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java
new file mode 100644
index 0000000..c80f810
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java
@@ -0,0 +1,50 @@
+package org.onosproject.store.device.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.provider.ProviderId;
+
+import java.util.List;
+
+public class PortInjectedEvent {
+
+ private ProviderId providerId;
+ private DeviceId deviceId;
+ private List<PortDescription> portDescriptions;
+
+ protected PortInjectedEvent(ProviderId providerId, DeviceId deviceId, List<PortDescription> portDescriptions) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescriptions = portDescriptions;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public List<PortDescription> portDescriptions() {
+ return portDescriptions;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .add("portDescriptions", portDescriptions)
+ .toString();
+ }
+
+ // for serializer
+ protected PortInjectedEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescriptions = null;
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 6cc45ea..6131190 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -32,6 +32,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.ConnectPoint;
@@ -90,9 +91,7 @@
import static org.onosproject.net.Link.Type.DIRECT;
import static org.onosproject.net.Link.Type.INDIRECT;
import static org.onosproject.net.LinkKey.linkKey;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
+import static org.onosproject.net.link.LinkEvent.Type.*;
import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -106,6 +105,9 @@
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
+ // Timeout in milliseconds to process links on remote master node
+ private static final int REMOTE_MASTER_TIMEOUT = 1000;
+
private final Logger log = getLogger(getClass());
// Link inventory
@@ -131,6 +133,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -141,6 +146,7 @@
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
+ .register(LinkInjectedEvent.class)
.build();
}
};
@@ -161,6 +167,9 @@
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_INJECTED,
+ new LinkInjectedEventListener());
executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d"));
@@ -270,27 +279,52 @@
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
- DeviceId dstDeviceId = linkDescription.dst().deviceId();
- Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+ final DeviceId dstDeviceId = linkDescription.dst().deviceId();
+ final NodeId localNode = clusterService.getLocalNode().id();
+ final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
- final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+ // Process link update only if we're the master of the destination node,
+ // otherwise signal the actual master.
+ LinkEvent linkEvent = null;
+ if (localNode.equals(dstNode)) {
- LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
- final LinkEvent event;
- final Timestamped<LinkDescription> mergedDesc;
- Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
- synchronized (map) {
- event = createOrUpdateLinkInternal(providerId, deltaDesc);
- mergedDesc = map.get(providerId);
+ Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+
+ final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+
+ LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
+ final Timestamped<LinkDescription> mergedDesc;
+ Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
+
+ synchronized (map) {
+ linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
+ mergedDesc = map.get(providerId);
+ }
+
+ if (linkEvent != null) {
+ log.info("Notifying peers of a link update topology event from providerId: "
+ + "{} between src: {} and dst: {}",
+ providerId, linkDescription.src(), linkDescription.dst());
+ notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
+ }
+
+ } else {
+
+ LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
+ ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
+ GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
+
+ try {
+ clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+ } catch (IOException e) {
+ log.warn("Failed to process link update between src: {} and dst: {} " +
+ "(cluster messaging failed: {})",
+ linkDescription.src(), linkDescription.dst(), e);
+ }
+
}
- if (event != null) {
- log.info("Notifying peers of a link update topology event from providerId: "
- + "{} between src: {} and dst: {}",
- providerId, linkDescription.src(), linkDescription.dst());
- notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
- }
- return event;
+ return linkEvent;
}
@Override
@@ -318,7 +352,7 @@
Timestamped<LinkDescription> linkDescription) {
final LinkKey key = linkKey(linkDescription.value().src(),
- linkDescription.value().dst());
+ linkDescription.value().dst());
Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
@@ -397,7 +431,7 @@
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
links.put(key, newLink);
- // strictly speaking following can be ommitted
+ // strictly speaking following can be omitted
srcLinks.put(oldLink.src().deviceId(), key);
dstLinks.put(oldLink.dst().deviceId(), key);
return new LinkEvent(LINK_UPDATED, newLink);
@@ -848,4 +882,25 @@
});
}
}
+
+ private final class LinkInjectedEventListener
+ implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.trace("Received injected link event from peer: {}", message.sender());
+ LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = linkInjectedEvent.providerId();
+ LinkDescription linkDescription = linkInjectedEvent.linkDescription();
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ createOrUpdateLink(providerId, linkDescription);
+ }
+ });
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
index 6b583ef..1d261b5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
@@ -15,7 +15,7 @@
*/
package org.onosproject.store.link.impl;
-import org.onosproject.store.cluster.messaging.MessageSubject;
+ import org.onosproject.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by GossipLinkStore peer-peer communication.
@@ -30,4 +30,6 @@
new MessageSubject("peer-link-removed");
public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
new MessageSubject("link-enti-entropy-advertisement");
+ public static final MessageSubject LINK_INJECTED =
+ new MessageSubject("peer-link-injected");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java
new file mode 100644
index 0000000..356033b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java
@@ -0,0 +1,38 @@
+package org.onosproject.store.link.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.net.link.LinkDescription;
+import org.onosproject.net.provider.ProviderId;
+
+public class LinkInjectedEvent {
+
+ ProviderId providerId;
+ LinkDescription linkDescription;
+
+ public LinkInjectedEvent(ProviderId providerId, LinkDescription linkDescription) {
+ this.providerId = providerId;
+ this.linkDescription = linkDescription;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public LinkDescription linkDescription() {
+ return linkDescription;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("linkDescription", linkDescription)
+ .toString();
+ }
+
+ // for serializer
+ protected LinkInjectedEvent() {
+ this.providerId = null;
+ this.linkDescription = null;
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index 1251e96..eeca2a8b 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -27,6 +27,8 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
@@ -115,7 +117,7 @@
private DeviceClockManager deviceClockManager;
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
-
+ private MastershipService mastershipService;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -146,11 +148,13 @@
linkStoreImpl.deviceClockService = deviceClockService;
linkStoreImpl.clusterCommunicator = clusterCommunicator;
linkStoreImpl.clusterService = new TestClusterService();
+ linkStoreImpl.mastershipService = new TestMastershipService();
linkStoreImpl.activate();
linkStore = linkStoreImpl;
verify(clusterCommunicator);
reset(clusterCommunicator);
+
}
@After
@@ -602,4 +606,11 @@
nodeStates.put(NID2, ACTIVE);
}
}
+
+ private final class TestMastershipService extends MastershipServiceAdapter {
+ @Override
+ public NodeId getMasterFor(DeviceId deviceId) {
+ return NID1;
+ }
+ }
}