Implemented a Builder pattern for EventuallyConsistentMaps.

EventuallyConsistentMap has been moved to the API package so is now available outside the stores.

ONOS-1357

Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java b/core/api/src/main/java/org/onosproject/store/service/ClockService.java
similarity index 96%
rename from core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java
rename to core/api/src/main/java/org/onosproject/store/service/ClockService.java
index 3ed89f8..a0de0a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ClockService.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.impl;
+package org.onosproject.store.service;
 
 import org.onosproject.store.Timestamp;
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
rename to core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
index 8cda45d..58a81cc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.ecmap;
+package org.onosproject.store.service;
 
 import java.util.Collection;
 import java.util.Map;
@@ -114,7 +114,7 @@
      * Removes the given key-value mapping from the map, if it exists.
      * <p>
      * This actually means remove any values up to and including the timestamp
-     * given by {@link org.onosproject.store.impl.ClockService#getTimestamp(Object, Object)}.
+     * given by {@link org.onosproject.store.service.ClockService#getTimestamp(Object, Object)}.
      * Any mappings that produce an earlier timestamp than this given key-value
      * pair will be removed, and any mappings that produce a later timestamp
      * will supersede this remove.
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
new file mode 100644
index 0000000..779c329
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/**
+ * Builder for eventually consistent maps.
+ *
+ * @param <K> type for map keys
+ * @param <V> type for map values
+ */
+public interface EventuallyConsistentMapBuilder<K, V> {
+
+    /**
+     * Sets the name of the map.
+     * <p>
+     * Each map is identified by a string map name. EventuallyConsistentMapImpl
+     * objects in different JVMs that use the same map name will form a
+     * distributed map across JVMs (provided the cluster service is aware of
+     * both nodes).
+     * </p>
+     * <p>
+     * Note: This is a mandatory parameter.
+     * </p>
+     *
+     * @param name name of the map
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withName(String name);
+
+    /**
+     * Sets a serializer builder that can be used to create a serializer that
+     * can serialize both the keys and values put into the map. The serializer
+     * builder should be pre-populated with any classes that will be put into
+     * the map.
+     * <p>
+     * Note: This is a mandatory parameter.
+     * </p>
+     *
+     * @param serializerBuilder serializer builder
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withSerializer(
+            KryoNamespace.Builder serializerBuilder);
+
+    /**
+     * Sets the clock service to use for generating timestamps for map updates.
+     * <p>
+     * The client must provide an {@link org.onosproject.store.service.ClockService}
+     * which can generate timestamps for a given key. The clock service is free
+     * to generate timestamps however it wishes, however these timestamps will
+     * be used to serialize updates to the map so they must be strict enough
+     * to ensure updates are properly ordered for the use case (i.e. in some
+     * cases wallclock time will suffice, whereas in other cases logical time
+     * will be necessary).
+     * </p>
+     * <p>
+     * Note: This is a mandatory parameter.
+     * </p>
+     *
+     * @param clockService clock service
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withClockService(
+            ClockService<K, V> clockService);
+
+    /**
+     * Sets the executor to use for processing events coming in from peers.
+     *
+     * @param executor event executor
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withEventExecutor(
+            ExecutorService executor);
+
+    /**
+     * Sets the executor to use for sending events to peers.
+     *
+     * @param executor event executor
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
+            ExecutorService executor);
+
+    /**
+     * Sets the executor to use for background anti-entropy tasks.
+     *
+     * @param executor event executor
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(
+            ScheduledExecutorService executor);
+
+    /**
+     * Sets a function that can determine which peers to replicate updates to.
+     * <p>
+     * The default function replicates to all nodes.
+     * </p>
+     *
+     * @param peerUpdateFunction function that takes a K, V input and returns
+     *                           a collection of NodeIds to replicate the event
+     *                           to
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
+            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction);
+
+    /**
+     * Prevents this map from writing tombstones of items that have been
+     * removed. This may result in zombie items reappearing after they have
+     * been removed.
+     * <p>
+     * The default behavior is tombstones are enabled.
+     * </p>
+     *
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled();
+
+    /**
+     * Configures how often to run the anti-entropy background task.
+     * <p>
+     * The default anti-entropy period is 5 seconds.
+     * </p>
+     *
+     * @param period anti-entropy period
+     * @param unit time unit for the period
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(
+            long period, TimeUnit unit);
+
+    /**
+     * Configure anti-entropy to converge faster at the cost of doing more work
+     * for each anti-entropy cycle. Suited to maps with low update rate where
+     * convergence time is more important than throughput.
+     * <p>
+     * The default behavior is to do less anti-entropy work at the cost of
+     * slower convergence.
+     * </p>
+     *
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withFasterConvergence();
+
+    /**
+     * Builds an eventually consistent map based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new eventually consistent map
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    public EventuallyConsistentMap<K, V> build();
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapEvent.java
similarity index 98%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
rename to core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapEvent.java
index ee7d42e..8f63325 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapEvent.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.ecmap;
+package org.onosproject.store.service;
 
 import com.google.common.base.MoreObjects;
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapListener.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
rename to core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapListener.java
index 52642a1..686255e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapListener.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.ecmap;
+package org.onosproject.store.service;
 
 /**
  * Listener interested in receiving modification events for an
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 7e447cc..a59e376 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -30,6 +30,7 @@
 
     /**
      * Creates a ConsistentMap.
+     *
      * @param name map name
      * @param serializer serializer to use for serializing keys and values
      * @return consistent map.
@@ -40,6 +41,7 @@
 
     /**
      * Creates a AsyncConsistentMap.
+     *
      * @param name map name
      * @param serializer serializer to use for serializing keys and values
      * @return async consistent map
@@ -50,7 +52,18 @@
 
     /**
      * Creates a new transaction context.
+     *
      * @return transaction context
      */
     TransactionContext createTransactionContext();
-}
\ No newline at end of file
+
+    /**
+     * Creates a new EventuallyConsistentMapBuilder.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return builder for an eventually consistent map
+     */
+    <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 935561d..1279c0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -43,14 +43,14 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.ClockService;
 import org.onosproject.store.impl.MultiValuedTimestamp;
 import org.onosproject.store.impl.WallclockClockManager;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
@@ -66,10 +66,16 @@
 import static com.google.common.io.ByteStreams.toByteArray;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.app.ApplicationEvent.Type.*;
-import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -106,6 +112,9 @@
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ApplicationIdStore idStore;
 
     private final AtomicLong sequence = new AtomicLong();
@@ -130,27 +139,29 @@
                 new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
                                            sequence.incrementAndGet());
 
-        apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
-                                                 clusterCommunicator,
-                                                 serializer,
-                                                 appsClockService);
+        apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
+                .withName("apps")
+                .withSerializer(serializer)
+                .withClockService(appsClockService)
+                .build();
 
         ClockService<Application, InternalState> statesClockService = (app, state) ->
                 new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
                                            sequence.incrementAndGet());
 
