[ONOS-7081] Modify EventuallyConsistentMap to bootstrap from old nodes during upgrade

Change-Id: I0bb60bf64319e083117b6d0f6f82842640e5185f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index f92e3f5..605bf25 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -15,10 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.io.File;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -28,6 +24,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -55,6 +52,11 @@
 import org.onosproject.store.service.WorkQueue;
 import org.slf4j.Logger;
 
+import java.io.File;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
 import static org.onosproject.store.primitives.impl.PartitionManager.PARTITIONS_DIR;
@@ -109,9 +111,28 @@
     @Override
     public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+        final NodeId localNodeId = clusterService.getLocalNode().id();
+
+        Supplier<List<NodeId>> peersSupplier = () -> clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                .filter(nodeId -> !nodeId.equals(localNodeId))
+                .filter(id -> clusterService.getState(id).isActive())
+                .collect(Collectors.toList());
+
+        Supplier<List<NodeId>> bootstrapPeersSupplier = () -> clusterService.getNodes()
+                .stream()
+                .map(ControllerNode::id)
+                .filter(id -> !localNodeId.equals(id))
+                .filter(id -> clusterService.getState(id).isActive())
+                .collect(Collectors.toList());
+
+        return new EventuallyConsistentMapBuilderImpl<>(
+                localNodeId,
                 clusterCommunicator,
-                persistenceService);
+                persistenceService,
+                peersSupplier,
+                bootstrapPeersSupplier
+        );
     }
 
     @Override
@@ -146,7 +167,7 @@
     @Override
     public <E> DistributedSetBuilder<E> setBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+        return new DefaultDistributedSetBuilder<>(this::<E, Boolean>consistentMapBuilder);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index 9d6a016..df35bd6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -15,14 +15,7 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.Timestamp;
@@ -30,6 +23,14 @@
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -38,9 +39,9 @@
  */
 public class EventuallyConsistentMapBuilderImpl<K, V>
         implements EventuallyConsistentMapBuilder<K, V> {
-    private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
 
+    private NodeId localNodeId;
     private String name;
     private KryoNamespace serializer;
     private KryoNamespace.Builder serializerBuilder;
@@ -56,21 +57,29 @@
     private boolean persistent = false;
     private boolean persistentMap = false;
     private final PersistenceService persistenceService;
+    private Supplier<List<NodeId>> peersSupplier;
+    private Supplier<List<NodeId>> bootstrapPeersSupplier;
 
     /**
      * Creates a new eventually consistent map builder.
-     *
-     * @param clusterService cluster service
-     * @param clusterCommunicator cluster communication service
-     * @param persistenceService persistence service
+     * @param localNodeId               local node id
+     * @param clusterCommunicator       cluster communication service
+     * @param persistenceService        persistence service
+     * @param peersSupplier             supplier for peers
+     * @param bootstrapPeersSupplier    supplier for peers for bootstrap
      */
     public EventuallyConsistentMapBuilderImpl(
-            ClusterService clusterService,
+            NodeId localNodeId,
             ClusterCommunicationService clusterCommunicator,
-            PersistenceService persistenceService) {
+            PersistenceService persistenceService,
+            Supplier<List<NodeId>> peersSupplier,
+            Supplier<List<NodeId>> bootstrapPeersSupplier
+    ) {
+        this.localNodeId = localNodeId;
         this.persistenceService = persistenceService;
-        this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
+        this.peersSupplier = peersSupplier;
+        this.bootstrapPeersSupplier = bootstrapPeersSupplier;
     }
 
     @Override
@@ -160,21 +169,26 @@
             serializer = serializerBuilder.build(name);
         }
         checkNotNull(serializer, "serializer is a mandatory parameter");
+        checkNotNull(localNodeId, "local node id cannot be null");
 
