Anti-Entropy support for link store.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 7bbad93..d56ac92 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -4,9 +4,11 @@
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -15,6 +17,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
@@ -47,18 +51,24 @@
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Predicates.notNull;
@@ -110,13 +120,30 @@
}
};
+ private ScheduledExecutorService executor;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener());
+ GossipLinkStoreMessageSubjects.LINK_UPDATE,
+ new InternalLinkEventListener());
clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener());
+ GossipLinkStoreMessageSubjects.LINK_REMOVED,
+ new InternalLinkRemovedEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
+ new InternalLinkAntiEntropyAdvertisementListener());
+
+ executor =
+ newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+
+ // TODO: Make these configurable
+ long initialDelaySec = 5;
+ long periodSec = 5;
+ // start anti-entropy thread
+ executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
@@ -408,6 +435,10 @@
NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
}
+ private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
+ return getLinkDescriptions(key).get(providerId);
+ }
+
private final Function<LinkKey, Link> lookupLink = new LookupLink();
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
@@ -448,6 +479,19 @@
clusterCommunicator.broadcast(message);
}
+ // TODO: should we be throwing exception?
+ private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
+ try {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.unicast(message, recipient);
+ } catch (IOException e) {
+ log.error("Failed to send a {} message to {}", subject.value(), recipient);
+ }
+ }
+
private void notifyPeers(InternalLinkEvent event) throws IOException {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
@@ -456,6 +500,125 @@
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
+ private void notifyPeer(NodeId peer, InternalLinkEvent event) {
+ unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
+ }
+
+ private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
+ unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
+ }
+
+ private final class SendAdvertisementTask implements Runnable {
+
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ final NodeId self = clusterService.getLocalNode().id();
+ Set<ControllerNode> nodes = clusterService.getNodes();
+
+ ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+ .transform(toNodeId())
+ .toList();
+
+ if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+ log.info("No other peers in the cluster.");
+ return;
+ }
+
+ NodeId peer;
+ do {
+ int idx = RandomUtils.nextInt(0, nodeIds.size());
+ peer = nodeIds.get(idx);
+ } while (peer.equals(self));
+
+ LinkAntiEntropyAdvertisement ad = createAdvertisement();
+
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
+ } catch (Exception e) {
+ log.error("Failed to send anti-entropy advertisement", e);
+ return;
+ }
+ } catch (Exception e) {
+ // catch all Exception to avoid Scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
+ }
+ }
+ }
+
+ private LinkAntiEntropyAdvertisement createAdvertisement() {
+ final NodeId self = clusterService.getLocalNode().id();
+
+ Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
+ Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
+
+ for (Entry<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>>
+ provs : linkDescs.entrySet()) {
+
+ final LinkKey linkKey = provs.getKey();
+ final ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDesc = provs.getValue();
+ synchronized (linkDesc) {
+ for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
+ linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
+ }
+ }
+ }
+
+ linkTombstones.putAll(removedLinks);
+
+ return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
+ }
+
+ private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
+
+ NodeId peer = advertisement.sender();
+
+ Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
+ Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
+ for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
+ LinkFragmentId linkFragmentId = entry.getKey();
+ Timestamp peerTimestamp = entry.getValue();
+
+ LinkKey key = linkFragmentId.linkKey();
+ ProviderId providerId = linkFragmentId.providerId();
+
+ Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
+ if (linkDescription.isNewer(peerTimestamp)) {
+ // I have more recent link description. update peer.
+ notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
+ }
+ // else TODO: Peer has more recent link description. request it.
+
+ Timestamp linkRemovedTimestamp = removedLinks.get(key);
+ if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
+ // peer has a zombie link. update peer.
+ notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
+ }
+ }
+
+ for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
+ LinkKey key = entry.getKey();
+ Timestamp peerTimestamp = entry.getValue();
+
+ ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
+ if (primaryProviderId != null) {
+ if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
+ notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
+ }
+ }
+ }
+ }
+
private class InternalLinkEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -483,4 +646,14 @@
notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
}
}
+
+ private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ log.info("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
+ LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+ handleAntiEntropyAdvertisement(advertisement);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
index 46e7186..817efe4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
@@ -9,6 +9,10 @@
private GossipLinkStoreMessageSubjects() {}
- public static final MessageSubject LINK_UPDATE = new MessageSubject("peer-link-update");
- public static final MessageSubject LINK_REMOVED = new MessageSubject("peer-link-removed");
+ public static final MessageSubject LINK_UPDATE =
+ new MessageSubject("peer-link-update");
+ public static final MessageSubject LINK_REMOVED =
+ new MessageSubject("peer-link-removed");
+ public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
+ new MessageSubject("link-enti-entropy-advertisement");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..a41f9cd
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkAntiEntropyAdvertisement.java
@@ -0,0 +1,48 @@
+package org.onlab.onos.store.link.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Link AE Advertisement message.
+ */
+public class LinkAntiEntropyAdvertisement {
+
+ private final NodeId sender;
+ private final Map<LinkFragmentId, Timestamp> linkTimestamps;
+ private final Map<LinkKey, Timestamp> linkTombstones;
+
+
+ public LinkAntiEntropyAdvertisement(NodeId sender,
+ Map<LinkFragmentId, Timestamp> linkTimestamps,
+ Map<LinkKey, Timestamp> linkTombstones) {
+ this.sender = checkNotNull(sender);
+ this.linkTimestamps = checkNotNull(linkTimestamps);
+ this.linkTombstones = checkNotNull(linkTombstones);
+ }
+
+ public NodeId sender() {
+ return sender;
+ }
+
+ public Map<LinkFragmentId, Timestamp> linkTimestamps() {
+ return linkTimestamps;
+ }
+
+ public Map<LinkKey, Timestamp> linkTombstones() {
+ return linkTombstones;
+ }
+
+ // For serializer
+ @SuppressWarnings("unused")
+ private LinkAntiEntropyAdvertisement() {
+ this.sender = null;
+ this.linkTimestamps = null;
+ this.linkTombstones = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkFragmentId.java
new file mode 100644
index 0000000..f97bef6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/LinkFragmentId.java
@@ -0,0 +1,62 @@
+package org.onlab.onos.store.link.impl;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for LinkDescription from a Provider.
+ */
+public final class LinkFragmentId {
+ public final ProviderId providerId;
+ public final LinkKey linkKey;
+
+ public LinkFragmentId(LinkKey linkKey, ProviderId providerId) {
+ this.providerId = providerId;
+ this.linkKey = linkKey;
+ }
+
+ public LinkKey linkKey() {
+ return linkKey;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(providerId, linkKey);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof LinkFragmentId)) {
+ return false;
+ }
+ LinkFragmentId that = (LinkFragmentId) obj;
+ return Objects.equals(this.linkKey, that.linkKey) &&
+ Objects.equals(this.providerId, that.providerId);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("linkKey", linkKey)
+ .toString();
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private LinkFragmentId() {
+ this.providerId = null;
+ this.linkKey = null;
+ }
+}