-        states = new EventuallyConsistentMapImpl<>("app-states",
-                                                   clusterService,
-                                                   clusterCommunicator,
-                                                   serializer,
-                                                   statesClockService);
+        states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
+                .withName("app-states")
+                .withSerializer(serializer)
+                .withClockService(statesClockService)
+                .build();
+
         states.addListener(new InternalAppStatesListener());
 
-        permissions = new EventuallyConsistentMapImpl<>("app-permissions",
-                                                        clusterService,
-                                                        clusterCommunicator,
-                                                        serializer,
-                                                        new WallclockClockManager<>());
+        permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
+                .withName("app-permissions")
+                .withSerializer(serializer)
+                .withClockService(new WallclockClockManager<>())
+                .build();
 
         log.info("Started");
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
index b70a291..94e17b1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
@@ -25,21 +25,19 @@
 import org.onosproject.cfg.ComponentConfigEvent;
 import org.onosproject.cfg.ComponentConfigStore;
 import org.onosproject.cfg.ComponentConfigStoreDelegate;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
 import org.onosproject.store.impl.WallclockClockManager;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_SET;
 import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_UNSET;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -59,20 +57,19 @@
     private EventuallyConsistentMap<String, String> properties;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    protected StorageService storageService;
 
     @Activate
     public void activate() {
         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API);
 