-        return new EventuallyConsistentMapImpl<>(name,
-                                                 clusterService,
-                                                 clusterCommunicator,
-                                                 serializer,
-                                                 timestampProvider,
-                                                 peerUpdateFunction,
-                                                 eventExecutor,
-                                                 communicationExecutor,
-                                                 backgroundExecutor,
-                                                 tombstonesDisabled,
-                                                 antiEntropyPeriod,
-                                                 antiEntropyTimeUnit,
-                                                 convergeFaster,
-                                                 persistent,
-                                                 persistenceService);
+        return new EventuallyConsistentMapImpl<>(
+                localNodeId,
+                name,
+                clusterCommunicator,
+                serializer,
+                timestampProvider,
+                peerUpdateFunction,
+                eventExecutor,
+                communicationExecutor,
+                backgroundExecutor,
+                tombstonesDisabled,
+                antiEntropyPeriod,
+                antiEntropyTimeUnit,
+                convergeFaster,
+                persistent,
+                persistenceService,
+                peersSupplier,
+                bootstrapPeersSupplier
+        );
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 462d4ae..74e567b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -15,6 +15,33 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SlidingWindowCounter;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,36 +64,9 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.AbstractAccumulator;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.LogicalTimestamp;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DistributedPrimitive;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -83,55 +83,43 @@
         implements EventuallyConsistentMap<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
+    private static final String ERROR_DESTROYED = " map is already destroyed";
+    private static final String ERROR_NULL_KEY = "Key cannot be null";
+    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+    private static final int WINDOW_SIZE = 5;
+    private static final int HIGH_LOAD_THRESHOLD = 2;
+    private static final int LOAD_WINDOW = 2;
 
     private final Map<K, MapValue<V>> items;
-
-    private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
     private final Serializer serializer;
-    private final NodeId localNodeId;
     private final PersistenceService persistenceService;
-
     private final BiFunction<K, V, Timestamp> timestampProvider;
-
     private final MessageSubject bootstrapMessageSubject;
     private final MessageSubject initializeMessageSubject;
     private final MessageSubject updateMessageSubject;
     private final MessageSubject antiEntropyAdvertisementSubject;
     private final MessageSubject updateRequestSubject;
-
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
             = Sets.newCopyOnWriteArraySet();
-
     private final ExecutorService executor;
     private final ScheduledExecutorService backgroundExecutor;
     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
-
     private final ExecutorService communicationExecutor;
     private final Map<NodeId, EventAccumulator> senderPending;
-
-    private long previousTombstonePurgeTime;
     private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
-
     private final String mapName;
-
-    private volatile boolean destroyed = false;
-    private static final String ERROR_DESTROYED = " map is already destroyed";
     private final String destroyedMessage;
-
-    private static final String ERROR_NULL_KEY = "Key cannot be null";
-    private static final String ERROR_NULL_VALUE = "Null values are not allowed";
-
     private final long initialDelaySec = 5;
     private final boolean lightweightAntiEntropy;
     private final boolean tombstonesDisabled;
-
-    private static final int WINDOW_SIZE = 5;
-    private static final int HIGH_LOAD_THRESHOLD = 2;
-    private static final int LOAD_WINDOW = 2;
-    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
-
     private final boolean persistent;
+    private final Supplier<List<NodeId>> peersSupplier;
+    private final Supplier<List<NodeId>> bootstrapPeersSupplier;
+    private final NodeId localNodeId;
+    private long previousTombstonePurgeTime;
+    private volatile boolean destroyed = false;
+    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
@@ -140,42 +128,47 @@
      * for more description of the parameters expected by the map.
      * </p>
      *
