Implemented a Builder pattern for EventuallyConsistentMaps.

EventuallyConsistentMap has been moved to the API package, so it is now available outside the stores.

ONOS-1357

Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2987529..62d145d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -32,10 +32,13 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
 import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.impl.WallClockTimestamp;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +57,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -93,8 +95,8 @@
     private final ScheduledExecutorService backgroundExecutor;
     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
-    private ExecutorService communicationExecutor;
-    private Map<NodeId, EventAccumulator> senderPending;
+    private final ExecutorService communicationExecutor;
+    private final Map<NodeId, EventAccumulator> senderPending;
 
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -103,130 +105,115 @@
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
 
-    // TODO: Make these anti-entropy params configurable
-    private long initialDelaySec = 5;
-    private long periodSec = 5;
-    private boolean lightweightAntiEntropy = true;
-    private boolean tombstonesDisabled = false;
+    private final long initialDelaySec = 5;
+    private final boolean lightweightAntiEntropy;
+    private final boolean tombstonesDisabled;
 
     private static final int WINDOW_SIZE = 5;
     private static final int HIGH_LOAD_THRESHOLD = 0;
     private static final int LOAD_WINDOW = 2;
-    SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
-    AtomicLong operations = new AtomicLong();
+    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
      * <p>
-     * Each map is identified by a string map name. EventuallyConsistentMapImpl
-     * objects in different JVMs that use the same map name will form a
-     * distributed map across JVMs (provided the cluster service is aware of
-     * both nodes).
-     * </p>
-     * <p>
-     * The client is expected to provide an
-     * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
-     * will be stored in this map have been registered (including referenced
-     * classes). This serializer will be used to serialize both K and V for
-     * inter-node notifications.
-     * </p>
-     * <p>
-     * The client must provide an {@link org.onosproject.store.impl.ClockService}
-     * which can generate timestamps for a given key. The clock service is free
-     * to generate timestamps however it wishes, however these timestamps will
-     * be used to serialize updates to the map so they must be strict enough
-     * to ensure updates are properly ordered for the use case (i.e. in some
-     * cases wallclock time will suffice, whereas in other cases logical time
-     * will be necessary).
+     * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+     * for a more detailed description of the parameters expected by the map.
      * </p>
      *
-     * @param mapName             a String identifier for the map.
-     * @param clusterService      the cluster service
-     * @param clusterCommunicator the cluster communications service
-     * @param serializerBuilder   a Kryo namespace builder that can serialize
-     *                            both K and V
-     * @param clockService        a clock service able to generate timestamps
-     *                            for K
-     * @param peerUpdateFunction  function that provides a set of nodes to immediately
-     *                            update to when there writes to the map
+     * @param mapName               a String identifier for the map.
+     * @param clusterService        the cluster service
+     * @param clusterCommunicator   the cluster communications service
+     * @param serializerBuilder     a Kryo namespace builder that can serialize
+     *                              both K and V
+     * @param clockService          a clock service able to generate timestamps
+     *                              for K and V
+     * @param peerUpdateFunction    function that provides a set of nodes to immediately
+     *                              update when there are writes to the map
+     * @param eventExecutor         executor to use for processing incoming
+     *                              events from peers
+     * @param communicationExecutor executor to use for sending events to peers
+     * @param backgroundExecutor    executor to use for background anti-entropy
+     *                              tasks
+     * @param tombstonesDisabled    true if this map should not maintain
+     *                              tombstones
+     * @param antiEntropyPeriod     period at which the anti-entropy task should run
+     * @param antiEntropyTimeUnit   time unit of antiEntropyPeriod
+     * @param convergeFaster        make anti-entropy try to converge faster
      */