-        properties = new EventuallyConsistentMapImpl<>("cfg", clusterService,
-                                                       clusterCommunicator,
-                                                       serializer,
-                                                       new WallclockClockManager<>());
+        properties = storageService.<String, String>eventuallyConsistentMapBuilder()
+                .withName("cfg")
+                .withSerializer(serializer)
+                .withClockService(new WallclockClockManager<>())
+                .build();
+
         properties.addListener(new InternalPropertiesListener());
         log.info("Started");
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 6ddeea9..4ef317f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -17,13 +17,11 @@
 package org.onosproject.store.consistent.impl;
 
 import com.google.common.collect.Sets;
-
 import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.log.FileLog;
 import net.kuujo.copycat.netty.NettyTcpProtocol;
 import net.kuujo.copycat.protocol.Consistency;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -32,8 +30,11 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.store.cluster.impl.NodeInfo;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.PartitionInfo;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageAdminService;
@@ -71,6 +72,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
     protected String nodeToUri(NodeInfo node) {
         return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
     }
@@ -169,12 +173,12 @@
 
     @Override
     public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
-        return new DefaultConsistentMap<K, V>(name, partitionedDatabase, serializer);
+        return new DefaultConsistentMap<>(name, partitionedDatabase, serializer);
     }
 
     @Override
     public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
-        return new DefaultAsyncConsistentMap<K, V>(name, partitionedDatabase, serializer);
+        return new DefaultAsyncConsistentMap<>(name, partitionedDatabase, serializer);
     }
 
     @Override
@@ -207,4 +211,12 @@
                           database.cluster().leader() != null ?
                                   database.cluster().leader().uri() : null);
     }