-     * @param mapName               a String identifier for the map.
-     * @param clusterService        the cluster service
-     * @param clusterCommunicator   the cluster communications service
-     * @param ns                    a Kryo namespace that can serialize
-     *                              both K and V
-     * @param timestampProvider     provider of timestamps for K and V
-     * @param peerUpdateFunction    function that provides a set of nodes to immediately
-     *                              update to when there writes to the map
-     * @param eventExecutor         executor to use for processing incoming
-     *                              events from peers
-     * @param communicationExecutor executor to use for sending events to peers
-     * @param backgroundExecutor    executor to use for background anti-entropy
-     *                              tasks
-     * @param tombstonesDisabled    true if this map should not maintain
-     *                              tombstones
-     * @param antiEntropyPeriod     period that the anti-entropy task should run
-     * @param antiEntropyTimeUnit   time unit for anti-entropy period
-     * @param convergeFaster        make anti-entropy try to converge faster
-     * @param persistent            persist data to disk
-     * @param persistenceService    persistence service
+     * @param localNodeId            local node id
+     * @param mapName                a String identifier for the map.
+     * @param clusterCommunicator    the cluster communications service
+     * @param ns                     a Kryo namespace that can serialize both K and V
+     * @param timestampProvider      provider of timestamps for K and V
+     * @param peerUpdateFunction     function that provides a set of nodes to immediately
+     *                               update to when there writes to the map
+     * @param eventExecutor          executor to use for processing incoming events from peers
+     * @param communicationExecutor  executor to use for sending events to peers
+     * @param backgroundExecutor     executor to use for background anti-entropy tasks
+     * @param tombstonesDisabled     true if this map should not maintain tombstones
+     * @param antiEntropyPeriod      period that the anti-entropy task should run
+     * @param antiEntropyTimeUnit    time unit for anti-entropy period
+     * @param convergeFaster         make anti-entropy try to converge faster
+     * @param persistent             persist data to disk
+     * @param persistenceService     persistence service
+     * @param peersSupplier          supplier for peers
+     * @param bootstrapPeersSupplier supplier for bootstrap peers
      */
-    EventuallyConsistentMapImpl(String mapName,
-                                ClusterService clusterService,
-                                ClusterCommunicationService clusterCommunicator,
-                                KryoNamespace ns,
-                                BiFunction<K, V, Timestamp> timestampProvider,
-                                BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
-                                ExecutorService eventExecutor,
-                                ExecutorService communicationExecutor,
-                                ScheduledExecutorService backgroundExecutor,
-                                boolean tombstonesDisabled,
-                                long antiEntropyPeriod,
-                                TimeUnit antiEntropyTimeUnit,
-                                boolean convergeFaster,
-                                boolean persistent,
-                                PersistenceService persistenceService) {
+    //CHECKSTYLE:OFF
+    EventuallyConsistentMapImpl(
+            NodeId localNodeId,
+            String mapName,
+            ClusterCommunicationService clusterCommunicator,
+            KryoNamespace ns,
+            BiFunction<K, V, Timestamp> timestampProvider,
+            BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+            ExecutorService eventExecutor,
+            ExecutorService communicationExecutor,
+            ScheduledExecutorService backgroundExecutor,
+            boolean tombstonesDisabled,
+            long antiEntropyPeriod,
+            TimeUnit antiEntropyTimeUnit,
+            boolean convergeFaster,
+            boolean persistent,
+            PersistenceService persistenceService,
+            Supplier<List<NodeId>> peersSupplier,
+            Supplier<List<NodeId>> bootstrapPeersSupplier
+    ) {
+        //CHECKSTYLE:ON
+        this.localNodeId = localNodeId;
         this.mapName = mapName;
         this.serializer = createSerializer(ns);
         this.persistenceService = persistenceService;
@@ -192,19 +185,20 @@
         senderPending = Maps.newConcurrentMap();
         destroyedMessage = mapName + ERROR_DESTROYED;
 
-        this.clusterService = clusterService;
         this.clusterCommunicator = clusterCommunicator;
-        this.localNodeId = clusterService.getLocalNode().id();
 
         this.timestampProvider = timestampProvider;
 
+        this.peersSupplier = peersSupplier;
+        this.bootstrapPeersSupplier = bootstrapPeersSupplier;
+
         if (peerUpdateFunction != null) {
-            this.peerUpdateFunction = peerUpdateFunction;
+            this.peerUpdateFunction = peerUpdateFunction.andThen(peers -> peersSupplier.get()
+                    .stream()
+                    .filter(peers::contains)
+                    .collect(Collectors.toList()));
         } else {
-            this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
-                    .map(ControllerNode::id)
-                    .filter(nodeId -> !nodeId.equals(localNodeId))
-                    .collect(Collectors.toList());
+            this.peerUpdateFunction = (key, value) -> peersSupplier.get();
         }
 
         if (eventExecutor != null) {
@@ -212,7 +206,8 @@
         } else {
             // should be a normal executor; it's used for receiving messages
             this.executor =
-                    Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
+                    Executors.newFixedThreadPool(8,
+                            groupedThreads("onos/ecm", mapName + "-fg-%d", log));
         }
 
         if (communicationExecutor != null) {
@@ -221,7 +216,8 @@
             // sending executor; should be capped
             //TODO this probably doesn't need to be bounded anymore
             this.communicationExecutor =
-                    newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
+                    newFixedThreadPool(8,
+                            groupedThreads("onos/ecm", mapName + "-publish-%d", log));
         }
 
 
@@ -229,55 +225,70 @@
             this.backgroundExecutor = backgroundExecutor;
         } else {
             this.backgroundExecutor =
-                    newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
+                    newSingleThreadScheduledExecutor(
+                            groupedThreads("onos/ecm", mapName + "-bg-%d", log));
         }
 
         // start anti-entropy thread
-        this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
-                                                    initialDelaySec, antiEntropyPeriod,
-                                                    antiEntropyTimeUnit);
+        this.backgroundExecutor.scheduleAtFixedRate(
+                this::sendAdvertisement,
+                initialDelaySec, antiEntropyPeriod,
+                antiEntropyTimeUnit
+        );
 
         bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
