Implement anti-entropy for the EventuallyConsistentMap.
ONOS-857.
Change-Id: Ife2070142d3c165c2a0035c3011c05b426c8baa4
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onosproject/store/impl/AntiEntropyAdvertisement.java
new file mode 100644
index 0000000..abba052
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/AntiEntropyAdvertisement.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.Timestamp;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Anti-entropy advertisement message for eventually consistent map.
+ */
+public class AntiEntropyAdvertisement<K> {
+
+ private final NodeId sender;
+ private final Map<K, Timestamp> timestamps;
+ private final Map<K, Timestamp> tombstones;
+
+ /**
+ * Creates a new anti entropy advertisement message.
+ *
+ * @param sender the sender's node ID
+ * @param timestamps map of item key to timestamp for current items
+ * @param tombstones map of item key to timestamp for removed items
+ */
+ public AntiEntropyAdvertisement(NodeId sender,
+ Map<K, Timestamp> timestamps,
+ Map<K, Timestamp> tombstones) {
+ this.sender = checkNotNull(sender);
+ this.timestamps = checkNotNull(timestamps);
+ this.tombstones = checkNotNull(tombstones);
+ }
+
+ /**
+ * Returns the sender's node ID.
+ *
+ * @return the sender's node ID
+ */
+ public NodeId sender() {
+ return sender;
+ }
+
+ /**
+ * Returns the map of current item timestamps.
+ *
+ * @return current item timestamps
+ */
+ public Map<K, Timestamp> timestamps() {
+ return timestamps;
+ }
+
+ /**
+ * Returns the map of removed item timestamps.
+ *
+ * @return removed item timestamps
+ */
+ public Map<K, Timestamp> tombstones() {
+ return tombstones;
+ }
+
+ // For serializer
+ @SuppressWarnings("unused")
+ private AntiEntropyAdvertisement() {
+ this.sender = null;
+ this.timestamps = null;
+ this.tombstones = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
index 0acef1a..00b2004 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -16,8 +16,10 @@
package org.onosproject.store.impl;
import com.google.common.base.MoreObjects;
+import org.apache.commons.lang3.RandomUtils;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -32,6 +34,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,6 +44,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -69,8 +74,9 @@
private final MessageSubject updateMessageSubject;
private final MessageSubject removeMessageSubject;
+ private final MessageSubject antiEntropyAdvertisementSubject;
- private final Set<EventuallyConsistentMapListener> listeners
+ private final Set<EventuallyConsistentMapListener<K, V>> listeners
= new CopyOnWriteArraySet<>();
private final ExecutorService executor;
@@ -138,12 +144,20 @@
newSingleThreadScheduledExecutor(minPriority(
namedThreads("onos-ecm-" + mapName + "-bg-%d")));
+ // start anti-entropy thread
+ backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec,
+ TimeUnit.SECONDS);
+
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
new InternalPutEventListener());
removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
clusterCommunicator.addSubscriber(removeMessageSubject,
new InternalRemoveEventListener());
+ antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
+ clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
+ new InternalAntiEntropyListener());
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -158,9 +172,9 @@
.register(ArrayList.class)
.register(InternalPutEvent.class)
.register(InternalRemoveEvent.class)
+ .register(AntiEntropyAdvertisement.class)
+ .register(HashMap.class)
.build();
-
- // TODO anti-entropy classes
}
};
}
@@ -360,8 +374,8 @@
clusterCommunicator.removeSubscriber(removeMessageSubject);
}
- private void notifyListeners(EventuallyConsistentMapEvent event) {
- for (EventuallyConsistentMapListener listener : listeners) {
+ private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
+ for (EventuallyConsistentMapListener<K, V> listener : listeners) {
listener.event(event);
}
}
@@ -418,6 +432,245 @@
}
}
+ private final class SendAdvertisementTask implements Runnable {
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ final NodeId self = clusterService.getLocalNode().id();
+ Set<ControllerNode> nodes = clusterService.getNodes();
+
+ List<NodeId> nodeIds = nodes.stream()
+ .map(node -> node.id())
+ .collect(Collectors.toList());
+
+ if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+ log.trace("No other peers in the cluster.");
+ return;
+ }
+
+ NodeId peer;
+ do {
+ int idx = RandomUtils.nextInt(0, nodeIds.size());
+ peer = nodeIds.get(idx);
+ } while (peer.equals(self));
+
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ AntiEntropyAdvertisement<K> ad = createAdvertisement();
+
+ try {
+ unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+ } catch (IOException e) {
+ log.debug("Failed to send anti-entropy advertisement to {}", peer);
+ }
+ } catch (Exception e) {
+ // Catch all exceptions to avoid scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
+ }
+ }
+ }
+
+ private AntiEntropyAdvertisement<K> createAdvertisement() {
+ final NodeId self = clusterService.getLocalNode().id();
+
+ Map<K, Timestamp> timestamps = new HashMap<>(items.size());
+
+ items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
+
+ Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
+
+ return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
+ }
+
+ private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
+ List<EventuallyConsistentMapEvent<K, V>> externalEvents;
+
+ synchronized (this) {
+ final NodeId sender = ad.sender();
+
+ externalEvents = antiEntropyCheckLocalItems(ad);
+
+ antiEntropyCheckLocalRemoved(ad);
+
+ externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
+
+ // if remote ad has something unknown, actively sync
+ for (K key : ad.timestamps().keySet()) {
+ if (!items.containsKey(key)) {
+ AntiEntropyAdvertisement<K> myAd = createAdvertisement();
+ try {
+ unicastMessage(sender, antiEntropyAdvertisementSubject,
+ myAd);
+ break;
+ } catch (IOException e) {
+ log.debug(
+ "Failed to send reactive anti-entropy advertisement to {}",
+ sender);
+ }
+ }
+ }
+ } // synchronized (this)
+
+ externalEvents.forEach(this::notifyListeners);
+ }
+
+ /**
+ * Checks if any of the remote's live items or tombstones are out of date
+ * according to our local live item list, or if our live items are out of
+ * date according to the remote's tombstone list.
+ * If the local copy is more recent, it will be pushed to the remote. If the
+ * remote has a more recent remove, we apply that to the local state.
+ *
+ * @param ad remote anti-entropy advertisement
+ * @return list of external events relating to local operations performed
+ */
+ // Guarded by synchronized (this)
+ private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
+ AntiEntropyAdvertisement<K> ad) {
+ final List<EventuallyConsistentMapEvent<K, V>> externalEvents
+ = new LinkedList<>();
+ final NodeId sender = ad.sender();
+
+ final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
+
+ for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
+ K key = item.getKey();
+ Timestamped<V> localValue = item.getValue();
+
+ Timestamp remoteTimestamp = ad.timestamps().get(key);
+ if (remoteTimestamp == null) {
+ remoteTimestamp = ad.tombstones().get(key);
+ }
+ if (remoteTimestamp == null || localValue
+ .isNewer(remoteTimestamp)) {
+ // local value is more recent, push to sender
+ updatesToSend
+ .add(new PutEntry<>(key, localValue.value(),
+ localValue.timestamp()));
+ }
+
+ Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
+ if (remoteDeadTimestamp != null &&
+ remoteDeadTimestamp.compareTo(localValue.timestamp()) > 0) {
+ // sender has a more recent remove
+ if (removeInternal(key, remoteDeadTimestamp)) {
+ externalEvents.add(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+ }
+ }
+ }
+
+ // Send all updates to the peer at once
+ if (!updatesToSend.isEmpty()) {
+ try {
+ unicastMessage(sender, updateMessageSubject, new InternalPutEvent<>(updatesToSend));
+ } catch (IOException e) {
+ log.warn("Failed to send advertisement response", e);
+ }
+ }
+
+ return externalEvents;
+ }
+
+ /**
+ * Checks if any items in the remote live list are out of date according
+ * to our tombstone list. If we find we have a more up to date tombstone,
+ * we'll send it to the remote.
+ *
+ * @param ad remote anti-entropy advertisement
+ */
+ // Guarded by synchronized (this)
+ private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
+ final NodeId sender = ad.sender();
+
+ final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
+
+ for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
+ K key = dead.getKey();
+ Timestamp localDeadTimestamp = dead.getValue();
+
+ Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
+ if (remoteLiveTimestamp != null
+ && localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
+ // sender has zombie, push remove
+ removesToSend
+ .add(new RemoveEntry<>(key, localDeadTimestamp));
+ }
+ }
+
+ // Send all removes to the peer at once
+ if (!removesToSend.isEmpty()) {
+ try {
+ unicastMessage(sender, removeMessageSubject, new InternalRemoveEvent<>(removesToSend));
+ } catch (IOException e) {
+ log.warn("Failed to send advertisement response", e);
+ }
+ }
+ }
+
+ /**
+ * Checks if any of the local live items are out of date according to the
+ * remote's tombstone advertisements. If we find a local item is out of date,
+ * we'll apply the remove operation to the local state.
+ *
+ * @param ad remote anti-entropy advertisement
+ * @return list of external events relating to local operations performed
+ */
+ // Guarded by synchronized (this)
+ private List<EventuallyConsistentMapEvent<K, V>>
+ antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
+ final List<EventuallyConsistentMapEvent<K, V>> externalEvents
+ = new LinkedList<>();
+
+ for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
+ K key = remoteDead.getKey();
+ Timestamp remoteDeadTimestamp = remoteDead.getValue();
+
+ Timestamped<V> local = items.get(key);
+ Timestamp localDead = removedItems.get(key);
+ if (local != null
+ && remoteDeadTimestamp.compareTo(local.timestamp()) > 0) {
+ // remove our version
+ if (removeInternal(key, remoteDeadTimestamp)) {
+ externalEvents.add(new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+ }
+ } else if (localDead != null &&
+ remoteDeadTimestamp.compareTo(localDead) > 0) {
+ // If we both had the item as removed, but their timestamp is
+ // newer, update ours to the newer value
+ removedItems.put(key, remoteDeadTimestamp);
+ }
+ }
+
+ return externalEvents;
+ }
+
+ private final class InternalAntiEntropyListener
+ implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ log.trace("Received anti-entropy advertisement from peer: {}", message.sender());
+ AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
+ backgroundExecutor.submit(() -> {
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling advertisements", e);
+ }
+ });
+ }
+ }
+
private final class InternalPutEventListener implements
ClusterMessageHandler {
@Override
@@ -433,7 +686,7 @@
Timestamp timestamp = entry.timestamp();
if (putInternal(key, value, timestamp)) {
- EventuallyConsistentMapEvent externalEvent =
+ EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, key,
value);
@@ -461,7 +714,8 @@
Timestamp timestamp = entry.timestamp();
if (removeInternal(key, timestamp)) {
- EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
+ EventuallyConsistentMapEvent<K, V> externalEvent
+ = new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE,
key, null);
notifyListeners(externalEvent);