+
+
+    @Override
+    public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+        return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+                                                        clusterCommunicator);
+    }
+
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
new file mode 100644
index 0000000..9d60143
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.ecmap;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Eventually consistent map builder.
+ */
+public class EventuallyConsistentMapBuilderImpl<K, V>
+        implements EventuallyConsistentMapBuilder<K, V> {
+    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
+
+    private String name;
+    private KryoNamespace.Builder serializerBuilder;
+    private ExecutorService eventExecutor;
+    private ExecutorService communicationExecutor;
+    private ScheduledExecutorService backgroundExecutor;
+    private ClockService<K, V> clockService;
+    private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
+    private boolean tombstonesDisabled = false;
+    private long antiEntropyPeriod = 5;
+    private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
+    private boolean convergeFaster = false;
+
+    /**
+     * Creates a new eventually consistent map builder.
+     *
+     * @param clusterService cluster service
+     * @param clusterCommunicator cluster communication service
+     */
+    public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
+                                              ClusterCommunicationService clusterCommunicator) {
+        this.clusterService = checkNotNull(clusterService);
+        this.clusterCommunicator = checkNotNull(clusterCommunicator);
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withName(String name) {
+        this.name = checkNotNull(name);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withSerializer(
+            KryoNamespace.Builder serializerBuilder) {
+        this.serializerBuilder = checkNotNull(serializerBuilder);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withClockService(
+            ClockService<K, V> clockService) {
+        this.clockService = checkNotNull(clockService);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+        this.eventExecutor = checkNotNull(executor);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
+            ExecutorService executor) {
+        communicationExecutor = checkNotNull(executor);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+        this.backgroundExecutor = checkNotNull(executor);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
+        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled() {
+        tombstonesDisabled = true;
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(long period, TimeUnit unit) {
+        checkArgument(period > 0, "anti-entropy period must be greater than 0");
+        antiEntropyPeriod = period;
+        antiEntropyTimeUnit = checkNotNull(unit);
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMapBuilder<K, V> withFasterConvergence() {
+        convergeFaster = true;
+        return this;
+    }
+
+    @Override
+    public EventuallyConsistentMap<K, V> build() {
+        checkNotNull(name, "name is a mandatory parameter");
+        checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
+        checkNotNull(clockService, "clockService is a mandatory parameter");
+
+        return new EventuallyConsistentMapImpl<>(name,
+                                                 clusterService,
+                                                 clusterCommunicator,
+                                                 serializerBuilder,
+                                                 clockService,
+                                                 peerUpdateFunction,
+                                                 eventExecutor,
+                                                 communicationExecutor,
+                                                 backgroundExecutor,
+                                                 tombstonesDisabled,
+                                                 antiEntropyPeriod,
+                                                 antiEntropyTimeUnit,
+                                                 convergeFaster);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2987529..62d145d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -32,10 +32,13 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
 import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.impl.WallClockTimestamp;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +57,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -93,8 +95,8 @@
     private final ScheduledExecutorService backgroundExecutor;
     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
-    private ExecutorService communicationExecutor;
-    private Map<NodeId, EventAccumulator> senderPending;
+    private final ExecutorService communicationExecutor;
+    private final Map<NodeId, EventAccumulator> senderPending;
 
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -103,130 +105,115 @@
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
 
-    // TODO: Make these anti-entropy params configurable
-    private long initialDelaySec = 5;
-    private long periodSec = 5;
-    private boolean lightweightAntiEntropy = true;
-    private boolean tombstonesDisabled = false;
+    private final long initialDelaySec = 5;
+    private final boolean lightweightAntiEntropy;
+    private final boolean tombstonesDisabled;
 
     private static final int WINDOW_SIZE = 5;
     private static final int HIGH_LOAD_THRESHOLD = 0;
     private static final int LOAD_WINDOW = 2;
-    SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
-    AtomicLong operations = new AtomicLong();
+    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
      * <p>
-     * Each map is identified by a string map name. EventuallyConsistentMapImpl
-     * objects in different JVMs that use the same map name will form a
-     * distributed map across JVMs (provided the cluster service is aware of
-     * both nodes).
-     * </p>
-     * <p>
-     * The client is expected to provide an
-     * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
-     * will be stored in this map have been registered (including referenced
-     * classes). This serializer will be used to serialize both K and V for
-     * inter-node notifications.
-     * </p>
-     * <p>
-     * The client must provide an {@link org.onosproject.store.impl.ClockService}
-     * which can generate timestamps for a given key. The clock service is free
-     * to generate timestamps however it wishes, however these timestamps will
-     * be used to serialize updates to the map so they must be strict enough
-     * to ensure updates are properly ordered for the use case (i.e. in some
-     * cases wallclock time will suffice, whereas in other cases logical time
-     * will be necessary).
+     * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+     * for more description of the parameters expected by the map.
      * </p>
      *
-     * @param mapName             a String identifier for the map.
-     * @param clusterService      the cluster service
-     * @param clusterCommunicator the cluster communications service
-     * @param serializerBuilder   a Kryo namespace builder that can serialize
-     *                            both K and V
-     * @param clockService        a clock service able to generate timestamps
-     *                            for K
-     * @param peerUpdateFunction  function that provides a set of nodes to immediately
-     *                            update to when there writes to the map
+     * @param mapName               a String identifier for the map.
+     * @param clusterService        the cluster service
+     * @param clusterCommunicator   the cluster communications service
+     * @param serializerBuilder     a Kryo namespace builder that can serialize
+     *                              both K and V
+     * @param clockService          a clock service able to generate timestamps
+     *                              for K and V
+     * @param peerUpdateFunction    function that provides a set of nodes to immediately
+     *                              update to when there writes to the map
+     * @param eventExecutor         executor to use for processing incoming
+     *                              events from peers
+     * @param communicationExecutor executor to use for sending events to peers
+     * @param backgroundExecutor    executor to use for background anti-entropy
+     *                              tasks
+     * @param tombstonesDisabled    true if this map should not maintain
+     *                              tombstones
+     * @param antiEntropyPeriod     period that the anti-entropy task should run
+     *                              in seconds
+     * @param convergeFaster        make anti-entropy try to converge faster
      */
-    public EventuallyConsistentMapImpl(String mapName,
-                                       ClusterService clusterService,
-                                       ClusterCommunicationService clusterCommunicator,
-                                       KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService,
-                                       BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
-        this.clusterService = checkNotNull(clusterService);
-        this.clusterCommunicator = checkNotNull(clusterCommunicator);
-        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
-
-        serializer = createSerializer(checkNotNull(serializerBuilder));
-        destroyedMessage = mapName + ERROR_DESTROYED;
-
-        this.clockService = checkNotNull(clockService);
-
+    EventuallyConsistentMapImpl(String mapName,
+                                ClusterService clusterService,
+                                ClusterCommunicationService clusterCommunicator,
+                                KryoNamespace.Builder serializerBuilder,
+                                ClockService<K, V> clockService,
+                                BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+                                ExecutorService eventExecutor,
+                                ExecutorService communicationExecutor,
+                                ScheduledExecutorService backgroundExecutor,
+                                boolean tombstonesDisabled,
+                                long antiEntropyPeriod,
+                                TimeUnit antiEntropyTimeUnit,
+                                boolean convergeFaster) {
         items = new ConcurrentHashMap<>();
         removedItems = new ConcurrentHashMap<>();
-
-        // should be a normal executor; it's used for receiving messages
-        //TODO make # of threads configurable
-        executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
-
-        // sending executor; should be capped
-        //TODO make # of threads configurable
-        //TODO this probably doesn't need to be bounded anymore
-        communicationExecutor =
-                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         senderPending = Maps.newConcurrentMap();
+        destroyedMessage = mapName + ERROR_DESTROYED;
 
-        backgroundExecutor =
-                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator;
+
+        this.serializer = createSerializer(serializerBuilder);
+
+        this.clockService = clockService;
+
+        if (peerUpdateFunction != null) {
+            this.peerUpdateFunction = peerUpdateFunction;
+        } else {
+            this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+                    .map(ControllerNode::id)
+                    .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                    .collect(Collectors.toList());
+        }
+
+        if (eventExecutor != null) {
+            this.executor = eventExecutor;
+        } else {
+            // should be a normal executor; it's used for receiving messages
+            this.executor =
+                    Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+        }
+
+        if (communicationExecutor != null) {
+            this.communicationExecutor = communicationExecutor;
+        } else {
+            // sending executor; should be capped
+            //TODO this probably doesn't need to be bounded anymore
+            this.communicationExecutor =
+                    newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+        }
+
+        if (backgroundExecutor != null) {
+            this.backgroundExecutor = backgroundExecutor;
+        } else {
+            this.backgroundExecutor =
+                    newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+        }
 
         // start anti-entropy thread
-        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-                                               initialDelaySec, periodSec,
-                                               TimeUnit.SECONDS);
+        this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+                                                    initialDelaySec, antiEntropyPeriod,
+                                                    antiEntropyTimeUnit);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalEventListener(), executor);
+                                          new InternalEventListener(), this.executor);
 
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          new InternalAntiEntropyListener(), backgroundExecutor);
-    }
+                                          new InternalAntiEntropyListener(), this.backgroundExecutor);
 
-    /**
-     * Creates a new eventually consistent map shared amongst multiple instances.
-     * <p>
-     * Take a look at the other constructor for usage information. The only difference
-     * is that a BiFunction is provided that returns all nodes in the cluster, so
-     * all nodes will be sent write updates immediately.
-     * </p>
-     *
-     * @param mapName             a String identifier for the map.
-     * @param clusterService      the cluster service
-     * @param clusterCommunicator the cluster communications service
-     * @param serializerBuilder   a Kryo namespace builder that can serialize
-     *                            both K and V
-     * @param clockService        a clock service able to generate timestamps
-     *                            for K
-     */
-    public EventuallyConsistentMapImpl(String mapName,
-                                       ClusterService clusterService,
-                                       ClusterCommunicationService clusterCommunicator,
-                                       KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService) {
-        this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
-             (key, value) -> clusterService.getNodes().stream()
-                     .map(ControllerNode::id)
-                     .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
-                     .collect(Collectors.toList()));
-    }
-
-    public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
-        tombstonesDisabled = status;
-        return this;
+        this.tombstonesDisabled = tombstonesDisabled;
+        this.lightweightAntiEntropy = !convergeFaster;
     }
 
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -246,19 +233,6 @@
         };
     }
 