-        clusterCommunicator.addSubscriber(bootstrapMessageSubject,
-                                          serializer::decode,
-                                          (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
-                                          serializer::encode);
+        clusterCommunicator.addSubscriber(
+                bootstrapMessageSubject,
+                serializer::decode,
+                (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
+                serializer::encode
+        );
 
         initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
-        clusterCommunicator.addSubscriber(initializeMessageSubject,
+        clusterCommunicator.addSubscriber(
+                initializeMessageSubject,
                 serializer::decode,
                 (Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
                     processUpdates(u);
                     return null;
                 },
                 serializer::encode,
-                this.executor);
+                this.executor
+        );
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
-        clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          serializer::decode,
-                                          this::processUpdates,
-                                          this.executor);
+        clusterCommunicator.addSubscriber(
+                updateMessageSubject,
+                serializer::decode,
+                this::processUpdates,
+                this.executor
+        );
 
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
-        clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
-                                          serializer::decode,
-                                          this::handleAntiEntropyAdvertisement,
-                                          serializer::encode,
-                                          this.backgroundExecutor);
+        clusterCommunicator.addSubscriber(
+                antiEntropyAdvertisementSubject,
+                serializer::decode,
+                this::handleAntiEntropyAdvertisement,
+                serializer::encode,
+                this.backgroundExecutor
+        );
 
         updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
-        clusterCommunicator.addSubscriber(updateRequestSubject,
-                                          serializer::decode,
-                                          this::handleUpdateRequests,
-                                          this.backgroundExecutor);
+        clusterCommunicator.addSubscriber(
+                updateRequestSubject,
+                serializer::decode,
+                this::handleUpdateRequests,
+                this.backgroundExecutor
+        );
 
         if (!tombstonesDisabled) {
             previousTombstonePurgeTime = 0;
-            this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
-                                                           initialDelaySec,
-                                                           antiEntropyPeriod,
-                                                           TimeUnit.SECONDS);
+            this.backgroundExecutor.scheduleWithFixedDelay(
+                    this::purgeTombstones,
+                    initialDelaySec,
+                    antiEntropyPeriod,
+                    TimeUnit.SECONDS
+            );
         }
 
         this.tombstonesDisabled = tombstonesDisabled;
