[ONOS-6479] Synchronously bootstrap EventuallyConsistentMap
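
When an EventuallyConsistentMap is created, the local instance now sends a
bootstrap request to every active peer and blocks, up to the default
primitive operation timeout, until at least one peer has pushed back its
current entries. Peers answer a bootstrap request by streaming their live
entries in batches on a dedicated "initialize" subject, and listeners added
to the map are immediately replayed the current entries as PUT events.

A rough sketch of the intended effect (illustration only: it assumes an
injected StorageService reference named storageService, and the usual
EventuallyConsistentMapBuilder calls; the actual serializer and timestamp
provider depend on the caller):

    EventuallyConsistentMap<String, String> map = storageService
            .<String, String>eventuallyConsistentMapBuilder()
            .withName("foo")
            .withSerializer(KryoNamespace.newBuilder().register(KryoNamespaces.API))
            .withTimestampProvider((k, v) -> new WallClockTimestamp())
            .build();

    // Because construction now bootstraps synchronously, a read issued right
    // away reflects entries that already exist on other nodes instead of
    // returning null.
    String value = map.get("bar");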

Change-Id: I62a800ee731d1b42265b475c219d9d108adc08eb
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 7964187..41b6afd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -35,6 +35,7 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
@@ -42,6 +43,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -52,13 +54,17 @@
 import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -88,6 +94,8 @@
 
     private final BiFunction<K, V, Timestamp> timestampProvider;
 
+    private final MessageSubject bootstrapMessageSubject;
+    private final MessageSubject initializeMessageSubject;
     private final MessageSubject updateMessageSubject;
     private final MessageSubject antiEntropyAdvertisementSubject;
     private final MessageSubject updateRequestSubject;
@@ -229,6 +237,22 @@
                                                     initialDelaySec, antiEntropyPeriod,
                                                     antiEntropyTimeUnit);
 
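+        // Peers request this map's current contents on this subject when they bootstrap; see handleBootstrap().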
+        bootstrapMessageSubject = new MessageSubject("ecm-" + mapName + "-bootstrap");
+        clusterCommunicator.addSubscriber(bootstrapMessageSubject,
+                                          serializer::decode,
+                                          (Function<NodeId, CompletableFuture<Void>>) this::handleBootstrap,
+                                          serializer::encode);
+
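+        // Entries pushed back by a peer in response to our own bootstrap request arrive on this subject.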
+        initializeMessageSubject = new MessageSubject("ecm-" + mapName + "-initialize");
+        clusterCommunicator.addSubscriber(initializeMessageSubject,
+                                          serializer::decode,
+                                          (Function<Collection<UpdateEntry<K, V>>, Void>) u -> {
+                                              processUpdates(u);
+                                              return null;
+                                          },
+                                          serializer::encode,
+                                          this.executor);
+
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
                                           serializer::decode,
@@ -505,6 +529,11 @@
         checkState(!destroyed, destroyedMessage);
 
         listeners.add(checkNotNull(listener));
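+        // Replay the current live entries to the newly added listener so it observes the existing map state.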
+        items.forEach((k, v) -> {
+            if (v.isAlive()) {
+                listener.event(new EventuallyConsistentMapEvent<K, V>(mapName, PUT, k, v.get()));
+            }
+        });
     }
 
     @Override
@@ -524,6 +553,8 @@
 
         listeners.clear();
 
+        clusterCommunicator.removeSubscriber(bootstrapMessageSubject);
+        clusterCommunicator.removeSubscriber(initializeMessageSubject);
         clusterCommunicator.removeSubscriber(updateMessageSubject);
         clusterCommunicator.removeSubscriber(updateRequestSubject);
         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
@@ -727,13 +758,12 @@
         });
     }
 
+    /**
+     * Bootstraps the map, attempting to get in sync with existing instances of the same map on other nodes in the
+     * cluster. This ensures that a read immediately after the map is created does not return null for entries that
+     * already exist on other nodes.
+     */
     private void bootstrap() {
-        /*
-         * Attempt to get in sync with the cluster when a map is created. This is to help avoid a new node
-         * writing to an ECM until it has a view of the map. Depending on how lightweight the map instance
-         * is, this will attempt to advertise to all or some of the peers.
-         */
-        int n = 0;
         List<NodeId> activePeers = clusterService.getNodes()
                 .stream()
                 .map(ControllerNode::id)
@@ -745,15 +775,130 @@
             return;
         }
 
-        if (lightweightAntiEntropy) {
-            n = activePeers.size() / 2;
-        } else {
-            n = activePeers.size();
+        try {
+            requestBootstrapFromPeers(activePeers)
+                    .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            log.debug("Failed to bootstrap ec map {}: {}", mapName, e.getCause());
+        } catch (InterruptedException | TimeoutException e) {
+            log.warn("Failed to bootstrap ec map {}: {}", mapName, e);
+        }
+    }
+
+    /**
+     * Requests a bootstrap (the full set of current updates) from each of the given peers.
+     * <p>
+     * The returned future is completed once at least one peer has bootstrapped this map, or exceptionally once
+     * bootstrap requests to all peers have failed.
+     *
+     * @param peers the list of peers from which to request updates
+     * @return a future to be completed once updates have been received from at least one peer
+     */
+    private CompletableFuture<Void> requestBootstrapFromPeers(List<NodeId> peers) {
+        if (peers.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
         }
 
-        for (int i = 0; i < n; i++) {
-            sendAdvertisementToPeer(activePeers.get(i));
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        final int totalPeers = peers.size();
+
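+        // Track whether any peer has already bootstrapped the map, how many peers have been accounted
+        // for, and the most recent failure (reported only if no peer succeeds).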
+        AtomicBoolean successful = new AtomicBoolean();
+        AtomicInteger totalCount = new AtomicInteger();
+        AtomicReference<Throwable> lastError = new AtomicReference<>();
+
+        // Send a bootstrap request to every peer. The future is completed as soon as the first peer
+        // responds successfully; if no peer responds successfully, it is completed exceptionally with
+        // the last error observed.
+        for (NodeId peer : peers) {
+            requestBootstrapFromPeer(peer).whenComplete((result, error) -> {
+                if (error == null) {
+                    if (successful.compareAndSet(false, true)) {
+                        future.complete(null);
+                    } else if (totalCount.incrementAndGet() == totalPeers) {
+                        Throwable e = lastError.get();
+                        if (e != null) {
+                            future.completeExceptionally(e);
+                        }
+                    }
+                } else {
+                    if (!successful.get() && totalCount.incrementAndGet() == totalPeers) {
+                        future.completeExceptionally(error);
+                    } else {
+                        lastError.set(error);
+                    }
+                }
+            });
         }
+        return future;
+    }
+
+    /**
+     * Requests a bootstrap from the given peer.
+     *
+     * @param peer the peer from which to request updates
+     * @return a future to be completed once the peer has sent bootstrap updates
+     */
+    private CompletableFuture<Void> requestBootstrapFromPeer(NodeId peer) {
+        log.trace("Sending bootstrap request to {}", peer);
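+        // The request payload is just the local node id; the peer streams its entries back separately
+        // on the initialize subject (see handleBootstrap on the remote side).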
+        return clusterCommunicator.<NodeId, Void>sendAndReceive(
+                localNodeId,
+                bootstrapMessageSubject,
+                serializer::encode,
+                serializer::decode,
+                peer)
+                .whenComplete((updates, error) -> {
+                    if (error != null) {
+                        log.debug("Bootstrap request to {} failed: {}", peer, error.getMessage());
+                    }
+                });
+    }
+
+    /**
+     * Handles a bootstrap request from a peer.
+     * <p>
+     * When handling a bootstrap request, this node sends its live entries back to the peer in batches and
+     * completes the returned future once all batches have been received and processed by the peer.
+     *
+     * @param peer the peer that sent the bootstrap request
+     * @return a future to be completed once all updates have been sent to and processed by the peer
+     */
+    private CompletableFuture<Void> handleBootstrap(NodeId peer) {
+        log.trace("Received bootstrap request from {}", peer);
+
+        Function<List<UpdateEntry<K, V>>, CompletableFuture<Void>> sendUpdates = updates -> {
+            log.trace("Initializing {} with {} entries", peer, updates.size());
+            return clusterCommunicator.<List<UpdateEntry<K, V>>, Void>sendAndReceive(
+                    ImmutableList.copyOf(updates),
+                    initializeMessageSubject,
+                    serializer::encode,
+                    serializer::decode,
+                    peer)
+                    .whenComplete((result, error) -> {
+                        if (error != null) {
+                            log.debug("Failed to initialize {}", peer, error);
+                        }
+                    });
+        };
+
+        List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        List<UpdateEntry<K, V>> updates = Lists.newArrayList();
+        for (Map.Entry<K, MapValue<V>> entry : items.entrySet()) {
+            K key = entry.getKey();
+            MapValue<V> value = entry.getValue();
+            if (value.isAlive()) {
+                updates.add(new UpdateEntry<K, V>(key, value));
+                if (updates.size() == DEFAULT_MAX_EVENTS) {
+                    futures.add(sendUpdates.apply(updates));
+                    updates = new ArrayList<>();
+                }
+            }
+        }
+
+        if (!updates.isEmpty()) {
+            futures.add(sendUpdates.apply(updates));
+        }
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
     }
 
     // TODO pull this into the class if this gets pulled out...