GossipLinkStore AE bugfix + cleanup
Change-Id: If4cbaa65f980f10713488e6bf1be5d01c3131780
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index a6c1660..21e52b2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -1,7 +1,6 @@
package org.onlab.onos.store.link.impl;
import com.google.common.base.Function;
-import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -27,7 +26,6 @@
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.Provided;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
@@ -70,7 +68,9 @@
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
/**
* Manages inventory of infrastructure links in distributed data store
@@ -239,9 +239,9 @@
LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
final LinkEvent event;
final Timestamped<LinkDescription> mergedDesc;
- synchronized (getLinkDescriptions(key)) {
+ synchronized (getOrCreateLinkDescriptions(key)) {
event = createOrUpdateLinkInternal(providerId, deltaDesc);
- mergedDesc = getLinkDescriptions(key).get(providerId);
+ mergedDesc = getOrCreateLinkDescriptions(key).get(providerId);
}
if (event != null) {
@@ -265,7 +265,7 @@
LinkKey key = linkKey(linkDescription.value().src(),
linkDescription.value().dst());
- Map<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
+ Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
// if the link was previously removed, we should proceed if and
@@ -296,7 +296,7 @@
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
- // merge existing attributes and merge
+ // merge existing annotations
Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
return null;
@@ -377,14 +377,54 @@
return event;
}
- private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
- Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
- getLinkDescriptions(key);
+ private static Timestamped<LinkDescription> getPrimaryDescription(
+ Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
+
synchronized (linkDescriptions) {
+ for (Entry<ProviderId, Timestamped<LinkDescription>>
+ e : linkDescriptions.entrySet()) {
+
+ if (!e.getKey().isAncillary()) {
+ return e.getValue();
+ }
+ }
+ }
+ return null;
+ }
+
+
+ // TODO: consider slicing out as Timestamp utils
+ /**
+ * Checks is timestamp is more recent than timestamped object.
+ *
+ * @param timestamp to check if this is more recent then other
+ * @param timestamped object to be tested against
+ * @return true if {@code timestamp} is more recent than {@code timestamped}
+ * or {@code timestamped is null}
+ */
+ private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
+ checkNotNull(timestamp);
+ if (timestamped == null) {
+ return true;
+ }
+ return timestamp.compareTo(timestamped.timestamp()) > 0;
+ }
+
+ private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
+ Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
+ = getOrCreateLinkDescriptions(key);
+
+ synchronized (linkDescriptions) {
+ if (linkDescriptions.isEmpty()) {
+ // never seen such link before. keeping timestamp for record
+ removedLinks.put(key, timestamp);
+ return null;
+ }
// accept removal request if given timestamp is newer than
// the latest Timestamp from Primary provider
- ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
- if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
+ Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
+ if (!isMoreRecent(timestamp, prim)) {
+ // outdated remove request, ignore
return null;
}
removedLinks.put(key, timestamp);
@@ -406,12 +446,13 @@
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
- private ProviderId pickPrimaryProviderId(
+ private static ProviderId pickBaseProviderId(
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
if (!e.getKey().isAncillary()) {
+ // found primary
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
@@ -421,9 +462,10 @@
return fallBackPrimary;
}
+ // Guarded by linkDescs value (=locking each Link)
private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
- ProviderId primaryProviderId = pickPrimaryProviderId(descs);
- Timestamped<LinkDescription> base = descs.get(primaryProviderId);
+ ProviderId baseProviderId = pickBaseProviderId(descs);
+ Timestamped<LinkDescription> base = descs.get(baseProviderId);
ConnectPoint src = base.value().src();
ConnectPoint dst = base.value().dst();
@@ -432,7 +474,7 @@
annotations = merge(annotations, base.value().annotations());
for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
- if (primaryProviderId.equals(e.getKey())) {
+ if (baseProviderId.equals(e.getKey())) {
continue;
}
@@ -445,10 +487,10 @@
annotations = merge(annotations, e.getValue().value().annotations());
}
- return new DefaultLink(primaryProviderId , src, dst, type, annotations);
+ return new DefaultLink(baseProviderId, src, dst, type, annotations);
}
- private Map<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
+ private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
Map<ProviderId, Timestamped<LinkDescription>> r;
r = linkDescs.get(key);
if (r != null) {
@@ -464,11 +506,11 @@
}
}
- private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
- return getLinkDescriptions(key).get(providerId);
- }
-
private final Function<LinkKey, Link> lookupLink = new LookupLink();
+ /**
+ * Returns a Function to lookup Link instance using LinkKey from cache.
+ * @return
+ */
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
}
@@ -480,26 +522,12 @@
}
}
- private static final class IsPrimary implements Predicate<Provided> {
-
- private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
- public static final Predicate<Provided> isPrimary() {
- return IS_PRIMARY;
- }
-
- @Override
- public boolean apply(Provided input) {
- return !input.providerId().isAncillary();
- }
- }
-
private void notifyDelegateIfNotNull(LinkEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
- // TODO: should we be throwing exception?
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
@@ -508,17 +536,12 @@
clusterCommunicator.broadcast(message);
}
- // TODO: should we be throwing exception?
- private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
- try {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, recipient);
- } catch (IOException e) {
- log.error("Failed to send a {} message to {}", subject.value(), recipient);
- }
+ private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.unicast(message, recipient);
}
private void notifyPeers(InternalLinkEvent event) throws IOException {
@@ -529,12 +552,22 @@
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
+ // notify peer, silently ignoring error
private void notifyPeer(NodeId peer, InternalLinkEvent event) {
- unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
+ try {
+ unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
+ } catch (IOException e) {
+ log.debug("Failed to notify peer {} with message {}", peer, event);
+ }
}
+ // notify peer, silently ignoring error
private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
- unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
+ try {
+ unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
+ } catch (IOException e) {
+ log.debug("Failed to notify peer {} with message {}", peer, event);
+ }
}
private final class SendAdvertisementTask implements Runnable {
@@ -573,9 +606,9 @@
}
try {
- unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
- } catch (Exception e) {
- log.error("Failed to send anti-entropy advertisement", e);
+ unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
+ } catch (IOException e) {
+ log.debug("Failed to send anti-entropy advertisement to {}", peer);
return;
}
} catch (Exception e) {
@@ -608,42 +641,75 @@
return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
}
- private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
+ private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
- NodeId peer = advertisement.sender();
+ final NodeId sender = ad.sender();
+ boolean localOutdated = false;
- Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
- Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
- for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
- LinkFragmentId linkFragmentId = entry.getKey();
- Timestamp peerTimestamp = entry.getValue();
+ for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
+ l : linkDescs.entrySet()) {
- LinkKey key = linkFragmentId.linkKey();
- ProviderId providerId = linkFragmentId.providerId();
+ final LinkKey key = l.getKey();
+ final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
+ synchronized (link) {
+ Timestamp localLatest = removedLinks.get(key);
- Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
- if (linkDescription.isNewer(peerTimestamp)) {
- // I have more recent link description. update peer.
- notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
- }
- // else TODO: Peer has more recent link description. request it.
+ for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
+ final ProviderId providerId = p.getKey();
+ final Timestamped<LinkDescription> pDesc = p.getValue();
- Timestamp linkRemovedTimestamp = removedLinks.get(key);
- if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
- // peer has a zombie link. update peer.
- notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
+ final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
+ // remote
+ Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
+ if (remoteTimestamp == null) {
+ remoteTimestamp = ad.linkTombstones().get(key);
+ }
+ if (remoteTimestamp == null ||
+ pDesc.isNewer(remoteTimestamp)) {
+ // I have more recent link description. update peer.
+ notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
+ } else {
+ final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
+ if (remoteLive != null &&
+ remoteLive.compareTo(pDesc.timestamp()) > 0) {
+ // I have something outdated
+ localOutdated = true;
+ }
+ }
+
+ // search local latest along the way
+ if (localLatest == null ||
+ pDesc.isNewer(localLatest)) {
+ localLatest = pDesc.timestamp();
+ }
+ }
+ // Tests if remote remove is more recent then local latest.
+ final Timestamp remoteRemove = ad.linkTombstones().get(key);
+ if (remoteRemove != null) {
+ if (localLatest != null &&
+ localLatest.compareTo(remoteRemove) < 0) {
+ // remote remove is more recent
+ notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
+ }
+ }
}
}
- for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
- LinkKey key = entry.getKey();
- Timestamp peerTimestamp = entry.getValue();
+ // populate remove info if not known locally
+ for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
+ final LinkKey key = remoteRm.getKey();
+ final Timestamp remoteRemove = remoteRm.getValue();
+ // relying on removeLinkInternal to ignore stale info
+ notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
+ }
- ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
- if (primaryProviderId != null) {
- if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
- notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
- }
+ if (localOutdated) {
+ // send back advertisement to speed up convergence
+ try {
+ unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
+ createAdvertisement());
+ } catch (IOException e) {
+ log.debug("Failed to send back active advertisement");
}
}
}