@@ -289,20 +300,20 @@
 
     private Serializer createSerializer(KryoNamespace ns) {
         return Serializer.using(KryoNamespace.newBuilder()
-                         .register(ns)
-                         // not so robust way to avoid collision with other
-                         // user supplied registrations
-                         .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
-                         .register(KryoNamespaces.BASIC)
-                         .register(LogicalTimestamp.class)
-                         .register(WallClockTimestamp.class)
-                         .register(AntiEntropyAdvertisement.class)
-                         .register(AntiEntropyResponse.class)
-                         .register(UpdateEntry.class)
-                         .register(MapValue.class)
-                         .register(MapValue.Digest.class)
-                         .register(UpdateRequest.class)
-                         .build(name() + "-ecmap"));
+                .register(ns)
+                // not so robust way to avoid collision with other
+                // user supplied registrations
+                .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+                .register(KryoNamespaces.BASIC)
+                .register(LogicalTimestamp.class)
+                .register(WallClockTimestamp.class)
+                .register(AntiEntropyAdvertisement.class)
+                .register(AntiEntropyResponse.class)
+                .register(UpdateEntry.class)
+                .register(MapValue.class)
+                .register(MapValue.Digest.class)
+                .register(UpdateRequest.class)
+                .build(name() + "-ecmap"));
     }
 
     @Override
@@ -335,9 +346,9 @@
         checkState(!destroyed, destroyedMessage);
         checkNotNull(value, ERROR_NULL_VALUE);
         return items.values()
-                    .stream()
-                    .filter(MapValue::isAlive)
-                    .anyMatch(v -> value.equals(v.get()));
+                .stream()
+                .filter(MapValue::isAlive)
+                .anyMatch(v -> value.equals(v.get()));
     }
 
     @Override
@@ -384,7 +395,7 @@
         MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
         if (previousValue != null) {
             notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
-                        peerUpdateFunction.apply(key, previousValue.get()));
+                    peerUpdateFunction.apply(key, previousValue.get()));
             if (previousValue.isAlive()) {
                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
             }
@@ -474,14 +485,14 @@
     public void clear() {
         checkState(!destroyed, destroyedMessage);
         Maps.filterValues(items, MapValue::isAlive)
-            .forEach((k, v) -> remove(k));
+                .forEach((k, v) -> remove(k));
     }
 
     @Override
     public Set<K> keySet() {
         checkState(!destroyed, destroyedMessage);
         return Maps.filterValues(items, MapValue::isAlive)
-                   .keySet();
+                .keySet();
     }
 
     @Override
@@ -494,16 +505,16 @@
     public Set<Map.Entry<K, V>> entrySet() {
         checkState(!destroyed, destroyedMessage);
         return Maps.filterValues(items, MapValue::isAlive)
-                   .entrySet()
-                   .stream()
-                   .map(e -> Pair.of(e.getKey(), e.getValue().get()))
-                   .collect(Collectors.toSet());
+                .entrySet()
+                .stream()
+                .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+                .collect(Collectors.toSet());
     }
 
     /**
      * Returns true if newValue was accepted i.e. map is updated.
      *
-     * @param key key
+     * @param key      key
      * @param newValue proposed new value
      * @return true if update happened; false if map already contains a more recent value for the key
      */
@@ -575,7 +586,7 @@
             return;
         }
         peers.forEach(node ->
-                        senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+                senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
         );
     }
 
