[ONOS-6479] Synchronously bootstrap EventuallyConsistentMap
Change-Id: I62a800ee731d1b42265b475c219d9d108adc08eb
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 7964187..41b6afd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -35,6 +35,7 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
@@ -42,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -52,13 +54,17 @@
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -88,6 +94,8 @@
private final BiFunction<K, V, Timestamp> timestampProvider;
+ private final MessageSubject bootstrapMessageSubject;
+ private final MessageSubject initializeMessageSubject;
private final MessageSubject updateMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
private final MessageSubject updateRequestSubject;
@@ -229,6 +237,22 @@
initialDelaySec, antiEntropyPeriod,
antiEntropyTimeUnit);
+ bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
+ clusterCommunicator.addSubscriber(bootstrapMessageSubject,
+ serializer::decode,
+ (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
+ serializer::encode);
+
+ initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
+ clusterCommunicator.addSubscriber(initializeMessageSubject,
+ serializer::decode,
+ (Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
+ processUpdates(u);
+ return null;
+ },
+ serializer::encode,
+ this.executor);
+
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
serializer::decode,
@@ -505,6 +529,11 @@
checkState(!destroyed, destroyedMessage);
listeners.add(checkNotNull(listener));
+ items.forEach((k, v) -> {
+ if (v.isAlive()) {
+ listener.event(new EventuallyConsistentMapEvent<K, V>(mapName, PUT, k, v.get()));
+ }
+ });
}
@Override
@@ -524,6 +553,8 @@
listeners.clear();
+ clusterCommunicator.removeSubscriber(bootstrapMessageSubject);
+ clusterCommunicator.removeSubscriber(initializeMessageSubject);
clusterCommunicator.removeSubscriber(updateMessageSubject);
clusterCommunicator.removeSubscriber(updateRequestSubject);
clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
@@ -727,13 +758,12 @@
});
}
+ /**
+ * Bootstraps the map to attempt to get in sync with existing instances of the same map on other nodes in the
+ * cluster. This is necessary to ensure that a read immediately after the map is created doesn't return a null
+ * value.
+ */
private void bootstrap() {
- /*
- * Attempt to get in sync with the cluster when a map is created. This is to help avoid a new node
- * writing to an ECM until it has a view of the map. Depending on how lightweight the map instance
- * is, this will attempt to advertise to all or some of the peers.
- */
- int n = 0;
List<NodeId> activePeers = clusterService.getNodes()
.stream()
.map(ControllerNode::id)
@@ -745,15 +775,130 @@
return;
}
- if (lightweightAntiEntropy) {
- n = activePeers.size() / 2;
- } else {
- n = activePeers.size();
+ try {
+ requestBootstrapFromPeers(activePeers)
+ .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ log.debug("Failed to bootstrap ec map {}: {}", mapName, e.getCause());
+ } catch (InterruptedException | TimeoutException e) {
+ log.warn("Failed to bootstrap ec map {}: {}", mapName, e);
+ }
+ }
+
+ /**
+ * Requests all updates from each peer in the provided list of peers.
+ * <p>
+ * The returned future will be completed once at least one peer bootstraps this map or bootstrap requests to all
+ * peers fail.
+ *
+ * @param peers the list of peers from which to request updates
+ * @return a future to be completed once updates have been received from at least one peer
+ */
+ private CompletableFuture<Void> requestBootstrapFromPeers(List<NodeId> peers) {
+ if (peers.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
}
- for (int i = 0; i < n; i++) {
- sendAdvertisementToPeer(activePeers.get(i));
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ final int totalPeers = peers.size();
+
+ AtomicBoolean successful = new AtomicBoolean();
+ AtomicInteger totalCount = new AtomicInteger();
+ AtomicReference<Throwable> lastError = new AtomicReference<>();
+
+ // Iterate through all of the peers and send a bootstrap request. On the first peer that returns
+ // a successful bootstrap response, complete the future. Otherwise, if no peers respond with any
+ // successful bootstrap response, the future will be completed with the last exception.
+ for (NodeId peer : peers) {
+ requestBootstrapFromPeer(peer).whenComplete((result, error) -> {
+ if (error == null) {
+ if (successful.compareAndSet(false, true)) {
+ future.complete(null);
+ } else if (totalCount.incrementAndGet() == totalPeers) {
+ Throwable e = lastError.get();
+ if (e != null) {
+ future.completeExceptionally(e);
+ }
+ }
+ } else {
+ if (!successful.get() && totalCount.incrementAndGet() == totalPeers) {
+ future.completeExceptionally(error);
+ } else {
+ lastError.set(error);
+ }
+ }
+ });
}
+ return future;
+ }
+
+ /**
+ * Requests a bootstrap from the given peer.
+ *
+ * @param peer the peer from which to request updates
+ * @return a future to be completed once the peer has sent bootstrap updates
+ */
+ private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
+ log.trace("Sending bootstrap request to {}", peer);
+ return clusterCommunicator.<NodeId, Void>sendAndReceive(
+ localNodeId,
+ bootstrapMessageSubject,
+ serializer::encode,
+ serializer::decode,
+ peer)
+ .whenComplete((updates, error) -> {
+ if (error != null) {
+ log.debug("Bootstrap request to {} failed: {}", peer, error.getMessage());
+ }
+ });
+ }
+
+ /**
+ * Handles a bootstrap request from a peer.
+ * <p>
+ * When handling a bootstrap request from a peer, the node sends batches of entries back to the peer and
+ * completes the bootstrap request once all batches have been received and processed.
+ *
+ * @param peer the peer that sent the bootstrap request
+ * @return a future to be completed once updates have been sent to the peer
+ */
+ private CompletableFuture<Void> handleBootstrap(NodeId peer) {
+ log.trace("Received bootstrap request from {}", peer);
+
+ Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendUpdates = updates -> {
+ log.trace("Initializing {} with {} entries", peer, updates.size());
+ return clusterCommunicator.<List<UpdateEntry<K, V>>, Void>sendAndReceive(
+ ImmutableList.copyOf(updates),
+ initializeMessageSubject,
+ serializer::encode,
+ serializer::decode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to initialize {}", peer, error);
+ }
+ });
+ };
+
+ List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ List<UpdateEntry<K, V>> updates = Lists.newArrayList();
+ for (Map.Entry<K, MapValue<V>> entry : items.entrySet()) {
+ K key = entry.getKey();
+ MapValue<V> value = entry.getValue();
+ if (value.isAlive()) {
+ updates.add(new UpdateEntry<K, V>(key, value));
+ if (updates.size() == DEFAULT_MAX_EVENTS) {
+ futures.add(sendUpdates.apply(updates));
+ updates = new ArrayList<>();
+ }
+ }
+ }
+
+ if (!updates.isEmpty()) {
+ futures.add(sendUpdates.apply(updates));
+ }
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
}
// TODO pull this into the class if this gets pulled out...
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 8d41fd2..b12a7f4 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -84,6 +84,10 @@
private SequentialClockService<String, String> clockService;
private static final String MAP_NAME = "test";
+ private static final MessageSubject BOOTSTRAP_MESSAGE_SUBJECT
+ = new MessageSubject("ecm-" + MAP_NAME + "-bootstrap");
+ private static final MessageSubject INITIALIZE_MESSAGE_SUBJECT
+ = new MessageSubject("ecm-" + MAP_NAME + "-initialize");
private static final MessageSubject UPDATE_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-update");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
@@ -117,6 +121,17 @@
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
// handlers so we can induce events coming in from a peer.
+ clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Function.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+ clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Function.class),
+ anyObject(Executor.class));
+ expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
@@ -481,6 +496,10 @@
EventuallyConsistentMapListener<String, String> listener
= getListener();
listener.event(new EventuallyConsistentMapEvent<>(
+ MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
+ listener.event(new EventuallyConsistentMapEvent<>(
+ MAP_NAME, EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2));
+ listener.event(new EventuallyConsistentMapEvent<>(
MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>(
MAP_NAME, EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
@@ -632,6 +651,8 @@
@Test
public void testDestroy() throws Exception {
+ clusterCommunicator.removeSubscriber(BOOTSTRAP_MESSAGE_SUBJECT);
+ clusterCommunicator.removeSubscriber(INITIALIZE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
@@ -793,7 +814,7 @@
Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
- } else {
+ } else if (!subject.equals(INITIALIZE_MESSAGE_SUBJECT)) {
throw new RuntimeException("Unexpected message subject " + subject.toString());
}
}