-    /**
-     * Sets the executor to use for broadcasting messages and returns this
-     * instance for method chaining.
-     *
-     * @param executor executor service
-     * @return this instance
-     */
-    public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
-        checkNotNull(executor, "Null executor");
-        communicationExecutor = executor;
-        return this;
-    }
-
     @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index a571b4e..1d30607 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -15,23 +15,8 @@
  */
 package org.onosproject.store.group.impl;
 
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -75,17 +60,32 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.ClockService;
 import org.onosproject.store.impl.MultiValuedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages inventory of group entries using trivial in-memory implementation.
@@ -108,6 +108,9 @@
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
 
     // Per device group table with (device id + app cookie) as key
@@ -192,31 +195,38 @@
                           messageHandlingExecutor);
 
         log.debug("Creating EC map groupstorekeymap");
-        groupStoreEntriesByKey =
-                new EventuallyConsistentMapImpl<>("groupstorekeymap",
-                        clusterService,
-                        clusterCommunicator,
-                        kryoBuilder,
-                        new GroupStoreLogicalClockManager<>());
+        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
+                keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+        groupStoreEntriesByKey = keyMapBuilder
+                .withName("groupstorekeymap")
+                .withSerializer(kryoBuilder)
+                .withClockService(new GroupStoreLogicalClockManager<>())
+                .build();
         log.trace("Current size {}", groupStoreEntriesByKey.size());
 
         log.debug("Creating EC map groupstoreidmap");