@@ -596,14 +607,9 @@
     }
 
     private Optional<NodeId> pickRandomActivePeer() {
-        List<NodeId> activePeers = clusterService.getNodes()
-                .stream()
-                .map(ControllerNode::id)
-                .filter(id -> !localNodeId.equals(id))
-                .filter(id -> clusterService.getState(id).isActive())
-                .collect(Collectors.toList());
+        List<NodeId> activePeers = peersSupplier.get();
         Collections.shuffle(activePeers);
-        return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+        return activePeers.stream().findFirst();
     }
 
     private void sendAdvertisementToPeer(NodeId peer) {
@@ -680,14 +686,12 @@
             if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
                 // local value is more recent, push to sender
                 queueUpdate(new UpdateEntry<>(key, localValue), peers);
-            } else if (remoteValueDigest != null
-                    && remoteValueDigest.isNewerThan(localValue.digest())
-                    && remoteValueDigest.isTombstone()) {
+            } else if (remoteValueDigest.isNewerThan(localValue.digest()) && remoteValueDigest.isTombstone()) {
                 // remote value is more recent and a tombstone: update local value
                 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
                 MapValue<V> previousValue = removeInternal(key,
-                                                           Optional.empty(),
-                                                           Optional.of(tombstone));
+                        Optional.empty(),
+                        Optional.of(tombstone));
                 if (previousValue != null && previousValue.isAlive()) {
                     externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
                 }
@@ -707,9 +711,8 @@
         final Set<K> keys = request.keys();
         final NodeId sender = request.sender();
         final List<NodeId> peers = ImmutableList.of(sender);
-
         keys.forEach(key ->
-            queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
+                queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
         );
     }
 
@@ -721,21 +724,19 @@
          * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
          * as the time before which all tombstones are considered safe to purge.
          */
-        long currentSafeTombstonePurgeTime =  clusterService.getNodes()
-                                                            .stream()
-                                                            .map(ControllerNode::id)
-                                                            .filter(id -> !id.equals(localNodeId))
-                                                            .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
-                                                            .reduce(Math::min)
-                                                            .orElse(0L);
+        long currentSafeTombstonePurgeTime = peersSupplier.get()
+                .stream()
+                .map(id -> antiEntropyTimes.getOrDefault(id, 0L))
+                .reduce(Math::min)
+                .orElse(0L);
         if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
             return;
         }
         List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
-                                          .stream()
-                                          .filter(e -> e.getValue().isTombstone())
-                                          .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
-                                          .collect(Collectors.toList());
+                .stream()
+                .filter(e -> e.getValue().isTombstone())
+                .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
+                .collect(Collectors.toList());
         previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
         tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
     }
