adding sender-side accumulator to ecmap

Change-Id: I63de27131c067c07b41ca311b14ef3ac85b6ae3e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
new file mode 100644
index 0000000..4a87b41
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/AbstractEntry.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.ecmap;
+
+import org.onosproject.store.Timestamp;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class for events in an EventuallyConsistentMap.
+ */
+public abstract class AbstractEntry<K, V> implements Comparable<AbstractEntry<K, V>> {
+    private final K key;
+    private final Timestamp timestamp;
+
+    /**
+     * Creates a new put entry.
+     *
+     * @param key key of the entry
+     * @param timestamp timestamp of the put event
+     */
+    public AbstractEntry(K key, Timestamp timestamp) {
+        this.key = checkNotNull(key);
+        this.timestamp = checkNotNull(timestamp);
+    }
+
+    // Needed for serialization.
+    @SuppressWarnings("unused")
+    protected AbstractEntry() {
+        this.key = null;
+        this.timestamp = null;
+    }
+
+    /**
+     * Returns the key of the entry.
+     *
+     * @return the key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the timestamp of the event.
+     *
+     * @return the timestamp
+     */
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public int compareTo(AbstractEntry<K, V> o) {
+        return this.timestamp.compareTo(o.timestamp);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8f99d0e..c2c46fc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -15,9 +15,13 @@
  */
 package org.onosproject.store.ecmap;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.SlidingWindowCounter;
 import org.onosproject.cluster.ClusterService;
@@ -42,6 +46,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -78,7 +83,6 @@
     private final ClockService<K, V> clockService;
 
     private final MessageSubject updateMessageSubject;
-    private final MessageSubject removeMessageSubject;
     private final MessageSubject antiEntropyAdvertisementSubject;
 
     private final Set<EventuallyConsistentMapListener<K, V>> listeners
@@ -87,9 +91,10 @@
     private final ExecutorService executor;
 
     private final ScheduledExecutorService backgroundExecutor;
-    private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
+    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
-    private ExecutorService broadcastMessageExecutor;
+    private ExecutorService communicationExecutor;
+    private Map<NodeId, EventAccumulator> senderPending;
 
     private volatile boolean destroyed = false;
     private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -149,7 +154,7 @@
                                        ClusterCommunicationService clusterCommunicator,
                                        KryoNamespace.Builder serializerBuilder,
                                        ClockService<K, V> clockService,
-                                       BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
+                                       BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
         this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
@@ -168,27 +173,23 @@
 
         // sending executor; should be capped
         //TODO make # of threads configurable
-        broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
-                newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
+        //TODO this probably doesn't need to be bounded anymore
+        communicationExecutor =
+                newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+        senderPending = Maps.newConcurrentMap();
 
         backgroundExecutor =
-                //FIXME anti-entropy can take >60 seconds and it blocks fg workers
-                // ... dropping minPriority to try to help until this can be parallel
-                newSingleThreadScheduledExecutor(//minPriority(
-                                                 groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
+                newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
 
         // start anti-entropy thread
-        //TODO disable anti-entropy for now in testing (it is unstable)
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                                initialDelaySec, periodSec,
                                                TimeUnit.SECONDS);
 
         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
         clusterCommunicator.addSubscriber(updateMessageSubject,
-                                          new InternalPutEventListener(), executor);
-        removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
-        clusterCommunicator.addSubscriber(removeMessageSubject,
-                                          new InternalRemoveEventListener(), executor);
+                                          new InternalEventListener(), executor);
+
         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
                                           new InternalAntiEntropyListener(), backgroundExecutor);
@@ -232,8 +233,6 @@
                         .register(PutEntry.class)
                         .register(RemoveEntry.class)
                         .register(ArrayList.class)
-                        .register(InternalPutEvent.class)
-                        .register(InternalRemoveEvent.class)
                         .register(AntiEntropyAdvertisement.class)
                         .register(HashMap.class)
                         .build();
@@ -250,7 +249,7 @@
      */
     public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
         checkNotNull(executor, "Null executor");
