[ONOS-3591] Anti-Entropy speed up via push/pull interaction
Adds an UpdateRequest message. This contains a set of keys that a node
is missing updates for. The receiver will then send an UpdateEntry for
each missing key to the requester.
Change-Id: I2115f4a05833b51ae14d1191f09f083b5251f8ec
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index b4174b6..2e1e29a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -90,6 +91,7 @@
private final MessageSubject updateMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
+ private final MessageSubject updateRequestSubject;
private final Set<EventuallyConsistentMapListener<K, V>> listeners
= Sets.newCopyOnWriteArraySet();
@@ -244,6 +246,12 @@
serializer::encode,
this.backgroundExecutor);
+ updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
+ clusterCommunicator.addSubscriber(updateRequestSubject,
+ serializer::decode,
+ this::handleUpdateRequests,
+ this.backgroundExecutor);
+
if (!tombstonesDisabled) {
previousTombstonePurgeTime = 0;
this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
@@ -513,6 +521,7 @@
listeners.clear();
clusterCommunicator.removeSubscriber(updateMessageSubject);
+ clusterCommunicator.removeSubscriber(updateRequestSubject);
clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
return CompletableFuture.completedFuture(null);
}
@@ -579,6 +588,19 @@
});
}
+ private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
+ UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
+ clusterCommunicator.unicast(request,
+ updateRequestSubject,
+ serializer::encode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send update request to {}", peer, error);
+ }
+ });
+ }
+
private AntiEntropyAdvertisement<K> createAdvertisement() {
return new AntiEntropyAdvertisement<>(localNodeId,
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
@@ -591,18 +613,9 @@
try {
if (log.isTraceEnabled()) {
log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
- mapName, ad.sender(), ad.digest().size());
+ ad.sender(), mapName, ad.digest().size());
}
antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
-
- if (!lightweightAntiEntropy) {
- // if remote ad has any entries that the local copy is missing, actively sync
- // TODO: Missing keys is not the way local copy can be behind.
- if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
- // TODO: Send ad for missing keys and for entries that are stale
- sendAdvertisementToPeer(ad.sender());
- }
- }
} catch (Exception e) {
log.warn("Error handling anti-entropy advertisement", e);
return AntiEntropyResponse.FAILED;
@@ -620,15 +633,20 @@
AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
final NodeId sender = ad.sender();
+ final List<NodeId> peers = ImmutableList.of(sender);
+ Set<K> staleOrMissing = new HashSet<>();
+ Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
+
items.forEach((key, localValue) -> {
+ locallyUnknown.remove(key);
MapValue.Digest remoteValueDigest = ad.digest().get(key);
if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender
- queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
- }
- if (remoteValueDigest != null
+ queueUpdate(new UpdateEntry<>(key, localValue), peers);
+ } else if (remoteValueDigest != null
&& remoteValueDigest.isNewerThan(localValue.digest())
&& remoteValueDigest.isTombstone()) {
+ // remote value is more recent and a tombstone: update local value
MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
@@ -636,14 +654,31 @@
if (previousValue != null && previousValue.isAlive()) {
externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
+ } else if (remoteValueDigest.isNewerThan(localValue.digest())) {
+ // Not a tombstone and remote is newer
+ staleOrMissing.add(key);
}
});
+ // Keys missing in local map
+ staleOrMissing.addAll(locallyUnknown);
+ // Request updates that we missed out on
+ sendUpdateRequestToPeer(sender, staleOrMissing);
return externalEvents;
}
+ private void handleUpdateRequests(UpdateRequest<K> request) {
+ final Set<K> keys = request.keys();
+ final NodeId sender = request.sender();
+ final List<NodeId> peers = ImmutableList.of(sender);
+
+ keys.forEach(key ->
+ queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
+ );
+ }
+
private void purgeTombstones() {
/*
- * In order to mitigate the resource exhausation that can ensue due to an ever-growing set
+ * In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
* of tombstones we employ the following heuristic to purge old tombstones periodically.
* First, we keep track of the time (local system time) when we were able to have a successful
* AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateRequest.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateRequest.java
new file mode 100644
index 0000000..e6b62d7
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.cluster.NodeId;
+
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Describes a request for update events in an EventuallyConsistentMap.
+ */
+final class UpdateRequest<K> {
+
+ private final NodeId sender;
+ private final Set<K> keys;
+
+ /**
+ * Creates a new update request.
+ *
+ * @param sender the sender's node ID
+ * @param keys keys requested
+ */
+ public UpdateRequest(NodeId sender, Set<K> keys) {
+ this.sender = checkNotNull(sender);
+ this.keys = ImmutableSet.copyOf(keys);
+ }
+
+ /**
+ * Returns the sender's node ID.
+ *
+ * @return the sender's node ID
+ */
+ public NodeId sender() {
+ return sender;
+ }
+
+ /**
+ * Returns the keys.
+ *
+ * @return the keys
+ */
+ public Set<K> keys() {
+ return keys;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("sender", sender)
+ .add("keys", keys())
+ .toString();
+ }
+
+ @SuppressWarnings("unused")
+ private UpdateRequest() {
+ this.sender = null;
+ this.keys = null;
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 0012d68..8d41fd2 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -88,6 +88,8 @@
= new MessageSubject("ecm-" + MAP_NAME + "-update");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
+ private static final MessageSubject UPDATE_REQUEST_SUBJECT
+ = new MessageSubject("ecm-" + MAP_NAME + "-update-request");
private static final String KEY1 = "one";
private static final String KEY2 = "two";
@@ -98,6 +100,7 @@
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
+ private Consumer<Collection<UpdateRequest<String>>> requestHandler;
private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
@Before
@@ -123,6 +126,9 @@
anyObject(Function.class),
anyObject(Executor.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+ clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
replay(clusterCommunicator);
@@ -627,6 +633,7 @@
@Test
public void testDestroy() throws Exception {
clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
+ clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
replay(clusterCommunicator);
@@ -774,6 +781,8 @@
Executor executor) {
if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
+ } else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
+ requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
} else {
throw new RuntimeException("Unexpected message subject " + subject.toString());
}