GossipHostStore: allow location change + update
- Actively sync with peer on anti-entropy message
to improve convergence speed
- Timestamp not only location
- Refresh timestamp on delta update
Might fix ONOS-436
Change-Id: I271f9af04b87d78124d055e79b93413deaf1fa3c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index 10bf9ec..821cdc5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -15,13 +15,16 @@
*/
package org.onosproject.store.host.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.collect.Multimaps.newSetMultimap;
+import static com.google.common.collect.Sets.newConcurrentHashSet;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
+import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
import static org.onlab.util.Tools.namedThreads;
import static org.onlab.util.Tools.minPriority;
import static org.slf4j.LoggerFactory.getLogger;
@@ -50,6 +53,7 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.Annotations;
+import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultHost;
@@ -61,6 +65,7 @@
import org.onosproject.net.host.HostClockService;
import org.onosproject.net.host.HostDescription;
import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostEvent.Type;
import org.onosproject.net.host.HostStore;
import org.onosproject.net.host.HostStoreDelegate;
import org.onosproject.net.host.PortAddresses;
@@ -109,7 +114,9 @@
private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
// Hosts tracked by their location
- private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
+ private final Multimap<ConnectPoint, Host> locations
+ = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
+ () -> newConcurrentHashSet()));
private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
Multimaps.synchronizedSetMultimap(
@@ -142,16 +149,20 @@
private ScheduledExecutorService backgroundExecutor;
+ // TODO: Make these anti-entropy params configurable
+ private long initialDelaySec = 5;
+ private long periodSec = 5;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_UPDATED,
+ HOST_UPDATED_MSG,
new InternalHostEventListener());
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_REMOVED,
+ HOST_REMOVED_MSG,
new InternalHostRemovedEventListener());
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
+ HOST_ANTI_ENTROPY_ADVERTISEMENT,
new InternalHostAntiEntropyAdvertisementListener());
executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
@@ -159,9 +170,6 @@
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
- // TODO: Make these configurable
- long initialDelaySec = 5;
- long periodSec = 5;
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -209,66 +217,112 @@
private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
HostDescription hostDescription, Timestamp timestamp) {
+ // If this host was previously removed, first ensure
+ // this new request is "newer"
+ if (isHostRemoved(hostId, timestamp)) {
+ log.debug("Ignoring update for removed host {}@{}",
+ hostDescription, timestamp);
+ return null;
+ }
StoredHost host = hosts.get(hostId);
if (host == null) {
return createHost(providerId, hostId, hostDescription, timestamp);
}
- return updateHost(providerId, host, hostDescription, timestamp);
+ return updateHost(providerId, hostId, host, hostDescription, timestamp);
+ }
+
+ /**
+ * @param hostId host identifier
+ * @param timestamp timstamp to compare with
+ * @return true if given timestamp is more recent timestamp compared to
+ * the timestamp Host was removed.
+ */
+ private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
+ Timestamped<Host> removedInfo = removedHosts.get(hostId);
+ if (removedInfo != null) {
+ if (removedInfo.isNewer(timestamp)) {
+ return true;
+ }
+ removedHosts.remove(hostId, removedInfo);
+ }
+ return false;
}
// creates a new host and sends HOST_ADDED
private HostEvent createHost(ProviderId providerId, HostId hostId,
HostDescription descr, Timestamp timestamp) {
synchronized (this) {
- // If this host was previously removed, first ensure
- // this new request is "newer"
- if (removedHosts.containsKey(hostId)) {
- if (removedHosts.get(hostId).isNewer(timestamp)) {
- return null;
- } else {
- removedHosts.remove(hostId);
- }
- }
- StoredHost newhost = new StoredHost(providerId, hostId,
+ StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
descr.hwAddress(),
descr.vlan(),
- new Timestamped<>(descr.location(), timestamp),
+ descr.location(),
ImmutableSet.copyOf(descr.ipAddress()));
- hosts.put(hostId, newhost);
+ StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
+ if (concAdd != null) {
+ // concurrent add detected, retry from start
+ return updateHost(providerId, hostId, concAdd, descr, timestamp);
+ }
locations.put(descr.location(), newhost);
return new HostEvent(HOST_ADDED, newhost);
}
}
// checks for type of update to host, sends appropriate event
- private HostEvent updateHost(ProviderId providerId, StoredHost host,
+ private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
HostDescription descr, Timestamp timestamp) {
- HostEvent event;
- if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
- host.setLocation(new Timestamped<>(descr.location(), timestamp));
- return new HostEvent(HOST_MOVED, host);
- }
- if (host.ipAddresses().containsAll(descr.ipAddress()) &&
- descr.annotations().keys().isEmpty()) {
+ if (timestamp.compareTo(oldHost.timestamp()) < 0) {
+ // new timestamp is older
+ log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
return null;
}
- Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
- addresses.addAll(descr.ipAddress());
- Annotations annotations = merge((DefaultAnnotations) host.annotations(),
- descr.annotations());
- StoredHost updated = new StoredHost(providerId, host.id(),
- host.mac(), host.vlan(),
- host.location, addresses,
- annotations);
- event = new HostEvent(HOST_UPDATED, updated);
- synchronized (this) {
- hosts.put(host.id(), updated);
- locations.remove(host.location(), host);
- locations.put(updated.location(), updated);
+ final boolean hostMoved = !oldHost.location().equals(descr.location());
+ if (hostMoved ||
+ !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
+ !descr.annotations().keys().isEmpty()) {
+
+ Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
+ addresses.addAll(descr.ipAddress());
+ Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
+ descr.annotations());
+
+ Timestamp newTimestamp = timestamp;
+ // if merged Set/Annotation differ from description...
+ final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
+ !AnnotationsUtil.isEqual(descr.annotations(), annotations);
+ if (deltaUpdate) {
+ // ..then local existing info had something description didn't
+ newTimestamp = hostClockService.getTimestamp(hostId);
+ log.debug("delta update detected on {}, substepping timestamp to {}",
+ hostId, newTimestamp);
+ }
+
+ StoredHost updated = new StoredHost(newTimestamp,
+ providerId, oldHost.id(),
+ oldHost.mac(), oldHost.vlan(),
+ descr.location(),
+ addresses,
+ annotations);
+ synchronized (this) {
+ boolean replaced = hosts.replace(hostId, oldHost, updated);
+ if (!replaced) {
+ // concurrent update, retry
+ return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
+ }
+ locations.remove(oldHost.location(), oldHost);
+ locations.put(updated.location(), updated);
+
+ HostEvent.Type eventType;
+ if (hostMoved) {
+ eventType = Type.HOST_MOVED;
+ } else {
+ eventType = Type.HOST_UPDATED;
+ }
+ return new HostEvent(eventType, updated);
+ }
}
- return event;
+ return null;
}
@Override
@@ -397,9 +451,8 @@
}
}
- // Auxiliary extension to allow location to mutate.
private static final class StoredHost extends DefaultHost {
- private Timestamped<HostLocation> location;
+ private final Timestamp timestamp;
/**
* Creates an end-station host using the supplied information.
@@ -412,33 +465,24 @@
* @param ips host IP addresses
* @param annotations optional key/value annotations
*/
- public StoredHost(ProviderId providerId, HostId id,
- MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
+ public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
+ MacAddress mac, VlanId vlan, HostLocation location,
Set<IpAddress> ips, Annotations... annotations) {
- super(providerId, id, mac, vlan, location.value(), ips, annotations);
- this.location = location;
- }
-
- void setLocation(Timestamped<HostLocation> location) {
- this.location = location;
- }
-
- @Override
- public HostLocation location() {
- return location.value();
+ super(providerId, id, mac, vlan, location, ips, annotations);
+ this.timestamp = checkNotNull(timestamp);
}
public Timestamp timestamp() {
- return location.timestamp();
+ return timestamp;
}
}
private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
- broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
+ broadcastMessage(HOST_REMOVED_MSG, event);
}
private void notifyPeers(InternalHostEvent event) throws IOException {
- broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
+ broadcastMessage(HOST_UPDATED_MSG, event);
}
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
@@ -556,7 +600,7 @@
}
try {
- unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
+ unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (IOException e) {
log.debug("Failed to send anti-entropy advertisement to {}", peer);
return;
@@ -613,7 +657,7 @@
localHost.location(),
localHost.ipAddresses());
try {
- unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
+ unicastMessage(sender, HOST_UPDATED_MSG,
new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
} catch (IOException e1) {
log.debug("Failed to send advertisement response", e1);
@@ -642,7 +686,7 @@
localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
// sender has zombie, push
try {
- unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
+ unicastMessage(sender, HOST_REMOVED_MSG,
new InternalHostRemovedEvent(hostId, localDeadTimestamp));
} catch (IOException e1) {
log.debug("Failed to send advertisement response", e1);
@@ -650,7 +694,6 @@
}
}
-
for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
// for each remote tombstone advertisement...
final HostId hostId = e.getKey();
@@ -665,6 +708,19 @@
notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
}
}
+
+ // if remote ad has something unknown, actively sync
+ for (HostFragmentId key : ad.timestamps().keySet()) {
+ if (!hosts.containsKey(key.hostId())) {
+ HostAntiEntropyAdvertisement myAd = createAdvertisement();
+ try {
+ unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
+ break;
+ } catch (IOException e) {
+ log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+ }
+ }
+ }
}
private final class InternalHostAntiEntropyAdvertisementListener
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
index 52dc31d..82c0859 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
@@ -20,9 +20,9 @@
public final class GossipHostStoreMessageSubjects {
private GossipHostStoreMessageSubjects() {}
- public static final MessageSubject HOST_UPDATED
+ public static final MessageSubject HOST_UPDATED_MSG
= new MessageSubject("peer-host-updated");
- public static final MessageSubject HOST_REMOVED
+ public static final MessageSubject HOST_REMOVED_MSG
= new MessageSubject("peer-host-removed");
public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
= new MessageSubject("host-enti-entropy-advertisement");;