-        broadcastMessageExecutor = executor;
+        communicationExecutor = executor;
         return this;
     }
 
@@ -303,7 +302,7 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (putInternal(key, value, timestamp)) {
-            notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+            notifyPeers(new PutEntry<>(key, value, timestamp),
                         peerUpdateFunction.apply(key, value));
             notifyListeners(new EventuallyConsistentMapEvent<>(
                     EventuallyConsistentMapEvent.Type.PUT, key, value));
@@ -350,7 +349,7 @@
         Timestamp timestamp = clockService.getTimestamp(key, null);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+            notifyPeers(new RemoveEntry<>(key, timestamp),
                         peerUpdateFunction.apply(key, null));
             notifyListeners(new EventuallyConsistentMapEvent<>(
                     EventuallyConsistentMapEvent.Type.REMOVE, key, null));
@@ -395,7 +394,7 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+            notifyPeers(new RemoveEntry<>(key, timestamp),
                         peerUpdateFunction.apply(key, value));
             notifyListeners(new EventuallyConsistentMapEvent<>(
                     EventuallyConsistentMapEvent.Type.REMOVE, key, value));
@@ -405,75 +404,24 @@
     @Override
     public void putAll(Map<? extends K, ? extends V> m) {
         checkState(!destroyed, destroyedMessage);
-
-        List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
-
-        for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
-            K key = entry.getKey();
-            V value = entry.getValue();
-
-            checkNotNull(key, ERROR_NULL_KEY);
-            checkNotNull(value, ERROR_NULL_VALUE);
-
-            Timestamp timestamp = clockService.getTimestamp(key, value);
-
-            if (putInternal(key, value, timestamp)) {
-                updates.add(new PutEntry<>(key, value, timestamp));
-            }
-        }
-
-        if (!updates.isEmpty()) {
-            broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
-
-            for (PutEntry<K, V> entry : updates) {
-                EventuallyConsistentMapEvent<K, V> externalEvent =
-                        new EventuallyConsistentMapEvent<>(
-                                EventuallyConsistentMapEvent.Type.PUT, entry.key(),
-                                entry.value());
-                notifyListeners(externalEvent);
-            }
-        }
+        m.forEach(this::put);
     }
 
     @Override
     public void clear() {
         checkState(!destroyed, destroyedMessage);
-
-        List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
-
-        for (K key : items.keySet()) {
-            // TODO also this is not applicable if value is important for timestamp?
-            Timestamp timestamp = clockService.getTimestamp(key, null);
-
-            if (removeInternal(key, timestamp)) {
-                removed.add(new RemoveEntry<>(key, timestamp));
-            }
-        }
-
-        if (!removed.isEmpty()) {
-            broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
-
-            for (RemoveEntry<K> entry : removed) {
-                EventuallyConsistentMapEvent<K, V> externalEvent
-                        = new EventuallyConsistentMapEvent<>(
-                        EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
-                        null);
-                notifyListeners(externalEvent);
-            }
-        }
+        items.forEach((key, value) -> remove(key));
     }
 
     @Override
     public Set<K> keySet() {
         checkState(!destroyed, destroyedMessage);
-
         return items.keySet();
     }
 
     @Override
     public Collection<V> values() {
         checkState(!destroyed, destroyedMessage);
-
         return items.values().stream()
                 .map(Timestamped::value)
                 .collect(Collectors.toList());
@@ -508,12 +456,11 @@
 
         executor.shutdown();
         backgroundExecutor.shutdown();
-        broadcastMessageExecutor.shutdown();
+        communicationExecutor.shutdown();
 
         listeners.clear();
 
         clusterCommunicator.removeSubscriber(updateMessageSubject);
-        clusterCommunicator.removeSubscriber(removeMessageSubject);
         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
     }
 
@@ -523,45 +470,32 @@
         }
     }
 
-    private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
-        // FIXME extremely memory expensive when we are overrun
-//        broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
-        multicastMessage(updateMessageSubject, event, peers);
+    private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+        queueUpdate(event, peers);
     }
 
