GossipHostStore: add anti-entropy (AE) support
- modified the HostDescription family to hold a Set of IP addresses
Change-Id: Id920fdc83817802885e8528af185a5ad590bf999
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 39bc770..4025d0c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -1,10 +1,13 @@
package org.onlab.onos.store.host.impl;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -12,6 +15,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultHost;
@@ -19,6 +24,7 @@
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.HostLocation;
+import org.onlab.onos.net.host.DefaultHostDescription;
import org.onlab.onos.net.host.HostClockService;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent;
@@ -42,12 +48,19 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.host.HostEvent.Type.*;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
//TODO: multi-provider, annotation not supported.
@@ -88,24 +101,58 @@
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(DistributedStoreSerializers.COMMON)
+ .register(InternalHostEvent.class)
.register(InternalHostRemovedEvent.class)
+ .register(HostFragmentId.class)
+ .register(HostAntiEntropyAdvertisement.class)
.build()
.populate(1);
}
};
+ private ScheduledExecutorService executor;
+
+ /**
+ * Subscribes to peer host-updated, host-removed and anti-entropy
+ * advertisement messages, then schedules the periodic task that sends
+ * anti-entropy advertisements.
+ */
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_UPDATED, new InternalHostEventListener());
+ GossipHostStoreMessageSubjects.HOST_UPDATED,
+ new InternalHostEventListener());
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_REMOVED, new InternalHostRemovedEventListener());
+ GossipHostStoreMessageSubjects.HOST_REMOVED,
+ new InternalHostRemovedEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
+ new InternalHostAntiEntropyAdvertisementListener());
+
+ // name the worker thread after this (host) store so that thread dumps
+ // and logs point at the right component
+ executor =
+ newSingleThreadScheduledExecutor(namedThreads("host-anti-entropy-%d"));
+
+ // TODO: Make these configurable
+ long initialDelaySec = 5;
+ long periodSec = 5;
+ // start anti-entropy thread
+ executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
+ /**
+ * Stops the anti-entropy executor, waiting up to 5 seconds for it to
+ * terminate, and then clears the locally held host state.
+ */
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ log.error("Timeout during executor shutdown");
+ }
+ } catch (InterruptedException e) {
+ log.error("Error during executor shutdown", e);
+ // restore the interrupt status so that code further up the stack
+ // can still observe the interruption
+ Thread.currentThread().interrupt();
+ }
+
+ hosts.clear();
+ removedHosts.clear();
+ locations.clear();
+ portAddresses.clear();
+
log.info("Stopped");
}
@@ -153,7 +200,7 @@
descr.hwAddress(),
descr.vlan(),
new Timestamped<>(descr.location(), timestamp),
- ImmutableSet.of(descr.ipAddress()));
+ ImmutableSet.copyOf(descr.ipAddress()));
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
return new HostEvent(HOST_ADDED, newhost);
@@ -169,12 +216,12 @@
return new HostEvent(HOST_MOVED, host);
}
- if (host.ipAddresses().contains(descr.ipAddress())) {
+ if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
- addresses.add(descr.ipAddress());
+ addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
host.location, addresses);
@@ -381,6 +428,10 @@
public HostLocation location() {
return location.value();
}
+
+ /**
+ * Returns the timestamp carried by this host's timestamped location.
+ *
+ * @return timestamp of the stored location
+ */
+ public Timestamp timestamp() {
+ return location.timestamp();
+ }
}
private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
@@ -399,6 +450,16 @@
clusterCommunicator.broadcast(message);
}
+    /**
+     * Serializes the given event and sends it to a single peer.
+     *
+     * @param peer    node to send the message to
+     * @param subject subject under which the message is sent
+     * @param event   payload object, encoded with this store's serializer
+     * @throws IOException if the message could not be sent
+     */
+    private void unicastMessage(NodeId peer,
+                                MessageSubject subject,
+                                Object event) throws IOException {
+        final NodeId localId = clusterService.getLocalNode().id();
+        clusterCommunicator.unicast(
+                new ClusterMessage(localId, subject, SERIALIZER.encode(event)),
+                peer);
+    }
+
private void notifyDelegateIfNotNull(HostEvent event) {
if (event != null) {
notifyDelegate(event);
@@ -434,4 +495,165 @@
notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
}
}
+
+    /**
+     * Scheduled task which sends an anti-entropy advertisement describing the
+     * local host store to one randomly selected peer.
+     */
+    private final class SendAdvertisementTask implements Runnable {
+
+        @Override
+        public void run() {
+            if (quitIfInterrupted()) {
+                return;
+            }
+
+            try {
+                final NodeId self = clusterService.getLocalNode().id();
+                final ImmutableList<NodeId> candidates =
+                        FluentIterable.from(clusterService.getNodes())
+                                .transform(toNodeId())
+                                .toList();
+
+                final boolean onlySelf =
+                        candidates.size() == 1 && candidates.get(0).equals(self);
+                if (onlySelf) {
+                    log.debug("No other peers in the cluster.");
+                    return;
+                }
+
+                // keep drawing until somebody other than the local node comes up
+                NodeId target;
+                do {
+                    target = candidates.get(RandomUtils.nextInt(0, candidates.size()));
+                } while (target.equals(self));
+
+                final HostAntiEntropyAdvertisement advertisement = createAdvertisement();
+
+                if (quitIfInterrupted()) {
+                    return;
+                }
+
+                try {
+                    unicastMessage(target,
+                            GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
+                            advertisement);
+                } catch (IOException e) {
+                    log.debug("Failed to send anti-entropy advertisement", e);
+                }
+            } catch (Exception e) {
+                // an exception escaping run() would suppress all future
+                // executions of this scheduled task, so catch everything here
+                log.error("Exception thrown while sending advertisement", e);
+            }
+        }
+
+        // Logs and reports whether the current thread has been interrupted.
+        private boolean quitIfInterrupted() {
+            if (!Thread.currentThread().isInterrupted()) {
+                return false;
+            }
+            log.info("Interrupted, quitting");
+            return true;
+        }
+    }
+
+    /**
+     * Builds an anti-entropy advertisement from the current local state.
+     * <p>
+     * The advertisement carries the local node id, the timestamp of every
+     * live host keyed by (host id, provider id), and the timestamp of every
+     * removed host keyed by host id.
+     *
+     * @return advertisement describing the local host store
+     */
+    private HostAntiEntropyAdvertisement createAdvertisement() {
+        final NodeId self = clusterService.getLocalNode().id();
+
+        final Map<HostFragmentId, Timestamp> liveDigest = new HashMap<>(hosts.size());
+        final Map<HostId, Timestamp> deadDigest = new HashMap<>(removedHosts.size());
+
+        for (Entry<HostId, StoredHost> live : hosts.entrySet()) {
+            final StoredHost stored = live.getValue();
+            final HostFragmentId fragmentId =
+                    new HostFragmentId(live.getKey(), stored.providerId());
+            liveDigest.put(fragmentId, stored.timestamp());
+        }
+
+        for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
+            deadDigest.put(dead.getKey(), dead.getValue().timestamp());
+        }
+
+        return new HostAntiEntropyAdvertisement(self, liveDigest, deadDigest);
+    }
+
+ /**
+ * Reconciles the local host state against an anti-entropy advertisement
+ * received from a peer.
+ * <p>
+ * Three passes are made:
+ * <ol>
+ * <li>for every locally live host, the host is pushed to the sender when
+ * the sender advertised no timestamp for it (neither live nor tombstone)
+ * or the local timestamp is newer; the host is removed locally when the
+ * sender's tombstone is newer than the local timestamp;</li>
+ * <li>for every locally removed host, the removal is pushed to the sender
+ * when the sender advertised a live entry older than the local
+ * tombstone;</li>
+ * <li>for every tombstone in the advertisement, a locally live host with
+ * an older timestamp is removed.</li>
+ * </ol>
+ * Failures to send a response are logged at debug level and otherwise
+ * ignored.
+ *
+ * @param ad advertisement received from the peer
+ */
+ private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
+
+ final NodeId sender = ad.sender();
+
+ for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
+ // for each locally live host...
+ final HostId hostId = host.getKey();
+ final StoredHost localHost = host.getValue();
+ final ProviderId providerId = localHost.providerId();
+ final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
+ final Timestamp localLiveTimestamp = localHost.timestamp();
+
+ // prefer the sender's live timestamp; if it has none, fall back to
+ // its tombstone timestamp for the same host (may still be null)
+ Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
+ if (remoteTimestamp == null) {
+ remoteTimestamp = ad.tombstones().get(hostId);
+ }
+ if (remoteTimestamp == null ||
+ localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
+
+ // sender does not know this host or local is more recent, push
+ // TODO: annotation is lost
+ final HostDescription desc = new DefaultHostDescription(
+ localHost.mac(),
+ localHost.vlan(),
+ localHost.location(),
+ localHost.ipAddresses());
+ try {
+ unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
+ new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
+ } catch (IOException e1) {
+ log.debug("Failed to send advertisement response", e1);
+ }
+ }
+
+ final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
+ if (remoteDeadTimestamp != null &&
+ remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
+ // sender has a more recent removal, remove locally
+ notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
+ }
+ }
+
+ for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
+ // for each locally removed host...
+ final HostId hostId = dead.getKey();
+ final Timestamp localDeadTimestamp = dead.getValue().timestamp();
+
+ // TODO: pick proper ProviderId, when supporting multi-provider
+ final ProviderId providerId = dead.getValue().value().providerId();
+ final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
+
+ final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
+ if (remoteLiveTimestamp != null &&
+ localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
+ // sender has zombie, push
+ try {
+ unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
+ new InternalHostRemovedEvent(hostId, localDeadTimestamp));
+ } catch (IOException e1) {
+ log.debug("Failed to send advertisement response", e1);
+ }
+ }
+ }
+
+
+ // NOTE(review): this pass applies the same "remote tombstone newer than
+ // local live timestamp" rule as the end of the first loop - confirm
+ // whether both are needed
+ for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
+ // for each remote tombstone advertisement...
+ final HostId hostId = e.getKey();
+ final Timestamp adRemoveTimestamp = e.getValue();
+
+ final StoredHost storedHost = hosts.get(hostId);
+ if (storedHost == null) {
+ // not live locally, nothing to remove
+ continue;
+ }
+ if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
+ // sender has recent remove info, locally remove
+ notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
+ }
+ }
+ }
+
+    /**
+     * Handler for host anti-entropy advertisements received from peers; each
+     * message payload is decoded and handed to the reconciliation logic.
+     */
+    private final class InternalHostAntiEntropyAdvertisementListener
+            implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
+            final HostAntiEntropyAdvertisement received = SERIALIZER.decode(message.payload());
+            handleAntiEntropyAdvertisement(received);
+        }
+    }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
index 27cf4ce..0a9f0e0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
@@ -4,6 +4,11 @@
public final class GossipHostStoreMessageSubjects {
private GossipHostStoreMessageSubjects() {}
- public static final MessageSubject HOST_UPDATED = new MessageSubject("peer-host-updated");
- public static final MessageSubject HOST_REMOVED = new MessageSubject("peer-host-removed");
+
+ // Subject for host add/update events sent to peers
+ public static final MessageSubject HOST_UPDATED
+ = new MessageSubject("peer-host-updated");
+ // Subject for host removal events sent to peers
+ public static final MessageSubject HOST_REMOVED
+ = new MessageSubject("peer-host-removed");
+ // Subject for anti-entropy advertisements exchanged between peers
+ public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
+ = new MessageSubject("host-anti-entropy-advertisement");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..6139005
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostAntiEntropyAdvertisement.java
@@ -0,0 +1,48 @@
+package org.onlab.onos.store.host.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Anti-entropy advertisement message exchanged between host stores.
+ * <p>
+ * Holds the id of the sending node, the timestamps it advertises per host
+ * fragment, and the timestamps of its tombstones per host.
+ */
+public final class HostAntiEntropyAdvertisement {
+
+    private final NodeId sender;
+    private final Map<HostFragmentId, Timestamp> timestamps;
+    private final Map<HostId, Timestamp> tombstones;
+
+    // For serializer
+    @SuppressWarnings("unused")
+    private HostAntiEntropyAdvertisement() {
+        this.sender = null;
+        this.timestamps = null;
+        this.tombstones = null;
+    }
+
+    /**
+     * Creates an advertisement; no argument may be null.
+     *
+     * @param sender     id of the node sending this advertisement
+     * @param timestamps advertised timestamps, keyed by host fragment id
+     * @param tombstones advertised tombstone timestamps, keyed by host id
+     */
+    public HostAntiEntropyAdvertisement(NodeId sender,
+                                        Map<HostFragmentId, Timestamp> timestamps,
+                                        Map<HostId, Timestamp> tombstones) {
+        this.sender = checkNotNull(sender);
+        this.timestamps = checkNotNull(timestamps);
+        this.tombstones = checkNotNull(tombstones);
+    }
+
+    /**
+     * Returns the id of the node which sent this advertisement.
+     *
+     * @return sender node id
+     */
+    public NodeId sender() {
+        return sender;
+    }
+
+    /**
+     * Returns the advertised timestamps keyed by host fragment id.
+     *
+     * @return the map passed at construction (not copied)
+     */
+    public Map<HostFragmentId, Timestamp> timestamps() {
+        return timestamps;
+    }
+
+    /**
+     * Returns the advertised tombstone timestamps keyed by host id.
+     *
+     * @return the map passed at construction (not copied)
+     */
+    public Map<HostId, Timestamp> tombstones() {
+        return tombstones;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostFragmentId.java
new file mode 100644
index 0000000..34dbff6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostFragmentId.java
@@ -0,0 +1,62 @@
+package org.onlab.onos.store.host.impl;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifies the HostDescription supplied for a host by one provider, i.e.
+ * the pair of host id and provider id.
+ */
+public final class HostFragmentId {
+    public final ProviderId providerId;
+    public final HostId hostId;
+
+    // for serializer
+    @SuppressWarnings("unused")
+    private HostFragmentId() {
+        this.providerId = null;
+        this.hostId = null;
+    }
+
+    /**
+     * Creates an identifier from a host id and a provider id.
+     *
+     * @param hostId     id of the host
+     * @param providerId id of the provider
+     */
+    public HostFragmentId(HostId hostId, ProviderId providerId) {
+        this.providerId = providerId;
+        this.hostId = hostId;
+    }
+
+    /**
+     * Returns the host id part of this identifier.
+     *
+     * @return host id
+     */
+    public HostId hostId() {
+        return hostId;
+    }
+
+    /**
+     * Returns the provider id part of this identifier.
+     *
+     * @return provider id
+     */
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerId, hostId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof HostFragmentId) {
+            final HostFragmentId other = (HostFragmentId) obj;
+            return Objects.equals(hostId, other.hostId)
+                    && Objects.equals(providerId, other.providerId);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("hostId", hostId)
+                .toString();
+    }
+}