-        groupStoreEntriesById =
-                new EventuallyConsistentMapImpl<>("groupstoreidmap",
-                        clusterService,
-                        clusterCommunicator,
-                        kryoBuilder,
-                        new GroupStoreLogicalClockManager<>());
+        EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
+                idMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+        groupStoreEntriesById = idMapBuilder
+                        .withName("groupstoreidmap")
+                        .withSerializer(kryoBuilder)
+                        .withClockService(new GroupStoreLogicalClockManager<>())
+                        .build();
+
         groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
         log.trace("Current size {}", groupStoreEntriesById.size());
 
         log.debug("Creating EC map pendinggroupkeymap");
-        auditPendingReqQueue =
-                new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
-                clusterService,
-                clusterCommunicator,
-                kryoBuilder,
-                new GroupStoreLogicalClockManager<>());
+        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
+                auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+        auditPendingReqQueue = auditMapBuilder
+                .withName("pendinggroupkeymap")
+                .withSerializer(kryoBuilder)
+                .withClockService(new GroupStoreLogicalClockManager<>())
+                .build();
         log.trace("Current size {}", auditPendingReqQueue.size());
 
         log.info("Started");
@@ -819,11 +829,11 @@
      * Map handler to receive any events when the group map is updated.
      */
     private class GroupStoreIdMapListener implements