-    private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
-        // FIXME extremely memory expensive when we are overrun
-//        broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
-        multicastMessage(removeMessageSubject, event, peers);
+    private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
+        queueUpdate(event, peers);
     }
 
-    private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
-        // FIXME can we parallelize the serialization... use the caller???
+    private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+        if (peers == null) {
+            // we have no friends :(
+            return;
+        }
+        peers.forEach(node ->
+              senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+        );
+    }
+
+    private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 subject,
                 serializer.encode(event));
-        broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
-//        clusterCommunicator.broadcast(message);
-    }
-
-    private void broadcastMessage(MessageSubject subject, Object event) {
-        // FIXME can we parallelize the serialization... use the caller???
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                serializer.encode(event));
-        broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
-//        clusterCommunicator.broadcast(message);
-    }
-
-    private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                serializer.encode(event));
-//        clusterCommunicator.unicast(message, peer);
-        broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
+        return clusterCommunicator.unicast(message, peer);
+        // Note: we had this flipped before...
+//        communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
 
     private boolean underHighLoad() {
@@ -606,9 +540,9 @@
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
 
-                // TODO check the return value?
-                unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
-                // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
+                if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
+                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
+                }
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -644,9 +578,9 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    // TODO check the return value?
-                    unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
-                    // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+                    if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
+                        log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
+                    }
                     break;
                 }
             }
@@ -670,8 +604,6 @@
                 = new LinkedList<>();
         final NodeId sender = ad.sender();
 
-        final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
-
         for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
             K key = item.getKey();
             Timestamped<V> localValue = item.getValue();
@@ -683,9 +615,8 @@
             if (remoteTimestamp == null || localValue
                     .isNewerThan(remoteTimestamp)) {
                 // local value is more recent, push to sender
-                updatesToSend
-                        .add(new PutEntry<>(key, localValue.value(),
-                                            localValue.timestamp()));
+                queueUpdate(new PutEntry<>(key, localValue.value(),
+                                            localValue.timestamp()), ImmutableList.of(sender));
             }
 
             Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
@@ -699,14 +630,6 @@
             }
         }
 
-        // Send all updates to the peer at once
-        if (!updatesToSend.isEmpty()) {
-            // TODO check the return value?
-            unicastMessage(sender, updateMessageSubject,
-                           new InternalPutEvent<>(updatesToSend));
-            //error log: log.warn("Failed to send advertisement response", e);
-        }
-
         return externalEvents;
     }
 
@@ -720,8 +643,6 @@
     private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
         final NodeId sender = ad.sender();
 
-        final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
-
         for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
             K key = dead.getKey();
             Timestamp localDeadTimestamp = dead.getValue();
@@ -730,18 +651,9 @@
             if (remoteLiveTimestamp != null
                     && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
                 // sender has zombie, push remove
-                removesToSend
-                        .add(new RemoveEntry<>(key, localDeadTimestamp));
+                queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
             }
         }