@@ -764,24 +765,15 @@
      * value.
      */
     private void bootstrap() {
-        List<NodeId> activePeers = clusterService.getNodes()
-                .stream()
-                .map(ControllerNode::id)
-                .filter(id -> !localNodeId.equals(id))
-                .filter(id -> clusterService.getState(id).isActive())
-                .collect(Collectors.toList());
-
+        List<NodeId> activePeers = bootstrapPeersSupplier.get();
         if (activePeers.isEmpty()) {
             return;
         }
-
         try {
             requestBootstrapFromPeers(activePeers)
                     .get(DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            log.debug("Failed to bootstrap ec map {}: {}", mapName, e.getCause());
-        } catch (InterruptedException | TimeoutException e) {
-            log.warn("Failed to bootstrap ec map {}: {}", mapName, e);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            log.debug("Failed to bootstrap ec map {}: {}", mapName, ExceptionUtils.getStackTrace(e));
         }
     }
 
@@ -798,11 +790,8 @@
         if (peers.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
-
         CompletableFuture<Void> future = new CompletableFuture<>();
-
         final int totalPeers = peers.size();
-
         AtomicBoolean successful = new AtomicBoolean();
         AtomicInteger totalCount = new AtomicInteger();
         AtomicReference<Throwable> lastError = new AtomicReference<>();
@@ -840,7 +829,7 @@
      * @return a future to be completed once the peer has sent bootstrap updates
      */
     private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
-        log.trace("Sending bootstrap request to {}", peer);
+        log.trace("Sending bootstrap request to {} for {}", peer, mapName);
         return clusterCommunicator.<NodeId, Void>sendAndReceive(
                 localNodeId,
                 bootstrapMessageSubject,
@@ -864,7 +853,7 @@
      * @return a future to be completed once updates have been sent to the peer
      */
     private CompletableFuture<Void> handleBootstrap(NodeId peer) {
-        log.trace("Received bootstrap request from {}", peer);
+        log.trace("Received bootstrap request from {} for {}", peer, bootstrapMessageSubject);
 
         Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendUpdates = updates -> {
             log.trace("Initializing {} with {} entries", peer, updates.size());
@@ -887,7 +876,7 @@
             K key = entry.getKey();
             MapValue<V> value = entry.getValue();
             if (value.isAlive()) {
-                updates.add(new UpdateEntry<K, V>(key, value));
+                updates.add(new UpdateEntry<>(key, value));
                 if (updates.size() == DEFAULT_MAX_EVENTS) {
                     futures.add(sendUpdates.apply(updates));
                     updates = new ArrayList<>();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index b73dd61..62a29bf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,13 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -30,6 +23,10 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -65,6 +62,13 @@
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -95,6 +99,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PartitionAdminService partitionAdminService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MembershipService membershipService;
+
     private final Supplier<TransactionId> transactionIdGenerator =
             () -> TransactionId.from(UUID.randomUUID().toString());
     private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -119,9 +126,40 @@
     @Override
     public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+        final NodeId localNodeId = clusterService.getLocalNode().id();
+
+        Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
+                .map(Member::nodeId)
+                .filter(nodeId -> !nodeId.equals(localNodeId))
+                .filter(id -> clusterService.getState(id).isActive())
+                .collect(Collectors.toList());
+
+        Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
+            if (membershipService.getMembers().size() == 1) {
+                return clusterService.getNodes()
+                        .stream()
+                        .map(ControllerNode::id)
+                        .filter(id -> !localNodeId.equals(id))
+                        .filter(id -> clusterService.getState(id).isActive())
+                        .collect(Collectors.toList());
+            } else {
+                return membershipService.getMembers()
+                        .stream()
+                        .map(Member::nodeId)
+                        .filter(id -> !localNodeId.equals(id))
+                        .filter(id -> clusterService.getState(id).isActive())
+                        .collect(Collectors.toList());
+            }
+        };
+
+
+        return new EventuallyConsistentMapBuilderImpl<>(
+                localNodeId,
                 clusterCommunicator,
-                persistenceService);
+                persistenceService,
+                peersSupplier,
+                bootstrapPeersSupplier
+        );
     }
 
     @Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 41cf18c..f16c688 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -45,13 +45,15 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -66,9 +68,6 @@
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
@@ -79,7 +78,6 @@
     private EventuallyConsistentMap<String, String> ecMap;
 
     private PersistenceService persistenceService;
-    private ClusterService clusterService;
     private ClusterCommunicationService clusterCommunicator;
     private SequentialClockService<String, String> clockService;
 
@@ -106,14 +104,10 @@
     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
     private Consumer<Collection<UpdateRequest<String>>> requestHandler;
     private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
+    private Supplier<List<NodeId>> peersHandler = ArrayList::new;
 
     @Before
     public void setUp() throws Exception {
-        clusterService = createMock(ClusterService.class);
-        expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
-        expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
-        replay(clusterService);
-
         clusterCommunicator = createMock(ClusterCommunicationService.class);
 
         persistenceService = new TestPersistenceService();
@@ -154,7 +148,12 @@
                 .register(TestTimestamp.class);
 
         ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
-                        clusterService, clusterCommunicator, persistenceService)
+                NodeId.nodeId("0"),
+                clusterCommunicator,
+                persistenceService,
+                peersHandler,
+                peersHandler
+                )
                 .withName(MAP_NAME)
                 .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))