Injecting topology through JSON ConfigProvider works for multi-instance (ONOS-490).
Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1
Injecting optical device annotations and ports works for multi-instance (ONOS-870).
Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 6cc45ea..6131190 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -32,6 +32,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.ConnectPoint;
@@ -90,9 +91,7 @@
import static org.onosproject.net.Link.Type.DIRECT;
import static org.onosproject.net.Link.Type.INDIRECT;
import static org.onosproject.net.LinkKey.linkKey;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
+import static org.onosproject.net.link.LinkEvent.Type.*;
import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -106,6 +105,9 @@
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
+ // Timeout in milliseconds to process links on remote master node
+ private static final int REMOTE_MASTER_TIMEOUT = 1000;
+
private final Logger log = getLogger(getClass());
// Link inventory
@@ -131,6 +133,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -141,6 +146,7 @@
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
+ .register(LinkInjectedEvent.class)
.build();
}
};
@@ -161,6 +167,9 @@
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_INJECTED,
+ new LinkInjectedEventListener());
executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d"));
@@ -270,27 +279,52 @@
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
- DeviceId dstDeviceId = linkDescription.dst().deviceId();
- Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+ final DeviceId dstDeviceId = linkDescription.dst().deviceId();
+ final NodeId localNode = clusterService.getLocalNode().id();
+ final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
- final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+ // Process the link update locally only if this node is the master of the link's
+ // destination device; otherwise forward it to the actual master and return null.
+ LinkEvent linkEvent = null;
+ if (localNode.equals(dstNode)) {
- LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
- final LinkEvent event;
- final Timestamped<LinkDescription> mergedDesc;
- Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
- synchronized (map) {
- event = createOrUpdateLinkInternal(providerId, deltaDesc);
- mergedDesc = map.get(providerId);
+ Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+ // Pair the incoming description with the destination device's current timestamp.
+ final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+ // The per-link description map also serves as the lock object for the update below.
+ LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
+ final Timestamped<LinkDescription> mergedDesc;
+ Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
+ // Apply the update and read back this provider's entry while holding the map's monitor.
+ synchronized (map) {
+ linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
+ mergedDesc = map.get(providerId);
+ }
+ // Peers are notified only when the internal update actually produced an event.
+ if (linkEvent != null) {
+ log.info("Notifying peers of a link update topology event from providerId: "
+ + "{} between src: {} and dst: {}",
+ providerId, linkDescription.src(), linkDescription.dst());
+ notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
+ }
+
+ } else {
+ // Not the master: wrap the untimestamped description and unicast it to dstNode.
+ LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
+ ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
+ GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
+ // NOTE(review): dstNode is not null-checked before unicast() - confirm getMasterFor() cannot return null.
+ try {
+ clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+ } catch (IOException e) {
+ log.warn("Failed to process link update between src: {} and dst: {} " +
+ "(cluster messaging failed: {})",
+ linkDescription.src(), linkDescription.dst(), e);
+ }
+ // linkEvent stays null on this path; a failed unicast is only logged, not retried here.
}
- if (event != null) {
- log.info("Notifying peers of a link update topology event from providerId: "
- + "{} between src: {} and dst: {}",
- providerId, linkDescription.src(), linkDescription.dst());
- notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
- }
- return event;
+ return linkEvent;
}
@Override
@@ -318,7 +352,7 @@
Timestamped<LinkDescription> linkDescription) {
final LinkKey key = linkKey(linkDescription.value().src(),
- linkDescription.value().dst());
+ linkDescription.value().dst());
Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
@@ -397,7 +431,7 @@
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
links.put(key, newLink);
- // strictly speaking following can be ommitted
+ // strictly speaking following can be omitted
srcLinks.put(oldLink.src().deviceId(), key);
dstLinks.put(oldLink.dst().deviceId(), key);
return new LinkEvent(LINK_UPDATED, newLink);
@@ -848,4 +882,25 @@
});
}
}
+
+ private final class LinkInjectedEventListener
+ implements ClusterMessageHandler { // handles LINK_INJECTED messages unicast by a non-master peer
+ @Override
+ public void handle(ClusterMessage message) {
+ // Decode the forwarded provider id and link description from the message payload.
+ log.trace("Received injected link event from peer: {}", message.sender());
+ LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = linkInjectedEvent.providerId();
+ LinkDescription linkDescription = linkInjectedEvent.linkDescription();
+ // Replay the update through createOrUpdateLink() on the executor; the returned LinkEvent is not used here.
+ executor.submit(new Runnable() {
+ // NOTE(review): the captured locals above are not declared final, so this needs Java 8+ source level.
+ @Override
+ public void run() {
+ createOrUpdateLink(providerId, linkDescription);
+ }
+ });
+ }
+ }
}