-    public EventuallyConsistentMapImpl(String mapName,
-                                       ClusterService clusterService,
-                                       ClusterCommunicationService clusterCommunicator,
-                                       KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService,
-                                       BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
-        this.clusterService = checkNotNull(clusterService);
-        this.clusterCommunicator = checkNotNull(clusterCommunicator);
-        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
-
-        serializer = createSerializer(checkNotNull(serializerBuilder));
-        destroyedMessage = mapName + ERROR_DESTROYED;
-
-        this.clockService = checkNotNull(clockService);
-
+    EventuallyConsistentMapImpl(String mapName,
+                                ClusterService clusterService,
+                                ClusterCommunicationService clusterCommunicator,
+                                KryoNamespace.Builder serializerBuilder,
+                                ClockService<K, V> clockService,
+                                BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+                                ExecutorService eventExecutor,
+                                ExecutorService communicationExecutor,
+                                ScheduledExecutorService backgroundExecutor,
+                                boolean tombstonesDisabled,
+                                long antiEntropyPeriod,
+                                TimeUnit antiEntropyTimeUnit,
+                                boolean convergeFaster) {
         items = new ConcurrentHashMap<>();
         removedItems = new ConcurrentHashMap<>();
-
-        // should be a normal executor; it's used for receiving messages
-        //TODO make # of threads configurable
-        executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
-
-        // sending executor; should be capped
-        //TODO make # of threads configurable
-        //TODO this probably doesn't need to be bounded anymore
-        communicationExecutor =
-                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         senderPending = Maps.newConcurrentMap();
+        destroyedMessage = mapName + ERROR_DESTROYED;
 
-        backgroundExecutor =
-                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator;
+
+        this.serializer = createSerializer(serializerBuilder);
+
+        this.clockService = clockService;
+
+        if (peerUpdateFunction != null) {
+            this.peerUpdateFunction = peerUpdateFunction;
+        } else {
+            this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+                    .map(ControllerNode::id)
+                    .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                    .collect(Collectors.toList());
+        }
+
+        if (eventExecutor != null) {
+            this.executor = eventExecutor;
+        } else {
+            // should be a normal executor; it's used for receiving messages
+            this.executor =
+                    Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+        }
+
+        if (communicationExecutor != null) {
+            this.communicationExecutor = communicationExecutor;
+        } else {
+            // sending executor; should be capped
+            //TODO this probably doesn't need to be bounded anymore
+            this.communicationExecutor =
+                    newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+        }
+
+        if (backgroundExecutor != null) {
+            this.backgroundExecutor = backgroundExecutor;
+        } else {
+            this.backgroundExecutor =
+                    newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        }
 
         // start anti-entropy thread
-        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-                                               initialDelaySec, periodSec,
-                                               TimeUnit.SECONDS);
+        this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                                                    initialDelaySec, antiEntropyPeriod,
+                                                    antiEntropyTimeUnit);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalEventListener(), executor);
+                                          new InternalEventListener(), this.executor);
 
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener(), backgroundExecutor);
-    }
+                                          new InternalAntiEntropyListener(), this.backgroundExecutor);
 
-    /**
-     * Creates a new eventually consistent map shared amongst multiple instances.
-     * <p>
-     * Take a look at the other constructor for usage information. The only difference
-     * is that a BiFunction is provided that returns all nodes in the cluster, so
-     * all nodes will be sent write updates immediately.
-     * </p>
-     *
-     * @param mapName             a String identifier for the map.
-     * @param clusterService      the cluster service
-     * @param clusterCommunicator the cluster communications service
-     * @param serializerBuilder   a Kryo namespace builder that can serialize
-     *                            both K and V
-     * @param clockService        a clock service able to generate timestamps
-     *                            for K
-     */
-    public EventuallyConsistentMapImpl(String mapName,
-                                       ClusterService clusterService,
-                                       ClusterCommunicationService clusterCommunicator,
-                                       KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService) {
-        this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
-             (key, value) -> clusterService.getNodes().stream()
-                     .map(ControllerNode::id)
-                     .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
-                     .collect(Collectors.toList()));
-    }
-
-    public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
-        tombstonesDisabled = status;
-        return this;
+        this.tombstonesDisabled = tombstonesDisabled;
+        this.lightweightAntiEntropy = !convergeFaster;
     }
 
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -246,19 +233,6 @@
         };
     }
 
-    /**
-     * Sets the executor to use for broadcasting messages and returns this
-     * instance for method chaining.
-     *
-     * @param executor executor service
-     * @return this instance
-     */
-    public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
-        checkNotNull(executor, "Null executor");
-        communicationExecutor = executor;
-        return this;
-    }
-
     @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);