Implemented a Builder pattern for EventuallyConsistentMaps.

EventuallyConsistentMap has been moved to the API package so is now available outside the stores.

ONOS-1357

Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
deleted file mode 100644
index 8cda45d..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A distributed, eventually consistent map.
- * <p>
- * This map does not offer read after writes consistency. Operations are
- * serialized via the timestamps issued by the clock service. If two updates
- * are in conflict, the update with the more recent timestamp will endure.
- * </p><p>
- * The interface is mostly similar to {@link java.util.Map} with some minor
- * semantic changes and the addition of a listener framework (because the map
- * can be mutated by clients on other instances, not only through the local Java
- * API).
- * </p><p>
- * Clients are expected to register an
- * {@link EventuallyConsistentMapListener} if they
- * are interested in receiving notifications of update to the map.
- * </p><p>
- * Null values are not allowed in this map.
- * </p>
- */
-public interface EventuallyConsistentMap<K, V> {
-
-    /**
-     * Returns the number of key-value mappings in this map.
-     *
-     * @return number of key-value mappings
-     */
-    public int size();
-
-    /**
-     * Returns true if this map is empty.
-     *
-     * @return true if this map is empty, otherwise false
-     */
-    public boolean isEmpty();
-
-    /**
-     * Returns true if the map contains a mapping for the specified key.
-     *
-     * @param key the key to check if this map contains
-     * @return true if this map has a mapping for the key, otherwise false
-     */
-    public boolean containsKey(K key);
-
-    /**
-     * Returns true if the map contains a mapping from any key to the specified
-     * value.
-     *
-     * @param value the value to check if this map has a mapping for
-     * @return true if this map has a mapping to this value, otherwise false
-     */
-    public boolean containsValue(V value);
-
-    /**
-     * Returns the value mapped to the specified key.
-     *
-     * @param key the key to look up in this map
-     * @return the value mapped to the key, or null if no mapping is found
-     */
-    public V get(K key);
-
-    /**
-     * Associates the specified value to the specified key in this map.
-     * <p>
-     * Note: this differs from the specification of {@link java.util.Map}
-     * because it does not return the previous value associated with the key.
-     * Clients are expected to register an
-     * {@link EventuallyConsistentMapListener} if
-     * they are interested in receiving notification of updates to the map.
-     * </p><p>
-     * Null values are not allowed in the map.
-     * </p>
-     *
-     * @param key the key to add a mapping for in this map
-     * @param value the value to associate with the key in this map
-     */
-    public void put(K key, V value);
-
-    /**
-     * Removes the mapping associated with the specified key from the map.
-     * <p>
-     * Note: this differs from the specification of {@link java.util.Map}
-     * because it does not return the previous value associated with the key.
-     * Clients are expected to register an
-     * {@link EventuallyConsistentMapListener} if
-     * they are interested in receiving notification of updates to the map.
-     * </p>
-     *
-     * @param key the key to remove the mapping for
-     */
-    public void remove(K key);
-
-    /**
-     * Removes the given key-value mapping from the map, if it exists.
-     * <p>
-     * This actually means remove any values up to and including the timestamp
-     * given by {@link org.onosproject.store.impl.ClockService#getTimestamp(Object, Object)}.
-     * Any mappings that produce an earlier timestamp than this given key-value
-     * pair will be removed, and any mappings that produce a later timestamp
-     * will supersede this remove.
-     * </p><p>
-     * Note: this differs from the specification of {@link java.util.Map}
-     * because it does not return a boolean indication whether a value was removed.
-     * Clients are expected to register an
-     * {@link EventuallyConsistentMapListener} if
-     * they are interested in receiving notification of updates to the map.
-     * </p>
-     *
-     * @param key the key to remove the mapping for
-     * @param value the value mapped to the key
-     */
-    public void remove(K key, V value);
-
-    /**
-     * Adds mappings for all key-value pairs in the specified map to this map.
-     * <p>
-     * This will be more efficient in communication than calling individual put
-     * operations.
-     * </p>
-     *
-     * @param m a map of values to add to this map
-     */
-    public void putAll(Map<? extends K, ? extends V> m);
-
-    /**
-     * Removes all mappings from this map.
-     */
-    public void clear();
-
-    /**
-     * Returns a set of the keys in this map. Changes to the set are not
-     * reflected back to the map.
-     *
-     * @return set of keys in the map
-     */
-    public Set<K> keySet();
-
-    /**
-     * Returns a collections of values in this map. Changes to the collection
-     * are not reflected back to the map.
-     *
-     * @return collection of values in the map
-     */
-    public Collection<V> values();
-
-    /**
-     * Returns a set of mappings contained in this map. Changes to the set are
-     * not reflected back to the map.
-     *
-     * @return set of key-value mappings in this map
-     */
-    public Set<Map.Entry<K, V>> entrySet();
-
-    /**
-     * Adds the specified listener to the map which will be notified whenever
-     * the mappings in the map are changed.
-     *
-     * @param listener listener to register for events
-     */
-    public void addListener(EventuallyConsistentMapListener<K, V> listener);
-
-    /**
-     * Removes the specified listener from the map such that it will no longer
-     * receive change notifications.
-     *
-     * @param listener listener to deregister for events
-     */
-    public void removeListener(EventuallyConsistentMapListener<K, V> listener);
-
-    /**
-     * Shuts down the map and breaks communication between different instances.
-     * This allows the map objects to be cleaned up and garbage collected.
-     * Calls to any methods on the map subsequent to calling destroy() will
-     * throw a {@link java.lang.RuntimeException}.
-     */
-    public void destroy();
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
new file mode 100644
index 0000000..9d60143
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.ecmap;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Eventually consistent map builder.
+ */
+public class EventuallyConsistentMapBuilderImpl<K, V>
+        implements EventuallyConsistentMapBuilder<K, V> {
+    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
+
+    private String name;
+    private KryoNamespace.Builder serializerBuilder;
+    private ExecutorService eventExecutor;
+    private ExecutorService communicationExecutor;
+    private ScheduledExecutorService backgroundExecutor;
+    private ClockService<K, V> clockService;
+    private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
+    private boolean tombstonesDisabled = false;
+    private long antiEntropyPeriod = 5;
+    private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
+    private boolean convergeFaster = false;
+
+    /**
+     * Creates a new eventually consistent map builder.
+     *
+     * @param clusterService cluster service
+     * @param clusterCommunicator cluster communication service
+     */
+    public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
+                                              ClusterCommunicationService clusterCommunicator) {
+        this.clusterService = checkNotNull(clusterService);
+        this.clusterCommunicator = checkNotNull(clusterCommunicator);
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withName(String name) {
+        this.name = checkNotNull(name);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withSerializer(
+            KryoNamespace.Builder serializerBuilder) {
+        this.serializerBuilder = checkNotNull(serializerBuilder);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withClockService(
+            ClockService<K, V> clockService) {
+        this.clockService = checkNotNull(clockService);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+        this.eventExecutor = checkNotNull(executor);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
+            ExecutorService executor) {
+        communicationExecutor = checkNotNull(executor);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+        this.backgroundExecutor = checkNotNull(executor);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
+        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled() {
+        tombstonesDisabled = true;
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(long period, TimeUnit unit) {
+        checkArgument(period > 0, "anti-entropy period must be greater than 0");
+        antiEntropyPeriod = period;
+        antiEntropyTimeUnit = checkNotNull(unit);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withFasterConvergence() {
+        convergeFaster = true;
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMap<K, V> build() {
+        checkNotNull(name, "name is a mandatory parameter");
+        checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
+        checkNotNull(clockService, "clockService is a mandatory parameter");
+
+        return new EventuallyConsistentMapImpl<>(name,
+                                                 clusterService,
+                                                 clusterCommunicator,
+                                                 serializerBuilder,
+                                                 clockService,
+                                                 peerUpdateFunction,
+                                                 eventExecutor,
+                                                 communicationExecutor,
+                                                 backgroundExecutor,
+                                                 tombstonesDisabled,
+                                                 antiEntropyPeriod,
+                                                 antiEntropyTimeUnit,
+                                                 convergeFaster);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
deleted file mode 100644
index ee7d42e..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Objects;
-
-/**
- * Event object signalling that the map was modified.
- */
-public class EventuallyConsistentMapEvent<K, V> {
-
-    public enum Type {
-        PUT,
-        REMOVE
-    }
-
-    private final Type type;
-    private final K key;
-    private final V value;
-
-    /**
-     * Creates a new event object.
-     *
-     * @param type the type of the event
-     * @param key the key the event concerns
-     * @param value the value related to the key, or null for remove events
-     */
-    public EventuallyConsistentMapEvent(Type type, K key, V value) {
-        this.type = type;
-        this.key = key;
-        this.value = value;
-    }
-
-    /**
-     * Returns the type of the event.
-     *
-     * @return the type of the event
-     */
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Returns the key this event concerns.
-     *
-     * @return the key
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Returns the value associated with this event.
-     *
-     * @return the value, or null if the event was REMOVE
-     */
-    public V value() {
-        return value;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof EventuallyConsistentMapEvent)) {
-            return false;
-        }
-
-        EventuallyConsistentMapEvent that = (EventuallyConsistentMapEvent) o;
-        return Objects.equals(this.type, that.type) &&
-                Objects.equals(this.key, that.key) &&
-                Objects.equals(this.value, that.value);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(type, key, value);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("type", type)
-                .add("key", key)
-                .add("value", value)
-                .toString();
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2987529..62d145d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -32,10 +32,13 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
 import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.impl.WallClockTimestamp;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +57,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -93,8 +95,8 @@
     private final ScheduledExecutorService backgroundExecutor;
     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
-    private ExecutorService communicationExecutor;
-    private Map<NodeId, EventAccumulator> senderPending;
+    private final ExecutorService communicationExecutor;
+    private final Map<NodeId, EventAccumulator> senderPending;
 
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -103,130 +105,115 @@
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
 
-    // TODO: Make these anti-entropy params configurable
-    private long initialDelaySec = 5;
-    private long periodSec = 5;
-    private boolean lightweightAntiEntropy = true;
-    private boolean tombstonesDisabled = false;
+    private final long initialDelaySec = 5;
+    private final boolean lightweightAntiEntropy;
+    private final boolean tombstonesDisabled;
 
     private static final int WINDOW_SIZE = 5;
     private static final int HIGH_LOAD_THRESHOLD = 0;
     private static final int LOAD_WINDOW = 2;
-    SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
-    AtomicLong operations = new AtomicLong();
+    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
      * <p>
-     * Each map is identified by a string map name. EventuallyConsistentMapImpl
-     * objects in different JVMs that use the same map name will form a
-     * distributed map across JVMs (provided the cluster service is aware of
-     * both nodes).
-     * </p>
-     * <p>
-     * The client is expected to provide an
-     * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
-     * will be stored in this map have been registered (including referenced
-     * classes). This serializer will be used to serialize both K and V for
-     * inter-node notifications.
-     * </p>
-     * <p>
-     * The client must provide an {@link org.onosproject.store.impl.ClockService}
-     * which can generate timestamps for a given key. The clock service is free
-     * to generate timestamps however it wishes, however these timestamps will
-     * be used to serialize updates to the map so they must be strict enough
-     * to ensure updates are properly ordered for the use case (i.e. in some
-     * cases wallclock time will suffice, whereas in other cases logical time
-     * will be necessary).
+     * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+     * for more description of the parameters expected by the map.
      * </p>
      *
-     * @param mapName             a String identifier for the map.
-     * @param clusterService      the cluster service
-     * @param clusterCommunicator the cluster communications service
-     * @param serializerBuilder   a Kryo namespace builder that can serialize
-     *                            both K and V
-     * @param clockService        a clock service able to generate timestamps
-     *                            for K
-     * @param peerUpdateFunction  function that provides a set of nodes to immediately
-     *                            update to when there writes to the map
+     * @param mapName               a String identifier for the map.
+     * @param clusterService        the cluster service
+     * @param clusterCommunicator   the cluster communications service
+     * @param serializerBuilder     a Kryo namespace builder that can serialize
+     *                              both K and V
+     * @param clockService          a clock service able to generate timestamps
+     *                              for K and V
+     * @param peerUpdateFunction    function that provides a set of nodes to immediately
+     *                              update to when there writes to the map
+     * @param eventExecutor         executor to use for processing incoming
+     *                              events from peers
+     * @param communicationExecutor executor to use for sending events to peers
+     * @param backgroundExecutor    executor to use for background anti-entropy
+     *                              tasks
+     * @param tombstonesDisabled    true if this map should not maintain
+     *                              tombstones
+     * @param antiEntropyPeriod     period that the anti-entropy task should run
+     *                              in seconds
+     * @param convergeFaster        make anti-entropy try to converge faster
      */
-    public EventuallyConsistentMapImpl(String mapName,
-                                       ClusterService clusterService,
-                                       ClusterCommunicationService clusterCommunicator,
-                                       KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService,
-                                       BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
-        this.clusterService = checkNotNull(clusterService);
-        this.clusterCommunicator = checkNotNull(clusterCommunicator);
-        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
-
-        serializer = createSerializer(checkNotNull(serializerBuilder));
-        destroyedMessage = mapName + ERROR_DESTROYED;
-
-        this.clockService = checkNotNull(clockService);
-
+    EventuallyConsistentMapImpl(String mapName,
+                                ClusterService clusterService,
+                                ClusterCommunicationService clusterCommunicator,
+                                KryoNamespace.Builder serializerBuilder,
+                                ClockService<K, V> clockService,
+                                BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+                                ExecutorService eventExecutor,
+                                ExecutorService communicationExecutor,
+                                ScheduledExecutorService backgroundExecutor,
+                                boolean tombstonesDisabled,
+                                long antiEntropyPeriod,
+                                TimeUnit antiEntropyTimeUnit,
+                                boolean convergeFaster) {
         items = new ConcurrentHashMap<>();
         removedItems = new ConcurrentHashMap<>();
-
-        // should be a normal executor; it's used for receiving messages
-        //TODO make # of threads configurable
-        executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
-
-        // sending executor; should be capped
-        //TODO make # of threads configurable
-        //TODO this probably doesn't need to be bounded anymore
-        communicationExecutor =
-                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         senderPending = Maps.newConcurrentMap();
+        destroyedMessage = mapName + ERROR_DESTROYED;
 
-        backgroundExecutor =
-                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator;
+
+        this.serializer = createSerializer(serializerBuilder);
+
+        this.clockService = clockService;
+
+        if (peerUpdateFunction != null) {
+            this.peerUpdateFunction = peerUpdateFunction;
+        } else {
+            this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+                    .map(ControllerNode::id)
+                    .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                    .collect(Collectors.toList());
+        }
+
+        if (eventExecutor != null) {
+            this.executor = eventExecutor;
+        } else {
+            // should be a normal executor; it's used for receiving messages
+            this.executor =
+                    Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+        }
+
+        if (communicationExecutor != null) {
+            this.communicationExecutor = communicationExecutor;
+        } else {
+            // sending executor; should be capped
+            //TODO this probably doesn't need to be bounded anymore
+            this.communicationExecutor =
+                    newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+        }
+
+        if (backgroundExecutor != null) {
+            this.backgroundExecutor = backgroundExecutor;
+        } else {
+            this.backgroundExecutor =
+                    newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        }
 
         // start anti-entropy thread
-        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-                                               initialDelaySec, periodSec,
-                                               TimeUnit.SECONDS);
+        this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                                                    initialDelaySec, antiEntropyPeriod,
+                                                    antiEntropyTimeUnit);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalEventListener(), executor);
+                                          new InternalEventListener(), this.executor);
 
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener(), backgroundExecutor);
-    }
+                                          new InternalAntiEntropyListener(), this.backgroundExecutor);
 
-    /**
-     * Creates a new eventually consistent map shared amongst multiple instances.
-     * <p>
-     * Take a look at the other constructor for usage information. The only difference
-     * is that a BiFunction is provided that returns all nodes in the cluster, so
-     * all nodes will be sent write updates immediately.
-     * </p>
-     *
-     * @param mapName             a String identifier for the map.
-     * @param clusterService      the cluster service
-     * @param clusterCommunicator the cluster communications service
-     * @param serializerBuilder   a Kryo namespace builder that can serialize
-     *                            both K and V
-     * @param clockService        a clock service able to generate timestamps
-     *                            for K
-     */
-    public EventuallyConsistentMapImpl(String mapName,
-                                       ClusterService clusterService,
-                                       ClusterCommunicationService clusterCommunicator,
-                                       KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService) {
-        this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
-             (key, value) -> clusterService.getNodes().stream()
-                     .map(ControllerNode::id)
-                     .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
-                     .collect(Collectors.toList()));
-    }
-
-    public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
-        tombstonesDisabled = status;
-        return this;
+        this.tombstonesDisabled = tombstonesDisabled;
+        this.lightweightAntiEntropy = !convergeFaster;
     }
 
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -246,19 +233,6 @@
         };
     }
 
-    /**
-     * Sets the executor to use for broadcasting messages and returns this
-     * instance for method chaining.
-     *
-     * @param executor executor service
-     * @return this instance
-     */
-    public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
-        checkNotNull(executor, "Null executor");
-        communicationExecutor = executor;
-        return this;
-    }
-
     @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
deleted file mode 100644
index 52642a1..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-/**
- * Listener interested in receiving modification events for an
- * EventuallyConsistentMap.
- */
-public interface EventuallyConsistentMapListener<K, V> {
-
-    /**
-     * Reacts to the specified event.
-     *
-     * @param event the event
-     */
-    public void event(EventuallyConsistentMapEvent<K, V> event);
-}