[ONOS-7081] Modify EventuallyConsistentMap to bootstrap from old nodes during upgrade
Change-Id: I0bb60bf64319e083117b6d0f6f82842640e5185f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index f92e3f5..605bf25 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -15,10 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import java.io.File;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -28,6 +24,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -55,6 +52,11 @@
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
+import java.io.File;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.onosproject.store.primitives.impl.PartitionManager.PARTITIONS_DIR;
@@ -109,9 +111,28 @@
@Override
public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+ final NodeId localNodeId = clusterService.getLocalNode().id();
+
+ Supplier<List<NodeId>> peersSupplier = () -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+
+ Supplier<List<NodeId>> bootstrapPeersSupplier = () -> clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+
+ return new EventuallyConsistentMapBuilderImpl<>(
+ localNodeId,
clusterCommunicator,
- persistenceService);
+ persistenceService,
+ peersSupplier,
+ bootstrapPeersSupplier
+ );
}
@Override
@@ -146,7 +167,7 @@
@Override
public <E> DistributedSetBuilder<E> setBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+ return new DefaultDistributedSetBuilder<>(this::<E, Boolean>consistentMapBuilder);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index 9d6a016..df35bd6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -15,14 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
@@ -30,6 +23,14 @@
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,9 +39,9 @@
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
- private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
+ private NodeId localNodeId;
private String name;
private KryoNamespace serializer;
private KryoNamespace.Builder serializerBuilder;
@@ -56,21 +57,29 @@
private boolean persistent = false;
private boolean persistentMap = false;
private final PersistenceService persistenceService;
+ private Supplier<List<NodeId>> peersSupplier;
+ private Supplier<List<NodeId>> bootstrapPeersSupplier;
/**
* Creates a new eventually consistent map builder.
- *
- * @param clusterService cluster service
- * @param clusterCommunicator cluster communication service
- * @param persistenceService persistence service
+ * @param localNodeId local node id
+ * @param clusterCommunicator cluster communication service
+ * @param persistenceService persistence service
+ * @param peersSupplier supplier for peers
+ * @param bootstrapPeersSupplier supplier for peers for bootstrap
*/
public EventuallyConsistentMapBuilderImpl(
- ClusterService clusterService,
+ NodeId localNodeId,
ClusterCommunicationService clusterCommunicator,
- PersistenceService persistenceService) {
+ PersistenceService persistenceService,
+ Supplier<List<NodeId>> peersSupplier,
+ Supplier<List<NodeId>> bootstrapPeersSupplier
+ ) {
+ this.localNodeId = localNodeId;
this.persistenceService = persistenceService;
- this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
+ this.peersSupplier = peersSupplier;
+ this.bootstrapPeersSupplier = bootstrapPeersSupplier;
}
@Override
@@ -160,21 +169,26 @@
serializer = serializerBuilder.build(name);
}
checkNotNull(serializer, "serializer is a mandatory parameter");
+ checkNotNull(localNodeId, "local node id cannot be null");
- return new EventuallyConsistentMapImpl<>(name,
- clusterService,
- clusterCommunicator,
- serializer,
- timestampProvider,
- peerUpdateFunction,
- eventExecutor,
- communicationExecutor,
- backgroundExecutor,
- tombstonesDisabled,
- antiEntropyPeriod,
- antiEntropyTimeUnit,
- convergeFaster,
- persistent,
- persistenceService);
+ return new EventuallyConsistentMapImpl<>(
+ localNodeId,
+ name,
+ clusterCommunicator,
+ serializer,
+ timestampProvider,
+ peerUpdateFunction,
+ eventExecutor,
+ communicationExecutor,
+ backgroundExecutor,
+ tombstonesDisabled,
+ antiEntropyPeriod,
+ antiEntropyTimeUnit,
+ convergeFaster,
+ persistent,
+ persistenceService,
+ peersSupplier,
+ bootstrapPeersSupplier
+ );
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 462d4ae..74e567b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -15,6 +15,33 @@
*/
package org.onosproject.store.primitives.impl;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SlidingWindowCounter;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,36 +64,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.AbstractAccumulator;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.LogicalTimestamp;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DistributedPrimitive;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -83,55 +83,43 @@
implements EventuallyConsistentMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
+ private static final String ERROR_DESTROYED = " map is already destroyed";
+ private static final String ERROR_NULL_KEY = "Key cannot be null";
+ private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+ private static final int WINDOW_SIZE = 5;
+ private static final int HIGH_LOAD_THRESHOLD = 2;
+ private static final int LOAD_WINDOW = 2;
private final Map<K, MapValue<V>> items;
-
- private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final Serializer serializer;
- private final NodeId localNodeId;
private final PersistenceService persistenceService;
-
private final BiFunction<K, V, Timestamp> timestampProvider;
-
private final MessageSubject bootstrapMessageSubject;
private final MessageSubject initializeMessageSubject;
private final MessageSubject updateMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
private final MessageSubject updateRequestSubject;
-
private final Set<EventuallyConsistentMapListener<K, V>> listeners
= Sets.newCopyOnWriteArraySet();
-
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
-
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
-
- private long previousTombstonePurgeTime;
private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
-
private final String mapName;
-
- private volatile boolean destroyed = false;
- private static final String ERROR_DESTROYED = " map is already destroyed";
private final String destroyedMessage;
-
- private static final String ERROR_NULL_KEY = "Key cannot be null";
- private static final String ERROR_NULL_VALUE = "Null values are not allowed";
-
private final long initialDelaySec = 5;
private final boolean lightweightAntiEntropy;
private final boolean tombstonesDisabled;
-
- private static final int WINDOW_SIZE = 5;
- private static final int HIGH_LOAD_THRESHOLD = 2;
- private static final int LOAD_WINDOW = 2;
- private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
-
private final boolean persistent;
+ private final Supplier<List<NodeId>> peersSupplier;
+ private final Supplier<List<NodeId>> bootstrapPeersSupplier;
+ private final NodeId localNodeId;
+ private long previousTombstonePurgeTime;
+ private volatile boolean destroyed = false;
+ private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
/**
* Creates a new eventually consistent map shared amongst multiple instances.
@@ -140,42 +128,47 @@
* for more description of the parameters expected by the map.
* </p>
*
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param ns a Kryo namespace that can serialize
- * both K and V
- * @param timestampProvider provider of timestamps for K and V
- * @param peerUpdateFunction function that provides a set of nodes to immediately
- * update to when there writes to the map
- * @param eventExecutor executor to use for processing incoming
- * events from peers
- * @param communicationExecutor executor to use for sending events to peers
- * @param backgroundExecutor executor to use for background anti-entropy
- * tasks
- * @param tombstonesDisabled true if this map should not maintain
- * tombstones
- * @param antiEntropyPeriod period that the anti-entropy task should run
- * @param antiEntropyTimeUnit time unit for anti-entropy period
- * @param convergeFaster make anti-entropy try to converge faster
- * @param persistent persist data to disk
- * @param persistenceService persistence service
+ * @param localNodeId local node id
+ * @param mapName a String identifier for the map.
+ * @param clusterCommunicator the cluster communications service
+ * @param ns a Kryo namespace that can serialize both K and V
+ * @param timestampProvider provider of timestamps for K and V
+ * @param peerUpdateFunction function that provides a set of nodes to immediately
+ * update to when there writes to the map
+ * @param eventExecutor executor to use for processing incoming events from peers
+ * @param communicationExecutor executor to use for sending events to peers
+ * @param backgroundExecutor executor to use for background anti-entropy tasks
+ * @param tombstonesDisabled true if this map should not maintain tombstones
+ * @param antiEntropyPeriod period that the anti-entropy task should run
+ * @param antiEntropyTimeUnit time unit for anti-entropy period
+ * @param convergeFaster make anti-entropy try to converge faster
+ * @param persistent persist data to disk
+ * @param persistenceService persistence service
+ * @param peersSupplier supplier for peers
+ * @param bootstrapPeersSupplier supplier for bootstrap peers
*/
- EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace ns,
- BiFunction<K, V, Timestamp> timestampProvider,
- BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
- ExecutorService eventExecutor,
- ExecutorService communicationExecutor,
- ScheduledExecutorService backgroundExecutor,
- boolean tombstonesDisabled,
- long antiEntropyPeriod,
- TimeUnit antiEntropyTimeUnit,
- boolean convergeFaster,
- boolean persistent,
- PersistenceService persistenceService) {
+ //CHECKSTYLE:OFF
+ EventuallyConsistentMapImpl(
+ NodeId localNodeId,
+ String mapName,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace ns,
+ BiFunction<K, V, Timestamp> timestampProvider,
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+ ExecutorService eventExecutor,
+ ExecutorService communicationExecutor,
+ ScheduledExecutorService backgroundExecutor,
+ boolean tombstonesDisabled,
+ long antiEntropyPeriod,
+ TimeUnit antiEntropyTimeUnit,
+ boolean convergeFaster,
+ boolean persistent,
+ PersistenceService persistenceService,
+ Supplier<List<NodeId>> peersSupplier,
+ Supplier<List<NodeId>> bootstrapPeersSupplier
+ ) {
+ //CHECKSTYLE:ON
+ this.localNodeId = localNodeId;
this.mapName = mapName;
this.serializer = createSerializer(ns);
this.persistenceService = persistenceService;
@@ -192,19 +185,20 @@
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
- this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
- this.localNodeId = clusterService.getLocalNode().id();
this.timestampProvider = timestampProvider;
+ this.peersSupplier = peersSupplier;
+ this.bootstrapPeersSupplier = bootstrapPeersSupplier;
+
if (peerUpdateFunction != null) {
- this.peerUpdateFunction = peerUpdateFunction;
+ this.peerUpdateFunction = peerUpdateFunction.andThen(peers -> peersSupplier.get()
+ .stream()
+ .filter(peers::contains)
+ .collect(Collectors.toList()));
} else {
- this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
- .map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(localNodeId))
- .collect(Collectors.toList());
+ this.peerUpdateFunction = (key, value) -> peersSupplier.get();
}
if (eventExecutor != null) {
@@ -212,7 +206,8 @@
} else {
// should be a normal executor; it's used for receiving messages
this.executor =
- Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
+ Executors.newFixedThreadPool(8,
+ groupedThreads("onos/ecm", mapName + "-fg-%d", log));
}
if (communicationExecutor != null) {
@@ -221,7 +216,8 @@
// sending executor; should be capped
//TODO this probably doesn't need to be bounded anymore
this.communicationExecutor =
- newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
+ newFixedThreadPool(8,
+ groupedThreads("onos/ecm", mapName + "-publish-%d", log));
}
@@ -229,55 +225,70 @@
this.backgroundExecutor = backgroundExecutor;
} else {
this.backgroundExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
+ newSingleThreadScheduledExecutor(
+ groupedThreads("onos/ecm", mapName + "-bg-%d", log));
}
// start anti-entropy thread
- this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
- initialDelaySec, antiEntropyPeriod,
- antiEntropyTimeUnit);
+ this.backgroundExecutor.scheduleAtFixedRate(
+ this::sendAdvertisement,
+ initialDelaySec, antiEntropyPeriod,
+ antiEntropyTimeUnit
+ );
bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
- clusterCommunicator.addSubscriber(bootstrapMessageSubject,
- serializer::decode,
- (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
- serializer::encode);
+ clusterCommunicator.addSubscriber(
+ bootstrapMessageSubject,
+ serializer::decode,
+ (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
+ serializer::encode
+ );
initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
- clusterCommunicator.addSubscriber(initializeMessageSubject,
+ clusterCommunicator.addSubscriber(
+ initializeMessageSubject,
serializer::decode,
(Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
processUpdates(u);
return null;
},
serializer::encode,
- this.executor);
+ this.executor
+ );
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
- clusterCommunicator.addSubscriber(updateMessageSubject,
- serializer::decode,
- this::processUpdates,
- this.executor);
+ clusterCommunicator.addSubscriber(
+ updateMessageSubject,
+ serializer::decode,
+ this::processUpdates,
+ this.executor
+ );
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
- clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- serializer::decode,
- this::handleAntiEntropyAdvertisement,
- serializer::encode,
- this.backgroundExecutor);
+ clusterCommunicator.addSubscriber(
+ antiEntropyAdvertisementSubject,
+ serializer::decode,
+ this::handleAntiEntropyAdvertisement,
+ serializer::encode,
+ this.backgroundExecutor
+ );
updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
- clusterCommunicator.addSubscriber(updateRequestSubject,
- serializer::decode,
- this::handleUpdateRequests,
- this.backgroundExecutor);
+ clusterCommunicator.addSubscriber(
+ updateRequestSubject,
+ serializer::decode,
+ this::handleUpdateRequests,
+ this.backgroundExecutor
+ );
if (!tombstonesDisabled) {
previousTombstonePurgeTime = 0;
- this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
- initialDelaySec,
- antiEntropyPeriod,
- TimeUnit.SECONDS);
+ this.backgroundExecutor.scheduleWithFixedDelay(
+ this::purgeTombstones,
+ initialDelaySec,
+ antiEntropyPeriod,
+ TimeUnit.SECONDS
+ );
}
this.tombstonesDisabled = tombstonesDisabled;
@@ -289,20 +300,20 @@
private Serializer createSerializer(KryoNamespace ns) {
return Serializer.using(KryoNamespace.newBuilder()
- .register(ns)
- // not so robust way to avoid collision with other
- // user supplied registrations
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
- .register(KryoNamespaces.BASIC)
- .register(LogicalTimestamp.class)
- .register(WallClockTimestamp.class)
- .register(AntiEntropyAdvertisement.class)
- .register(AntiEntropyResponse.class)
- .register(UpdateEntry.class)
- .register(MapValue.class)
- .register(MapValue.Digest.class)
- .register(UpdateRequest.class)
- .build(name() + "-ecmap"));
+ .register(ns)
+ // not so robust way to avoid collision with other
+ // user supplied registrations
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .register(KryoNamespaces.BASIC)
+ .register(LogicalTimestamp.class)
+ .register(WallClockTimestamp.class)
+ .register(AntiEntropyAdvertisement.class)
+ .register(AntiEntropyResponse.class)
+ .register(UpdateEntry.class)
+ .register(MapValue.class)
+ .register(MapValue.Digest.class)
+ .register(UpdateRequest.class)
+ .build(name() + "-ecmap"));
}
@Override
@@ -335,9 +346,9 @@
checkState(!destroyed, destroyedMessage);
checkNotNull(value, ERROR_NULL_VALUE);
return items.values()
- .stream()
- .filter(MapValue::isAlive)
- .anyMatch(v -> value.equals(v.get()));
+ .stream()
+ .filter(MapValue::isAlive)
+ .anyMatch(v -> value.equals(v.get()));
}
@Override
@@ -384,7 +395,7 @@
MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
if (previousValue != null) {
notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
- peerUpdateFunction.apply(key, previousValue.get()));
+ peerUpdateFunction.apply(key, previousValue.get()));
if (previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
@@ -474,14 +485,14 @@
public void clear() {
checkState(!destroyed, destroyedMessage);
Maps.filterValues(items, MapValue::isAlive)
- .forEach((k, v) -> remove(k));
+ .forEach((k, v) -> remove(k));
}
@Override
public Set<K> keySet() {
checkState(!destroyed, destroyedMessage);
return Maps.filterValues(items, MapValue::isAlive)
- .keySet();
+ .keySet();
}
@Override
@@ -494,16 +505,16 @@
public Set<Map.Entry<K, V>> entrySet() {
checkState(!destroyed, destroyedMessage);
return Maps.filterValues(items, MapValue::isAlive)
- .entrySet()
- .stream()
- .map(e -> Pair.of(e.getKey(), e.getValue().get()))
- .collect(Collectors.toSet());
+ .entrySet()
+ .stream()
+ .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+ .collect(Collectors.toSet());
}
/**
* Returns true if newValue was accepted i.e. map is updated.
*
- * @param key key
+ * @param key key
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
*/
@@ -575,7 +586,7 @@
return;
}
peers.forEach(node ->
- senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+ senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
);
}
@@ -596,14 +607,9 @@
}
private Optional<NodeId> pickRandomActivePeer() {
- List<NodeId> activePeers = clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .filter(id -> !localNodeId.equals(id))
- .filter(id -> clusterService.getState(id).isActive())
- .collect(Collectors.toList());
+ List<NodeId> activePeers = peersSupplier.get();
Collections.shuffle(activePeers);
- return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+ return activePeers.stream().findFirst();
}
private void sendAdvertisementToPeer(NodeId peer) {
@@ -680,14 +686,12 @@
if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender
queueUpdate(new UpdateEntry<>(key, localValue), peers);
- } else if (remoteValueDigest != null
- && remoteValueDigest.isNewerThan(localValue.digest())
- && remoteValueDigest.isTombstone()) {
+ } else if (remoteValueDigest.isNewerThan(localValue.digest()) && remoteValueDigest.isTombstone()) {
// remote value is more recent and a tombstone: update local value
MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
MapValue<V> previousValue = removeInternal(key,
- Optional.empty(),
- Optional.of(tombstone));
+ Optional.empty(),
+ Optional.of(tombstone));
if (previousValue != null && previousValue.isAlive()) {
externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
@@ -707,9 +711,8 @@
final Set<K> keys = request.keys();
final NodeId sender = request.sender();
final List<NodeId> peers = ImmutableList.of(sender);
-
keys.forEach(key ->
- queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
+ queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
);
}
@@ -721,21 +724,19 @@
* AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
* as the time before which all tombstones are considered safe to purge.
*/
- long currentSafeTombstonePurgeTime = clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .filter(id -> !id.equals(localNodeId))
- .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
- .reduce(Math::min)
- .orElse(0L);
+ long currentSafeTombstonePurgeTime = peersSupplier.get()
+ .stream()
+ .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
+ .reduce(Math::min)
+ .orElse(0L);
if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
return;
}
List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
- .stream()
- .filter(e -> e.getValue().isTombstone())
- .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
- .collect(Collectors.toList());
+ .stream()
+ .filter(e -> e.getValue().isTombstone())
+ .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
+ .collect(Collectors.toList());
previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
}
@@ -764,24 +765,15 @@
* value.
*/
private void bootstrap() {
- List<NodeId> activePeers = clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .filter(id -> !localNodeId.equals(id))
- .filter(id -> clusterService.getState(id).isActive())
- .collect(Collectors.toList());
-
+ List<NodeId> activePeers = bootstrapPeersSupplier.get();
if (activePeers.isEmpty()) {
return;
}
-
try {
requestBootstrapFromPeers(activePeers)
.get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- log.debug("Failed to bootstrap ec map {}: {}", mapName, e.getCause());
- } catch (InterruptedException | TimeoutException e) {
- log.warn("Failed to bootstrap ec map {}: {}", mapName, e);
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ log.debug("Failed to bootstrap ec map {}: {}", mapName, ExceptionUtils.getStackTrace(e));
}
}
@@ -798,11 +790,8 @@
if (peers.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
-
CompletableFuture<Void> future = new CompletableFuture<>();
-
final int totalPeers = peers.size();
-
AtomicBoolean successful = new AtomicBoolean();
AtomicInteger totalCount = new AtomicInteger();
AtomicReference<Throwable> lastError = new AtomicReference<>();
@@ -840,7 +829,7 @@
* @return a future to be completed once the peer has sent bootstrap updates
*/
private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
- log.trace("Sending bootstrap request to {}", peer);
+ log.trace("Sending bootstrap request to {} for {}", peer, mapName);
return clusterCommunicator.<NodeId, Void>sendAndReceive(
localNodeId,
bootstrapMessageSubject,
@@ -864,7 +853,7 @@
* @return a future to be completed once updates have been sent to the peer
*/
private CompletableFuture<Void> handleBootstrap(NodeId peer) {
- log.trace("Received bootstrap request from {}", peer);
+ log.trace("Received bootstrap request from {} for {}", peer, bootstrapMessageSubject);
Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendUpdates = updates -> {
log.trace("Initializing {} with {} entries", peer, updates.size());
@@ -887,7 +876,7 @@
K key = entry.getKey();
MapValue<V> value = entry.getValue();
if (value.isAlive()) {
- updates.add(new UpdateEntry<K, V>(key, value));
+ updates.add(new UpdateEntry<>(key, value));
if (updates.size() == DEFAULT_MAX_EVENTS) {
futures.add(sendUpdates.apply(updates));
updates = new ArrayList<>();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index b73dd61..62a29bf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,13 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -30,6 +23,10 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -65,6 +62,13 @@
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
@@ -95,6 +99,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionAdminService partitionAdminService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MembershipService membershipService;
+
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -119,9 +126,40 @@
@Override
public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+ final NodeId localNodeId = clusterService.getLocalNode().id();
+
+ Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
+ .map(Member::nodeId)
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+
+ Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
+ if (membershipService.getMembers().size() == 1) {
+ return clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+ } else {
+ return membershipService.getMembers()
+ .stream()
+ .map(Member::nodeId)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+ }
+ };
+
+
+ return new EventuallyConsistentMapBuilderImpl<>(
+ localNodeId,
clusterCommunicator,
- persistenceService);
+ persistenceService,
+ peersSupplier,
+ bootstrapPeersSupplier
+ );
}
@Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 41cf18c..f16c688 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -45,13 +45,15 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
@@ -66,9 +68,6 @@
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
/**
@@ -79,7 +78,6 @@
private EventuallyConsistentMap<String, String> ecMap;
private PersistenceService persistenceService;
- private ClusterService clusterService;
private ClusterCommunicationService clusterCommunicator;
private SequentialClockService<String, String> clockService;
@@ -106,14 +104,10 @@
private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
private Consumer<Collection<UpdateRequest<String>>> requestHandler;
private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
+ private Supplier<List<NodeId>> peersHandler = ArrayList::new;
@Before
public void setUp() throws Exception {
- clusterService = createMock(ClusterService.class);
- expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
- expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
- replay(clusterService);
-
clusterCommunicator = createMock(ClusterCommunicationService.class);
persistenceService = new TestPersistenceService();
@@ -154,7 +148,12 @@
.register(TestTimestamp.class);
ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
- clusterService, clusterCommunicator, persistenceService)
+ NodeId.nodeId("0"),
+ clusterCommunicator,
+ persistenceService,
+ peersHandler,
+ peersHandler
+ )
.withName(MAP_NAME)
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))