Implemented a Builder pattern for EventuallyConsistentMaps.
EventuallyConsistentMap has been moved to the API package so is now available outside the stores.
ONOS-1357
Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
deleted file mode 100644
index 8cda45d..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A distributed, eventually consistent map.
- * <p>
- * This map does not offer read after writes consistency. Operations are
- * serialized via the timestamps issued by the clock service. If two updates
- * are in conflict, the update with the more recent timestamp will endure.
- * </p><p>
- * The interface is mostly similar to {@link java.util.Map} with some minor
- * semantic changes and the addition of a listener framework (because the map
- * can be mutated by clients on other instances, not only through the local Java
- * API).
- * </p><p>
- * Clients are expected to register an
- * {@link EventuallyConsistentMapListener} if they
- * are interested in receiving notifications of update to the map.
- * </p><p>
- * Null values are not allowed in this map.
- * </p>
- */
-public interface EventuallyConsistentMap<K, V> {
-
- /**
- * Returns the number of key-value mappings in this map.
- *
- * @return number of key-value mappings
- */
- public int size();
-
- /**
- * Returns true if this map is empty.
- *
- * @return true if this map is empty, otherwise false
- */
- public boolean isEmpty();
-
- /**
- * Returns true if the map contains a mapping for the specified key.
- *
- * @param key the key to check if this map contains
- * @return true if this map has a mapping for the key, otherwise false
- */
- public boolean containsKey(K key);
-
- /**
- * Returns true if the map contains a mapping from any key to the specified
- * value.
- *
- * @param value the value to check if this map has a mapping for
- * @return true if this map has a mapping to this value, otherwise false
- */
- public boolean containsValue(V value);
-
- /**
- * Returns the value mapped to the specified key.
- *
- * @param key the key to look up in this map
- * @return the value mapped to the key, or null if no mapping is found
- */
- public V get(K key);
-
- /**
- * Associates the specified value to the specified key in this map.
- * <p>
- * Note: this differs from the specification of {@link java.util.Map}
- * because it does not return the previous value associated with the key.
- * Clients are expected to register an
- * {@link EventuallyConsistentMapListener} if
- * they are interested in receiving notification of updates to the map.
- * </p><p>
- * Null values are not allowed in the map.
- * </p>
- *
- * @param key the key to add a mapping for in this map
- * @param value the value to associate with the key in this map
- */
- public void put(K key, V value);
-
- /**
- * Removes the mapping associated with the specified key from the map.
- * <p>
- * Note: this differs from the specification of {@link java.util.Map}
- * because it does not return the previous value associated with the key.
- * Clients are expected to register an
- * {@link EventuallyConsistentMapListener} if
- * they are interested in receiving notification of updates to the map.
- * </p>
- *
- * @param key the key to remove the mapping for
- */
- public void remove(K key);
-
- /**
- * Removes the given key-value mapping from the map, if it exists.
- * <p>
- * This actually means remove any values up to and including the timestamp
- * given by {@link org.onosproject.store.impl.ClockService#getTimestamp(Object, Object)}.
- * Any mappings that produce an earlier timestamp than this given key-value
- * pair will be removed, and any mappings that produce a later timestamp
- * will supersede this remove.
- * </p><p>
- * Note: this differs from the specification of {@link java.util.Map}
- * because it does not return a boolean indication whether a value was removed.
- * Clients are expected to register an
- * {@link EventuallyConsistentMapListener} if
- * they are interested in receiving notification of updates to the map.
- * </p>
- *
- * @param key the key to remove the mapping for
- * @param value the value mapped to the key
- */
- public void remove(K key, V value);
-
- /**
- * Adds mappings for all key-value pairs in the specified map to this map.
- * <p>
- * This will be more efficient in communication than calling individual put
- * operations.
- * </p>
- *
- * @param m a map of values to add to this map
- */
- public void putAll(Map<? extends K, ? extends V> m);
-
- /**
- * Removes all mappings from this map.
- */
- public void clear();
-
- /**
- * Returns a set of the keys in this map. Changes to the set are not
- * reflected back to the map.
- *
- * @return set of keys in the map
- */
- public Set<K> keySet();
-
- /**
- * Returns a collections of values in this map. Changes to the collection
- * are not reflected back to the map.
- *
- * @return collection of values in the map
- */
- public Collection<V> values();
-
- /**
- * Returns a set of mappings contained in this map. Changes to the set are
- * not reflected back to the map.
- *
- * @return set of key-value mappings in this map
- */
- public Set<Map.Entry<K, V>> entrySet();
-
- /**
- * Adds the specified listener to the map which will be notified whenever
- * the mappings in the map are changed.
- *
- * @param listener listener to register for events
- */
- public void addListener(EventuallyConsistentMapListener<K, V> listener);
-
- /**
- * Removes the specified listener from the map such that it will no longer
- * receive change notifications.
- *
- * @param listener listener to deregister for events
- */
- public void removeListener(EventuallyConsistentMapListener<K, V> listener);
-
- /**
- * Shuts down the map and breaks communication between different instances.
- * This allows the map objects to be cleaned up and garbage collected.
- * Calls to any methods on the map subsequent to calling destroy() will
- * throw a {@link java.lang.RuntimeException}.
- */
- public void destroy();
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
new file mode 100644
index 0000000..9d60143
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.ecmap;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Eventually consistent map builder.
+ */
+public class EventuallyConsistentMapBuilderImpl<K, V>
+ implements EventuallyConsistentMapBuilder<K, V> {
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
+
+ private String name;
+ private KryoNamespace.Builder serializerBuilder;
+ private ExecutorService eventExecutor;
+ private ExecutorService communicationExecutor;
+ private ScheduledExecutorService backgroundExecutor;
+ private ClockService<K, V> clockService;
+ private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
+ private boolean tombstonesDisabled = false;
+ private long antiEntropyPeriod = 5;
+ private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
+ private boolean convergeFaster = false;
+
+ /**
+ * Creates a new eventually consistent map builder.
+ *
+ * @param clusterService cluster service
+ * @param clusterCommunicator cluster communication service
+ */
+ public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
+ this.clusterService = checkNotNull(clusterService);
+ this.clusterCommunicator = checkNotNull(clusterCommunicator);
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withName(String name) {
+ this.name = checkNotNull(name);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withSerializer(
+ KryoNamespace.Builder serializerBuilder) {
+ this.serializerBuilder = checkNotNull(serializerBuilder);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withClockService(
+ ClockService<K, V> clockService) {
+ this.clockService = checkNotNull(clockService);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+ this.eventExecutor = checkNotNull(executor);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
+ ExecutorService executor) {
+ communicationExecutor = checkNotNull(executor);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+ this.backgroundExecutor = checkNotNull(executor);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
+ this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled() {
+ tombstonesDisabled = true;
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(long period, TimeUnit unit) {
+ checkArgument(period > 0, "anti-entropy period must be greater than 0");
+ antiEntropyPeriod = period;
+ antiEntropyTimeUnit = checkNotNull(unit);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withFasterConvergence() {
+ convergeFaster = true;
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMap<K, V> build() {
+ checkNotNull(name, "name is a mandatory parameter");
+ checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
+ checkNotNull(clockService, "clockService is a mandatory parameter");
+
+ return new EventuallyConsistentMapImpl<>(name,
+ clusterService,
+ clusterCommunicator,
+ serializerBuilder,
+ clockService,
+ peerUpdateFunction,
+ eventExecutor,
+ communicationExecutor,
+ backgroundExecutor,
+ tombstonesDisabled,
+ antiEntropyPeriod,
+ antiEntropyTimeUnit,
+ convergeFaster);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
deleted file mode 100644
index ee7d42e..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Objects;
-
-/**
- * Event object signalling that the map was modified.
- */
-public class EventuallyConsistentMapEvent<K, V> {
-
- public enum Type {
- PUT,
- REMOVE
- }
-
- private final Type type;
- private final K key;
- private final V value;
-
- /**
- * Creates a new event object.
- *
- * @param type the type of the event
- * @param key the key the event concerns
- * @param value the value related to the key, or null for remove events
- */
- public EventuallyConsistentMapEvent(Type type, K key, V value) {
- this.type = type;
- this.key = key;
- this.value = value;
- }
-
- /**
- * Returns the type of the event.
- *
- * @return the type of the event
- */
- public Type type() {
- return type;
- }
-
- /**
- * Returns the key this event concerns.
- *
- * @return the key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the value associated with this event.
- *
- * @return the value, or null if the event was REMOVE
- */
- public V value() {
- return value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof EventuallyConsistentMapEvent)) {
- return false;
- }
-
- EventuallyConsistentMapEvent that = (EventuallyConsistentMapEvent) o;
- return Objects.equals(this.type, that.type) &&
- Objects.equals(this.key, that.key) &&
- Objects.equals(this.value, that.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, key, value);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("type", type)
- .add("key", key)
- .add("value", value)
- .toString();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2987529..62d145d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -32,10 +32,13 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +57,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -93,8 +95,8 @@
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
- private ExecutorService communicationExecutor;
- private Map<NodeId, EventAccumulator> senderPending;
+ private final ExecutorService communicationExecutor;
+ private final Map<NodeId, EventAccumulator> senderPending;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -103,130 +105,115 @@
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
- // TODO: Make these anti-entropy params configurable
- private long initialDelaySec = 5;
- private long periodSec = 5;
- private boolean lightweightAntiEntropy = true;
- private boolean tombstonesDisabled = false;
+ private final long initialDelaySec = 5;
+ private final boolean lightweightAntiEntropy;
+ private final boolean tombstonesDisabled;
private static final int WINDOW_SIZE = 5;
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
- SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
- AtomicLong operations = new AtomicLong();
+ private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
- * Each map is identified by a string map name. EventuallyConsistentMapImpl
- * objects in different JVMs that use the same map name will form a
- * distributed map across JVMs (provided the cluster service is aware of
- * both nodes).
- * </p>
- * <p>
- * The client is expected to provide an
- * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
- * will be stored in this map have been registered (including referenced
- * classes). This serializer will be used to serialize both K and V for
- * inter-node notifications.
- * </p>
- * <p>
- * The client must provide an {@link org.onosproject.store.impl.ClockService}
- * which can generate timestamps for a given key. The clock service is free
- * to generate timestamps however it wishes, however these timestamps will
- * be used to serialize updates to the map so they must be strict enough
- * to ensure updates are properly ordered for the use case (i.e. in some
- * cases wallclock time will suffice, whereas in other cases logical time
- * will be necessary).
+ * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+ * for more description of the parameters expected by the map.
* </p>
*
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param clockService a clock service able to generate timestamps
- * for K
- * @param peerUpdateFunction function that provides a set of nodes to immediately
- * update to when there writes to the map
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K and V
+ * @param peerUpdateFunction function that provides a set of nodes to immediately
+ * update to when there writes to the map
+ * @param eventExecutor executor to use for processing incoming
+ * events from peers
+ * @param communicationExecutor executor to use for sending events to peers
+ * @param backgroundExecutor executor to use for background anti-entropy
+ * tasks
+ * @param tombstonesDisabled true if this map should not maintain
+ * tombstones
+ * @param antiEntropyPeriod period that the anti-entropy task should run
+ * in seconds
+ * @param convergeFaster make anti-entropy try to converge faster
*/
- public EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService,
- BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
- this.clusterService = checkNotNull(clusterService);
- this.clusterCommunicator = checkNotNull(clusterCommunicator);
- this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
-
- serializer = createSerializer(checkNotNull(serializerBuilder));
- destroyedMessage = mapName + ERROR_DESTROYED;
-
- this.clockService = checkNotNull(clockService);
-
+ EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K, V> clockService,
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+ ExecutorService eventExecutor,
+ ExecutorService communicationExecutor,
+ ScheduledExecutorService backgroundExecutor,
+ boolean tombstonesDisabled,
+ long antiEntropyPeriod,
+ TimeUnit antiEntropyTimeUnit,
+ boolean convergeFaster) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
-
- // should be a normal executor; it's used for receiving messages
- //TODO make # of threads configurable
- executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
-
- // sending executor; should be capped
- //TODO make # of threads configurable
- //TODO this probably doesn't need to be bounded anymore
- communicationExecutor =
- newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
senderPending = Maps.newConcurrentMap();
+ destroyedMessage = mapName + ERROR_DESTROYED;
- backgroundExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+
+ this.serializer = createSerializer(serializerBuilder);
+
+ this.clockService = clockService;
+
+ if (peerUpdateFunction != null) {
+ this.peerUpdateFunction = peerUpdateFunction;
+ } else {
+ this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .collect(Collectors.toList());
+ }
+
+ if (eventExecutor != null) {
+ this.executor = eventExecutor;
+ } else {
+ // should be a normal executor; it's used for receiving messages
+ this.executor =
+ Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+ }
+
+ if (communicationExecutor != null) {
+ this.communicationExecutor = communicationExecutor;
+ } else {
+ // sending executor; should be capped
+ //TODO this probably doesn't need to be bounded anymore
+ this.communicationExecutor =
+ newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+ }
+
+ if (backgroundExecutor != null) {
+ this.backgroundExecutor = backgroundExecutor;
+ } else {
+ this.backgroundExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ }
// start anti-entropy thread
- backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- initialDelaySec, periodSec,
- TimeUnit.SECONDS);
+ this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, antiEntropyPeriod,
+ antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalEventListener(), executor);
+ new InternalEventListener(), this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener(), backgroundExecutor);
- }
+ new InternalAntiEntropyListener(), this.backgroundExecutor);
- /**
- * Creates a new eventually consistent map shared amongst multiple instances.
- * <p>
- * Take a look at the other constructor for usage information. The only difference
- * is that a BiFunction is provided that returns all nodes in the cluster, so
- * all nodes will be sent write updates immediately.
- * </p>
- *
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param clockService a clock service able to generate timestamps
- * for K
- */
- public EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService) {
- this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
- (key, value) -> clusterService.getNodes().stream()
- .map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
- .collect(Collectors.toList()));
- }
-
- public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
- tombstonesDisabled = status;
- return this;
+ this.tombstonesDisabled = tombstonesDisabled;
+ this.lightweightAntiEntropy = !convergeFaster;
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -246,19 +233,6 @@
};
}
- /**
- * Sets the executor to use for broadcasting messages and returns this
- * instance for method chaining.
- *
- * @param executor executor service
- * @return this instance
- */
- public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
- checkNotNull(executor, "Null executor");
- communicationExecutor = executor;
- return this;
- }
-
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
deleted file mode 100644
index 52642a1..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-/**
- * Listener interested in receiving modification events for an
- * EventuallyConsistentMap.
- */
-public interface EventuallyConsistentMapListener<K, V> {
-
- /**
- * Reacts to the specified event.
- *
- * @param event the event
- */
- public void event(EventuallyConsistentMapEvent<K, V> event);
-}