Adding sender-side accumulator to ecmap

Updates bound for each peer are now queued in a per-node EventAccumulator
and flushed as batched unicast messages. The separate put and remove
message subjects are merged into a single update subject, and the
per-message multicast/broadcast send paths are removed.

Change-Id: I63de27131c067c07b41ca311b14ef3ac85b6ae3e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8f99d0e..c2c46fc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -15,9 +15,13 @@
  */
 package org.onosproject.store.ecmap;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.SlidingWindowCounter;
 import org.onosproject.cluster.ClusterService;
@@ -42,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -78,7 +83,6 @@
     private final ClockService<K, V> clockService;
 
     private final MessageSubject updateMessageSubject;
-    private final MessageSubject removeMessageSubject;
     private final MessageSubject antiEntropyAdvertisementSubject;
 
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
@@ -87,9 +91,10 @@
     private final ExecutorService executor;
 
     private final ScheduledExecutorService backgroundExecutor;
-    private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
+    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
-    private ExecutorService broadcastMessageExecutor;
+    private ExecutorService communicationExecutor;
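+    // Accumulators of updates pending batched delivery, keyed by destination peer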
+    private Map<NodeId, EventAccumulator> senderPending;
 
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -149,7 +154,7 @@
                                        ClusterCommunicationService clusterCommunicator,
                                        KryoNamespace.Builder serializerBuilder,
                                        ClockService<K, V> clockService,
-                                       BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
+                                       BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
         this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
@@ -168,27 +173,23 @@
 
         // sending executor; should be capped
         //TODO make # of threads configurable
-        broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
-                newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
+        //TODO this probably doesn't need to be bounded anymore
+        communicationExecutor =
+                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+        senderPending = Maps.newConcurrentMap();
 
         backgroundExecutor =
-                //FIXME anti-entropy can take >60 seconds and it blocks fg workers
-                // ... dropping minPriority to try to help until this can be parallel
-                newSingleThreadScheduledExecutor(//minPriority(
-                                                 groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
+                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
 
         // start anti-entropy thread
-        //TODO disable anti-entropy for now in testing (it is unstable)
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                                initialDelaySec, periodSec,
                                                TimeUnit.SECONDS);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalPutEventListener(), executor);
-        removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
-        clusterCommunicator.addSubscriber(removeMessageSubject,
-                                          new InternalRemoveEventListener(), executor);
+                                          new InternalEventListener(), executor);
+
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                           new InternalAntiEntropyListener(), backgroundExecutor);
@@ -232,8 +233,6 @@
                         .register(PutEntry.class)
                         .register(RemoveEntry.class)
                         .register(ArrayList.class)
-                        .register(InternalPutEvent.class)
-                        .register(InternalRemoveEvent.class)
                         .register(AntiEntropyAdvertisement.class)
                         .register(HashMap.class)
                         .build();
@@ -250,7 +249,7 @@
      */
     public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
         checkNotNull(executor, "Null executor");
-        broadcastMessageExecutor = executor;
+        communicationExecutor = executor;
         return this;
     }
 
@@ -303,7 +302,7 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (putInternal(key, value, timestamp)) {
-            notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+            notifyPeers(new PutEntry<>(key, value, timestamp),
                         peerUpdateFunction.apply(key, value));
             notifyListeners(new EventuallyConsistentMapEvent<>(
                     EventuallyConsistentMapEvent.Type.PUT, key, value));
@@ -350,7 +349,7 @@
         Timestamp timestamp = clockService.getTimestamp(key, null);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+            notifyPeers(new RemoveEntry<>(key, timestamp),
                         peerUpdateFunction.apply(key, null));
             notifyListeners(new EventuallyConsistentMapEvent<>(
                     EventuallyConsistentMapEvent.Type.REMOVE, key, null));
@@ -395,7 +394,7 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+            notifyPeers(new RemoveEntry<>(key, timestamp),
                         peerUpdateFunction.apply(key, value));
             notifyListeners(new EventuallyConsistentMapEvent<>(
                     EventuallyConsistentMapEvent.Type.REMOVE, key, value));
@@ -405,75 +404,24 @@
     @Override
     public void putAll(Map<? extends K, ? extends V> m) {
         checkState(!destroyed, destroyedMessage);
-
-        List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
-
-        for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
-            K key = entry.getKey();
-            V value = entry.getValue();
-
-            checkNotNull(key, ERROR_NULL_KEY);
-            checkNotNull(value, ERROR_NULL_VALUE);
-
-            Timestamp timestamp = clockService.getTimestamp(key, value);
-
-            if (putInternal(key, value, timestamp)) {
-                updates.add(new PutEntry<>(key, value, timestamp));
-            }
-        }
-
-        if (!updates.isEmpty()) {
-            broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
-
-            for (PutEntry<K, V> entry : updates) {
-                EventuallyConsistentMapEvent<K, V> externalEvent =
-                        new EventuallyConsistentMapEvent<>(
-                                EventuallyConsistentMapEvent.Type.PUT, entry.key(),
-                                entry.value());
-                notifyListeners(externalEvent);
-            }
-        }
+        m.forEach(this::put);
     }
 
     @Override
     public void clear() {
         checkState(!destroyed, destroyedMessage);
-
-        List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
-
-        for (K key : items.keySet()) {
-            // TODO also this is not applicable if value is important for timestamp?
-            Timestamp timestamp = clockService.getTimestamp(key, null);
-
-            if (removeInternal(key, timestamp)) {
-                removed.add(new RemoveEntry<>(key, timestamp));
-            }
-        }
-
-        if (!removed.isEmpty()) {
-            broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
-
-            for (RemoveEntry<K> entry : removed) {
-                EventuallyConsistentMapEvent<K, V> externalEvent
-                        = new EventuallyConsistentMapEvent<>(
-                        EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
-                        null);
-                notifyListeners(externalEvent);
-            }
-        }
+        items.forEach((key, value) -> remove(key));
     }
 
     @Override
     public Set<K> keySet() {
         checkState(!destroyed, destroyedMessage);
-
         return items.keySet();
     }
 
     @Override
     public Collection<V> values() {
         checkState(!destroyed, destroyedMessage);
-
         return items.values().stream()
                 .map(Timestamped::value)
                 .collect(Collectors.toList());
@@ -508,12 +456,11 @@
 
         executor.shutdown();
         backgroundExecutor.shutdown();
-        broadcastMessageExecutor.shutdown();
+        communicationExecutor.shutdown();
 
         listeners.clear();
 
         clusterCommunicator.removeSubscriber(updateMessageSubject);
-        clusterCommunicator.removeSubscriber(removeMessageSubject);
         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
     }
 
@@ -523,45 +470,32 @@
         }
     }
 
-    private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
-        // FIXME extremely memory expensive when we are overrun
-//        broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
-        multicastMessage(updateMessageSubject, event, peers);
+    private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+        queueUpdate(event, peers);
     }
 
-    private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
-        // FIXME extremely memory expensive when we are overrun
-//        broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
-        multicastMessage(removeMessageSubject, event, peers);
+    private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
+        queueUpdate(event, peers);
     }
 
-    private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
-        // FIXME can we parallelize the serialization... use the caller???
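+    /**
+     * Queues an update event for delivery to the given peers. Events are
+     * accumulated per peer and flushed as batched unicast messages.
+     */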
+    private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+        if (peers == null) {
+            // we have no friends :(
+            return;
+        }
+        peers.forEach(node ->
+              senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+        );
+    }
+
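+    /**
+     * Sends the given event to the specified peer; returns whether the
+     * underlying unicast succeeded.
+     */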
+    private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 subject,
                 serializer.encode(event));
-        broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
-//        clusterCommunicator.broadcast(message);
-    }
-
-    private void broadcastMessage(MessageSubject subject, Object event) {
-        // FIXME can we parallelize the serialization... use the caller???
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                serializer.encode(event));
-        broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
-//        clusterCommunicator.broadcast(message);
-    }
-
-    private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                serializer.encode(event));
-//        clusterCommunicator.unicast(message, peer);
-        broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
+        return clusterCommunicator.unicast(message, peer);
+        // Note: sends were previously dispatched asynchronously, e.g.:
+//        communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
 
     private boolean underHighLoad() {
@@ -606,9 +540,9 @@
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
 
-                // TODO check the return value?
-                unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
-                // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
+                if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
+                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
+                }
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -644,9 +578,9 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    // TODO check the return value?
-                    unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
-                    // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+                    if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
+                        log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+                    }
                     break;
                 }
             }
@@ -670,8 +604,6 @@
                 = new LinkedList<>();
         final NodeId sender = ad.sender();
 
-        final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
-
         for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
             K key = item.getKey();
             Timestamped<V> localValue = item.getValue();
@@ -683,9 +615,8 @@
             if (remoteTimestamp == null || localValue
                     .isNewerThan(remoteTimestamp)) {
                 // local value is more recent, push to sender
-                updatesToSend
-                        .add(new PutEntry<>(key, localValue.value(),
-                                            localValue.timestamp()));
+                queueUpdate(new PutEntry<>(key, localValue.value(),
+                                            localValue.timestamp()), ImmutableList.of(sender));
             }
 
             Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
@@ -699,14 +630,6 @@
             }
         }
 
-        // Send all updates to the peer at once
-        if (!updatesToSend.isEmpty()) {
-            // TODO check the return value?
-            unicastMessage(sender, updateMessageSubject,
-                           new InternalPutEvent<>(updatesToSend));
-            //error log: log.warn("Failed to send advertisement response", e);
-        }
-
         return externalEvents;
     }
 
@@ -720,8 +643,6 @@
     private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
         final NodeId sender = ad.sender();
 
-        final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
-
         for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
             K key = dead.getKey();
             Timestamp localDeadTimestamp = dead.getValue();
@@ -730,18 +651,9 @@
             if (remoteLiveTimestamp != null
                     && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
                 // sender has zombie, push remove
-                removesToSend
-                        .add(new RemoveEntry<>(key, localDeadTimestamp));
+                queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
             }
         }
-
-        // Send all removes to the peer at once
-        if (!removesToSend.isEmpty()) {
-            // TODO check the return value
-            unicastMessage(sender, removeMessageSubject,
-                           new InternalRemoveEvent<>(removesToSend));
-            // error log: log.warn("Failed to send advertisement response", e);
-        }
     }
 
     /**
@@ -800,25 +712,44 @@
         }
     }
 
-    private final class InternalPutEventListener implements
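+    /**
+     * Handles batches of update events (puts and removes) received from peers.
+     */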
+    private final class InternalEventListener implements
             ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-            log.debug("Received put event from peer: {}", message.sender());
-            InternalPutEvent<K, V> event = serializer.decode(message.payload());
+            log.debug("Received update event from peer: {}", message.sender());
+            Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
 
             try {
-                for (PutEntry<K, V> entry : event.entries()) {
-                    K key = entry.key();
-                    V value = entry.value();
-                    Timestamp timestamp = entry.timestamp();
+                // TODO clean this for loop up
+                for (AbstractEntry<K, V> entry : events) {
+                    final K key = entry.key();
+                    final V value;
+                    final Timestamp timestamp = entry.timestamp();
+                    final EventuallyConsistentMapEvent.Type type;
+                    if (entry instanceof PutEntry) {
+                        PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
+                        value = putEntry.value();
+                        type = EventuallyConsistentMapEvent.Type.PUT;
+                    } else if (entry instanceof RemoveEntry) {
+                        type = EventuallyConsistentMapEvent.Type.REMOVE;
+                        value = null;
+                    } else {
+                        throw new IllegalStateException("Unknown entry type " + entry.getClass());
+                    }
 
-                    if (putInternal(key, value, timestamp)) {
-                        EventuallyConsistentMapEvent<K, V> externalEvent =
-                                new EventuallyConsistentMapEvent<>(
-                                        EventuallyConsistentMapEvent.Type.PUT, key,
-                                        value);
-                        notifyListeners(externalEvent);
+                    boolean success;
+                    switch (type) {
+                        case PUT:
+                            success = putInternal(key, value, timestamp);
+                            break;
+                        case REMOVE:
+                            success = removeInternal(key, timestamp);
+                            break;
+                        default:
+                            success = false;
+                    }
+                    if (success) {
+                        notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
                     }
                 }
             } catch (Exception e) {
@@ -827,29 +758,35 @@
         }
     }
 
-    private final class InternalRemoveEventListener implements
-            ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received remove event from peer: {}", message.sender());
-            InternalRemoveEvent<K> event = serializer.decode(message.payload());
-            try {
-                for (RemoveEntry<K> entry : event.entries()) {
-                    K key = entry.key();
-                    Timestamp timestamp = entry.timestamp();
+    // TODO move these defaults into EventAccumulator if it is extracted into its own class
+    private static final int DEFAULT_MAX_EVENTS = 1000;
+    private static final int DEFAULT_MAX_IDLE_MS = 10;
+    private static final int DEFAULT_MAX_BATCH_MS = 50;
+    private static final Timer TIMER = new Timer("onos-ecm-sender-events");
 
-                    if (removeInternal(key, timestamp)) {
-                        EventuallyConsistentMapEvent<K, V> externalEvent
-                        = new EventuallyConsistentMapEvent<>(
-                                EventuallyConsistentMapEvent.Type.REMOVE,
-                                key, null);
-                        notifyListeners(externalEvent);
-                    }
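+    /**
+     * Accumulates events destined for a single peer, coalescing them per key
+     * and dispatching batched unicast messages via the communication executor.
+     */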
+    private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+
+        private final NodeId peer;
+
+        private EventAccumulator(NodeId peer) {
+            super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
+            this.peer = peer;
+        }
+
+        @Override
+        public void processItems(List<AbstractEntry<K, V>> items) {
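+            // Keep only the most recent event (by timestamp ordering) for each key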
+            Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
+            items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
+                  oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
+                  )
+            );
+            communicationExecutor.submit(() -> {
+                try {
+                    unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
+                } catch (Exception e) {
+                    log.warn("broadcast error", e);
                 }
-            } catch (Exception e) {
-                log.warn("Exception thrown handling remove", e);
-            }
+            });
         }
     }
-
 }