-
-        // Send all removes to the peer at once
-        if (!removesToSend.isEmpty()) {
-            // TODO check the return value
-            unicastMessage(sender, removeMessageSubject,
-                           new InternalRemoveEvent<>(removesToSend));
-            // error log: log.warn("Failed to send advertisement response", e);
-        }
     }
 
     /**
@@ -800,25 +712,44 @@
         }
     }
 
-    private final class InternalPutEventListener implements
+    private final class InternalEventListener implements
             ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
-            log.debug("Received put event from peer: {}", message.sender());
-            InternalPutEvent<K, V> event = serializer.decode(message.payload());
+            log.debug("Received update event from peer: {}", message.sender());
+            Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
 
             try {
-                for (PutEntry<K, V> entry : event.entries()) {
-                    K key = entry.key();
-                    V value = entry.value();
-                    Timestamp timestamp = entry.timestamp();
+                // TODO clean this for loop up
+                for (AbstractEntry<K, V> entry : events) {
+                    final K key = entry.key();
+                    final V value;
+                    final Timestamp timestamp = entry.timestamp();
+                    final EventuallyConsistentMapEvent.Type type;
+                    if (entry instanceof PutEntry) {
+                        PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
+                        value = putEntry.value();
+                        type = EventuallyConsistentMapEvent.Type.PUT;
+                    } else if (entry instanceof RemoveEntry) {
+                        type = EventuallyConsistentMapEvent.Type.REMOVE;
+                        value = null;
+                    } else {
+                        throw new IllegalStateException("Unknown entry type " + entry.getClass());
+                    }
 
-                    if (putInternal(key, value, timestamp)) {
-                        EventuallyConsistentMapEvent<K, V> externalEvent =
-                                new EventuallyConsistentMapEvent<>(
-                                        EventuallyConsistentMapEvent.Type.PUT, key,
-                                        value);
-                        notifyListeners(externalEvent);
+                    boolean success;
+                    switch (type) {
+                        case PUT:
+                            success = putInternal(key, value, timestamp);
+                            break;
+                        case REMOVE:
+                            success = removeInternal(key, timestamp);
+                            break;
+                        default:
+                            success = false;
+                    }
+                    if (success) {
+                        notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
                     }
                 }
             } catch (Exception e) {
@@ -827,29 +758,35 @@
         }
     }
 
-    private final class InternalRemoveEventListener implements
-            ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received remove event from peer: {}", message.sender());
-            InternalRemoveEvent<K> event = serializer.decode(message.payload());
-            try {
-                for (RemoveEntry<K> entry : event.entries()) {
-                    K key = entry.key();
-                    Timestamp timestamp = entry.timestamp();
+    // TODO pull this into the class if this gets pulled out...
+    private static final int DEFAULT_MAX_EVENTS = 1000;
+    private static final int DEFAULT_MAX_IDLE_MS = 10;
+    private static final int DEFAULT_MAX_BATCH_MS = 50;
+    private static final Timer TIMER = new Timer("onos-ecm-sender-events");
 
-                    if (removeInternal(key, timestamp)) {
-                        EventuallyConsistentMapEvent<K, V> externalEvent
-                        = new EventuallyConsistentMapEvent<>(
-                                EventuallyConsistentMapEvent.Type.REMOVE,
-                                key, null);
-                        notifyListeners(externalEvent);
-                    }
+    private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+
+        private final NodeId peer;
+
+        private EventAccumulator(NodeId peer) {
+            super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
+            this.peer = peer;
+        }
+
+        @Override
+        public void processItems(List<AbstractEntry<K, V>> items) {
+            Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
+            items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
+                  oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
+                  )
+            );
+            communicationExecutor.submit(() -> {
+                try {
+                    unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
+                } catch (Exception e) {
+                    log.warn("broadcast error", e);
                 }
-            } catch (Exception e) {
-                log.warn("Exception thrown handling remove", e);
-            }
+            });
         }
     }
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalPutEvent.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalPutEvent.java
deleted file mode 100644
index 950d320..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalPutEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.collect.ImmutableList;
-import org.onosproject.store.Timestamp;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Internal inter-instance event used by EventuallyConsistentMap for PUT events.
- */
-final class InternalPutEvent<K, V> {
-    private final List<PutEntry<K, V>> entries;
-
-    /**
-     * Creates a put event for a single key.
-     *
-     * @param key key the event concerns
-     * @param value value of the key
-     * @param timestamp timestamp of the event
-     */
-    public InternalPutEvent(K key, V value, Timestamp timestamp) {
-        entries = ImmutableList.of(new PutEntry<>(key, value, timestamp));
-    }
-
-    /**
-     * Creates a put event for multiple keys.
-     *
-     * @param entries list of put entries to send an event for
-     */
-    public InternalPutEvent(List<PutEntry<K, V>> entries) {
-        this.entries = checkNotNull(entries);
-    }
-
-    // Needed for serialization.
-    @SuppressWarnings("unused")
-    private InternalPutEvent() {
-        entries = null;
-    }
-
-    /**
-     * Returns the list of put entries this event concerns.
-     *
-     * @return list of put entries
-     */
-    public List<PutEntry<K, V>> entries() {
-        return entries;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalRemoveEvent.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalRemoveEvent.java
deleted file mode 100644
index aa9a78e1..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/InternalRemoveEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.ecmap;
-
-import com.google.common.collect.ImmutableList;
-import org.onosproject.store.Timestamp;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Internal inter-instance event used by EventuallyConsistentMap for REMOVE
- * events.
- */
-final class InternalRemoveEvent<K> {
-    private final List<RemoveEntry<K>> entries;
-
-    /**
-     * Creates a remove event for a single key.
-     *
-     * @param key key the event concerns
-     * @param timestamp timestamp of the event
-     */
-    public InternalRemoveEvent(K key, Timestamp timestamp) {
-        entries = ImmutableList.of(new RemoveEntry<>(key, timestamp));
-    }
-
-    /**
-     * Creates a remove event for multiple keys.
-     *
-     * @param entries list of remove entries to send an event for
-     */
-    public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
-        this.entries = checkNotNull(entries);
-    }
-
-    // Needed for serialization.
-    @SuppressWarnings("unused")
-    private InternalRemoveEvent() {
-        entries = null;
-    }
-
-    /**
-     * Returns the list of remove entries this event concerns.
-     *
-     * @return list of remove entries
-     */
-    public List<RemoveEntry<K>> entries() {
-        return entries;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
index bb86961..ddb4ae9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PutEntry.java
@@ -23,10 +23,8 @@
 /**
  * Describes a single put event in an EventuallyConsistentMap.
  */
-final class PutEntry<K, V> {
-    private final K key;
+final class PutEntry<K, V> extends AbstractEntry<K, V> {
     private final V value;
-    private final Timestamp timestamp;
 
     /**
      * Creates a new put entry.
@@ -36,26 +34,15 @@
      * @param timestamp timestamp of the put event
      */
     public PutEntry(K key, V value, Timestamp timestamp) {
-        this.key = checkNotNull(key);
+        super(key, timestamp);
         this.value = checkNotNull(value);
-        this.timestamp = checkNotNull(timestamp);
     }
 
     // Needed for serialization.
     @SuppressWarnings("unused")
     private PutEntry() {
-        this.key = null;
+        super();
         this.value = null;
-        this.timestamp = null;
-    }
-
-    /**
-     * Returns the key of the entry.
-     *
-     * @return the key
-     */
-    public K key() {
-        return key;
     }
 
     /**
@@ -67,21 +54,12 @@
         return value;
     }
 
-    /**
-     * Returns the timestamp of the event.
-     *
-     * @return the timestamp
-     */
-    public Timestamp timestamp() {
-        return timestamp;
-    }
-
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
-                .add("key", key)
+                .add("key", key())
                 .add("value", value)
-                .add("timestamp", timestamp)
+                .add("timestamp", timestamp())
                 .toString();
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
index 7d34796..18b0986 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/RemoveEntry.java
@@ -18,15 +18,10 @@
 import com.google.common.base.MoreObjects;
 import org.onosproject.store.Timestamp;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Describes a single remove event in an EventuallyConsistentMap.
  */
-final class RemoveEntry<K> {
-    private final K key;
-    private final Timestamp timestamp;
-
+final class RemoveEntry<K, V> extends AbstractEntry<K, V> {
     /**
      * Creates a new remove entry.
      *
@@ -34,40 +29,20 @@
      * @param timestamp timestamp of the remove event
      */
     public RemoveEntry(K key, Timestamp timestamp) {
-        this.key = checkNotNull(key);
-        this.timestamp = checkNotNull(timestamp);
+        super(key, timestamp);
     }
 
     // Needed for serialization.
     @SuppressWarnings("unused")
     private RemoveEntry() {
-        this.key = null;
-        this.timestamp = null;
-    }
-
-    /**
-     * Returns the key of the entry.
-     *
-     * @return the key
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Returns the timestamp of the event.
-     *
-     * @return the timestamp
-     */
-    public Timestamp timestamp() {
-        return timestamp;
+        super();
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
-                .add("key", key)
-                .add("timestamp", timestamp)
+                .add("key", key())
+                .add("timestamp", timestamp())
                 .toString();
     }
 }