-        EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
+            EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
 
         @Override
         public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
-                          StoredGroupEntry> mapEvent) {
+                                  StoredGroupEntry> mapEvent) {
             GroupEvent groupEvent = null;
             log.trace("GroupStoreIdMapListener: received groupid map event {}",
                       mapEvent.type());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
index ec90a5f..fd25ebf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.impl;
 
 import org.onosproject.store.Timestamp;
+import org.onosproject.store.service.ClockService;
 
 /**
  * A clock service which hands out wallclock-based timestamps.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 370b0b5..fc2e8e3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -36,14 +36,13 @@
 import org.onosproject.net.intent.Key;
 import org.onosproject.net.intent.PartitionService;
 import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
 import org.onosproject.store.impl.MultiValuedTimestamp;
 import org.onosproject.store.impl.WallClockTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -52,7 +51,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.intent.IntentState.*;
+import static org.onosproject.net.intent.IntentState.PURGE_REQ;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -61,7 +60,7 @@
  */
 //FIXME we should listen for leadership changes. if the local instance has just
 // ...  become a leader, scan the pending map and process those
-@Component(immediate = false, enabled = true)
+@Component(immediate = true, enabled = true)
 @Service
 public class GossipIntentStore
         extends AbstractStore<IntentEvent, IntentStoreDelegate>
@@ -76,10 +75,10 @@
     private EventuallyConsistentMap<Key, IntentData> pendingMap;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
+    protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PartitionService partitionService;
@@ -92,19 +91,19 @@
                 .register(MultiValuedTimestamp.class)
                 .register(WallClockTimestamp.class);
 
-        currentMap = new EventuallyConsistentMapImpl<>("intent-current",
-                                                       clusterService,
-                                                       clusterCommunicator,
-                                                       intentSerializer,
-                                                       new IntentDataLogicalClockManager<>(),
-                                                       (key, intentData) -> getPeerNodes(key, intentData));
+        currentMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
+                .withName("intent-current")
+                .withSerializer(intentSerializer)
+                .withClockService(new IntentDataLogicalClockManager<>())
+                .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
+                .build();
 
-        pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
-                                                       clusterService,
-                                                       clusterCommunicator,
-                                                       intentSerializer,
-                                                       new IntentDataClockManager<>(),
-                                                       (key, intentData) -> getPeerNodes(key, intentData));
+        pendingMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
+                .withName("intent-pending")
+                .withSerializer(intentSerializer)
+                .withClockService(new IntentDataClockManager<>())
+                .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
+                .build();
 
         currentMap.addListener(new InternalCurrentListener());
         pendingMap.addListener(new InternalPendingListener());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
index f64f6b7..1c4ffbf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
@@ -17,7 +17,7 @@
 
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.service.ClockService;
 import org.onosproject.store.impl.MultiValuedTimestamp;
 
 /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
index 4636cd7..9d21223 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
@@ -17,7 +17,7 @@
 
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.service.ClockService;
 import org.onosproject.store.impl.MultiValuedTimestamp;
 
 import java.util.concurrent.atomic.AtomicLong;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index f7c9323..8629fcd 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -34,10 +34,13 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.service.ClockService;
 import org.onosproject.store.impl.WallClockTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -134,10 +137,13 @@
                 .register(KryoNamespaces.API)
                 .register(TestTimestamp.class);
 
-        ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
-                                                  clusterCommunicator,
-                                                  serializer, clockService)
-                                        .withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
+        ecMap = new EventuallyConsistentMapBuilderImpl<>(
+                        clusterService, clusterCommunicator)
+                .withName(MAP_NAME)
+                .withSerializer(serializer)
+                .withClockService(clockService)
+                .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
+                .build();
 
         // Reset ready for tests to add their own expectations
         reset(clusterCommunicator);