Implemented a Builder pattern for EventuallyConsistentMaps.
EventuallyConsistentMap has been moved to the API package, so it is now available outside the stores.
ONOS-1357
Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2987529..62d145d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -32,10 +32,13 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +57,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -93,8 +95,8 @@
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
- private ExecutorService communicationExecutor;
- private Map<NodeId, EventAccumulator> senderPending;
+ private final ExecutorService communicationExecutor;
+ private final Map<NodeId, EventAccumulator> senderPending;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -103,130 +105,115 @@
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
- // TODO: Make these anti-entropy params configurable
- private long initialDelaySec = 5;
- private long periodSec = 5;
- private boolean lightweightAntiEntropy = true;
- private boolean tombstonesDisabled = false;
+ private final long initialDelaySec = 5;
+ private final boolean lightweightAntiEntropy;
+ private final boolean tombstonesDisabled;
private static final int WINDOW_SIZE = 5;
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
- SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
- AtomicLong operations = new AtomicLong();
+ private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
- * Each map is identified by a string map name. EventuallyConsistentMapImpl
- * objects in different JVMs that use the same map name will form a
- * distributed map across JVMs (provided the cluster service is aware of
- * both nodes).
- * </p>
- * <p>
- * The client is expected to provide an
- * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
- * will be stored in this map have been registered (including referenced
- * classes). This serializer will be used to serialize both K and V for
- * inter-node notifications.
- * </p>
- * <p>
- * The client must provide an {@link org.onosproject.store.impl.ClockService}
- * which can generate timestamps for a given key. The clock service is free
- * to generate timestamps however it wishes, however these timestamps will
- * be used to serialize updates to the map so they must be strict enough
- * to ensure updates are properly ordered for the use case (i.e. in some
- * cases wallclock time will suffice, whereas in other cases logical time
- * will be necessary).
+ * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+ * for more description of the parameters expected by the map.
* </p>
*
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param clockService a clock service able to generate timestamps
- * for K
- * @param peerUpdateFunction function that provides a set of nodes to immediately
- * update to when there writes to the map
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K and V
+ * @param peerUpdateFunction function that provides a set of nodes to immediately
+ * update when there are writes to the map
+ * @param eventExecutor executor to use for processing incoming
+ * events from peers
+ * @param communicationExecutor executor to use for sending events to peers
+ * @param backgroundExecutor executor to use for background anti-entropy
+ * tasks
+ * @param tombstonesDisabled true if this map should not maintain
+ * tombstones
+ * @param antiEntropyPeriod period at which the anti-entropy task should run,
+ * expressed in units of the antiEntropyTimeUnit argument
+ * @param convergeFaster make anti-entropy try to converge faster
*/
- public EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService,
- BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
- this.clusterService = checkNotNull(clusterService);
- this.clusterCommunicator = checkNotNull(clusterCommunicator);
- this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
-
- serializer = createSerializer(checkNotNull(serializerBuilder));
- destroyedMessage = mapName + ERROR_DESTROYED;
-
- this.clockService = checkNotNull(clockService);
-
+ EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K, V> clockService,
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+ ExecutorService eventExecutor,
+ ExecutorService communicationExecutor,
+ ScheduledExecutorService backgroundExecutor,
+ boolean tombstonesDisabled,
+ long antiEntropyPeriod,
+ TimeUnit antiEntropyTimeUnit,
+ boolean convergeFaster) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
-
- // should be a normal executor; it's used for receiving messages
- //TODO make # of threads configurable
- executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
-
- // sending executor; should be capped
- //TODO make # of threads configurable
- //TODO this probably doesn't need to be bounded anymore
- communicationExecutor =
- newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
senderPending = Maps.newConcurrentMap();
+ destroyedMessage = mapName + ERROR_DESTROYED;
- backgroundExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+
+ this.serializer = createSerializer(serializerBuilder);
+
+ this.clockService = clockService;
+
+ if (peerUpdateFunction != null) {
+ this.peerUpdateFunction = peerUpdateFunction;
+ } else {
+ this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .collect(Collectors.toList());
+ }
+
+ if (eventExecutor != null) {
+ this.executor = eventExecutor;
+ } else {
+ // should be a normal executor; it's used for receiving messages
+ this.executor =
+ Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+ }
+
+ if (communicationExecutor != null) {
+ this.communicationExecutor = communicationExecutor;
+ } else {
+ // sending executor; should be capped
+ //TODO this probably doesn't need to be bounded anymore
+ this.communicationExecutor =
+ newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+ }
+
+ if (backgroundExecutor != null) {
+ this.backgroundExecutor = backgroundExecutor;
+ } else {
+ this.backgroundExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ }
// start anti-entropy thread
- backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- initialDelaySec, periodSec,
- TimeUnit.SECONDS);
+ this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, antiEntropyPeriod,
+ antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalEventListener(), executor);
+ new InternalEventListener(), this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener(), backgroundExecutor);
- }
+ new InternalAntiEntropyListener(), this.backgroundExecutor);
- /**
- * Creates a new eventually consistent map shared amongst multiple instances.
- * <p>
- * Take a look at the other constructor for usage information. The only difference
- * is that a BiFunction is provided that returns all nodes in the cluster, so
- * all nodes will be sent write updates immediately.
- * </p>
- *
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param clockService a clock service able to generate timestamps
- * for K
- */
- public EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService) {
- this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
- (key, value) -> clusterService.getNodes().stream()
- .map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
- .collect(Collectors.toList()));
- }
-
- public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
- tombstonesDisabled = status;
- return this;
+ this.tombstonesDisabled = tombstonesDisabled;
+ this.lightweightAntiEntropy = !convergeFaster;
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -246,19 +233,6 @@
};
}
- /**
- * Sets the executor to use for broadcasting messages and returns this
- * instance for method chaining.
- *
- * @param executor executor service
- * @return this instance
- */
- public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
- checkNotNull(executor, "Null executor");
- communicationExecutor = executor;
- return this;
- }
-
@Override
public int size() {
checkState(!destroyed, destroyedMessage);